Create a Kubeflow Pipeline to Execute an End-to-End AI Training Workflow with Built-in Traceability and Versioning
To define and execute a new Kubeflow Pipeline that takes advantage of NetApp Snapshot technology in order to integrate rapid and efficient dataset and model versioning and traceability into an end-to-end AI/ML model training workflow, perform the following tasks. For more information about Kubeflow pipelines, see the official Kubeflow documentation. Note that the example pipeline that is shown in this section only works with volumes that reside on ONTAP storage systems or software-defined instances.
1. Create a Kubernetes secret containing the username and password of the cluster admin account for the ONTAP cluster on which your volumes reside. This secret must be created in the kubeflow namespace because this is the namespace that pipelines are executed in. Note that you must replace username and password with your own username and password when executing these commands, and you must use the corresponding output of the base64 commands in your secret definition.

$ echo -n 'username' | base64
dXNlcm5hbWU=
$ echo -n 'password' | base64
cGFzc3dvcmQ=
$ cat << EOF > ./secret-ontap-cluster-mgmt-account.yaml
apiVersion: v1
kind: Secret
metadata:
  name: ontap-cluster-mgmt-account
  namespace: kubeflow
data:
  username: dXNlcm5hbWU=
  password: cGFzc3dvcmQ=
EOF
$ kubectl create -f ./secret-ontap-cluster-mgmt-account.yaml
secret/ontap-cluster-mgmt-account created
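If you prefer to manage this secret from Python rather than with kubectl, the following is a minimal sketch that creates the same secret with the official Kubernetes Python client. It assumes that the client is installed (pip install kubernetes) and that your kubeconfig has access to the kubeflow namespace; the string_data field lets the API server perform the base64 encoding for you.

# Sketch: create the ontap-cluster-mgmt-account secret with the Kubernetes Python client.
# Assumes 'pip install kubernetes' and a kubeconfig with access to the kubeflow namespace.
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() if running inside a pod

secret = client.V1Secret(
    metadata=client.V1ObjectMeta(
        name="ontap-cluster-mgmt-account",
        namespace="kubeflow"
    ),
    # string_data accepts plain text; the API server stores it base64-encoded under 'data'
    string_data={
        "username": "username",  # replace with your ONTAP cluster admin username
        "password": "password"   # replace with your ONTAP cluster admin password
    }
)

client.CoreV1Api().create_namespaced_secret(namespace="kubeflow", body=secret)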
2. If the volume containing the data that you plan to use to train your model is not tied to a PVC in the kubeflow namespace, then you must import this volume into that namespace. Use the Trident volume import functionality to import this volume. The volume must be imported into the kubeflow namespace because this is the namespace that pipelines are executed in.

If your dataset volume is already tied to a PVC in the kubeflow namespace, then you can skip this step. If you do not yet have a dataset volume, then you must provision one and then transfer your data to it. See the section Provision a New Volume for an example showing how to provision a new volume with Trident.

The example commands that follow show the importing of an existing FlexVol volume, named dataset_vol, into the kubeflow namespace. For more information about PVCs, see the official Kubernetes documentation. For more information about the volume import functionality, see the Trident documentation. For a detailed example showing the importing of a volume using Trident, see the section Import an Existing Volume.

$ cat << EOF > ./pvc-import-dataset-vol-kubeflow.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: dataset-vol
  namespace: kubeflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ontap-ai-flexvols-retain
EOF
$ tridentctl import volume ontap-ai-flexvols dataset_vol -f ./pvc-import-dataset-vol-kubeflow.yaml -n trident
+------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
|                   NAME                   |  SIZE  |      STORAGE CLASS       | PROTOCOL |             BACKEND UUID             | STATE  | MANAGED |
+------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
| pvc-3c70ad14-d88f-11e9-b5e2-00505681f3d9 | 10 TiB | ontap-ai-flexvols-retain | file     | 2942d386-afcf-462e-bf89-1d2aa3376a7b | online | true    |
+------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
$ kubectl get pvc -n kubeflow
NAME                                      STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS               AGE
imagenet-benchmark-job-gblgq-kfpresults   Bound    pvc-a4e32212-d65c-11e9-a043-00505681a82d   1Gi        RWX            ontap-ai-flexvols-retain   2d19h
katib-mysql                               Bound    pvc-b07f293e-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
dataset-vol                               Bound    pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12   10Ti       ROX            ontap-ai-flexvols-retain   8s
metadata-mysql                            Bound    pvc-b0f3f032-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
minio-pv-claim                            Bound    pvc-b22727ee-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
mysql-pv-claim                            Bound    pvc-b2429afd-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
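Before moving on, you can optionally confirm from Python that the imported PVC is bound and capture the name of the PV that backs it; this PV name is the value that you later supply for the dataset_volume_pv_existing pipeline parameter. The following is a minimal sketch, equivalent to the kubectl get pvc output above, and assumes that the Kubernetes Python client is installed and that your kubeconfig can reach the cluster.

# Sketch: verify that the imported dataset PVC is bound and record its PV name.
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

pvc = v1.read_namespaced_persistent_volume_claim(name="dataset-vol", namespace="kubeflow")
print("PVC phase:", pvc.status.phase)      # expected to be "Bound"
print("Bound PV:", pvc.spec.volume_name)   # for example, pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12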
3. If the volume on which you wish to store your trained model is not tied to a PVC in the kubeflow namespace, then you must import this volume into that namespace. Use the Trident volume import functionality to import this volume. The volume must be imported into the kubeflow namespace because this is the namespace that pipelines are executed in.

If your trained model volume is already tied to a PVC in the kubeflow namespace, then you can skip this step. If you do not yet have a trained model volume, then you must provision one. See the section Provision a New Volume for an example showing how to provision a new volume with Trident.

The example commands that follow show the importing of an existing FlexVol volume, named kfp_model_vol, into the kubeflow namespace. For more information about PVCs, see the official Kubernetes documentation. For more information about the volume import functionality, see the Trident documentation. For a detailed example showing the importing of a volume using Trident, see the section Import an Existing Volume.

$ cat << EOF > ./pvc-import-kfp-model-vol-kubeflow.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kfp-model-vol
  namespace: kubeflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ontap-ai-flexvols-retain
EOF
$ tridentctl import volume ontap-ai-flexvols kfp_model_vol -f ./pvc-import-kfp-model-vol-kubeflow.yaml -n trident
+------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
|                   NAME                   |  SIZE  |      STORAGE CLASS       | PROTOCOL |             BACKEND UUID             | STATE  | MANAGED |
+------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
| pvc-3c70ad14-d88f-11e9-b5e2-00505681f3d9 | 10 TiB | ontap-ai-flexvols-retain | file     | 2942d386-afcf-462e-bf89-1d2aa3376a7b | online | true    |
+------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
$ kubectl get pvc -n kubeflow
NAME                                      STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS               AGE
imagenet-benchmark-job-gblgq-kfpresults   Bound    pvc-a4e32212-d65c-11e9-a043-00505681a82d   1Gi        RWX            ontap-ai-flexvols-retain   2d19h
katib-mysql                               Bound    pvc-b07f293e-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
kfp-model-vol                             Bound    pvc-236e893b-63b4-40d3-963b-e709b9b2816b   10Ti       ROX            ontap-ai-flexvols-retain   8s
metadata-mysql                            Bound    pvc-b0f3f032-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
minio-pv-claim                            Bound    pvc-b22727ee-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
mysql-pv-claim                            Bound    pvc-b2429afd-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
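The PV names shown in the kubectl output above matter because the pipeline's snapshot steps convert each PV name into the name of the backing ONTAP FlexVol. The following sketch mirrors the conversion used in the pipeline definition in step 5; it assumes the default Trident storagePrefix of trident, so adjust it if your Trident backend uses a custom prefix.

# Sketch: derive the ONTAP FlexVol name that backs a given Kubernetes PV.
# Mirrors the conversion in the pipeline definition (step 5); assumes the default
# Trident storagePrefix of "trident".
def pv_name_to_ontap_volume_name(pv_name: str, storage_prefix: str = "trident") -> str:
    return "%s_%s" % (storage_prefix, pv_name.replace("-", "_"))

print(pv_name_to_ontap_volume_name("pvc-236e893b-63b4-40d3-963b-e709b9b2816b"))
# Prints: trident_pvc_236e893b_63b4_40d3_963b_e709b9b2816b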
4. If you have not already done so, you must install the Kubeflow Pipelines SDK. See the official Kubeflow documentation for installation instructions.
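As a quick sanity check, you can confirm that the SDK imports cleanly and report its version; the version you see depends on your installation.

# Quick check that the Kubeflow Pipelines SDK is installed and importable
import kfp
print("Kubeflow Pipelines SDK version:", kfp.__version__)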
5. Define your Kubeflow Pipeline in Python using the Kubeflow Pipelines SDK. The example commands that follow show the creation of a pipeline definition for a pipeline that will accept the following parameters at run-time and then execute the following steps. Modify the pipeline definition as needed depending on your specific process.

Run-time parameters:
- ontap_cluster_mgmt_hostname: Hostname or IP address of the ONTAP cluster on which your dataset and model volumes are stored.
- ontap_cluster_admin_acct_k8s_secret: Name of the Kubernetes secret that was created in step 1.
- ontap_api_verify_ssl_cert: Denotes whether to verify your cluster’s SSL certificate when communicating with the ONTAP API (True/False).
- dataset_volume_pvc_existing: Name of the Kubernetes PersistentVolumeClaim (PVC) in the kubeflow namespace that is tied to the volume that contains the data that you want to use to train your model.
- dataset_volume_pv_existing: Name of the Kubernetes PersistentVolume (PV) object that corresponds to the dataset volume PVC. To get the name of the PV, you can run kubectl -n kubeflow get pvc. The name of the PV that corresponds to a given PVC can be found in the VOLUME column.
- trained_model_volume_pvc_existing: Name of the Kubernetes PersistentVolumeClaim (PVC) in the kubeflow namespace that is tied to the volume on which you want to store your trained model.
- trained_model_volume_pv_existing: Name of the Kubernetes PersistentVolume (PV) object that corresponds to the trained model volume PVC. To get the name of the PV, you can run kubectl -n kubeflow get pvc. The name of the PV that corresponds to a given PVC can be found in the VOLUME column.
- execute_data_prep_step__yes_or_no: Denotes whether you wish to execute a data prep step as part of this particular pipeline execution (yes/no).
- data_prep_step_container_image: Container image in which you wish to execute your data prep step.
- data_prep_step_command: Command that you want to execute as your data prep step.
- data_prep_step_dataset_volume_mountpoint: Mountpoint at which you want to mount your dataset volume for your data prep step.
- train_step_container_image: Container image in which you wish to execute your training step.
- train_step_command: Command that you want to execute as your training step.
- train_step_dataset_volume_mountpoint: Mountpoint at which you want to mount your dataset volume for your training step.
- train_step_model_volume_mountpoint: Mountpoint at which you want to mount your model volume for your training step.
- validation_step_container_image: Container image in which you wish to execute your validation step.
- validation_step_command: Command that you want to execute as your validation step.
- validation_step_dataset_volume_mountpoint: Mountpoint at which you want to mount your dataset volume for your validation step.
- validation_step_model_volume_mountpoint: Mountpoint at which you want to mount your model volume for your validation step.

Pipeline Steps:
- Optional: Executes a data prep step.
- Triggers the creation of a Snapshot copy, using NetApp Snapshot technology, of your dataset volume. This Snapshot copy is created for traceability purposes. Each time that this pipeline workflow is executed, a Snapshot copy is created. Therefore, as long as the Snapshot copy is not deleted, it is always possible to trace a specific training run back to the exact training dataset that was used for that run.
- Executes a training step.
- Triggers the creation of a Snapshot copy, using NetApp Snapshot technology, of your trained model volume. This Snapshot copy is created for versioning purposes. Each time that this pipeline workflow is executed, a Snapshot copy is created. Therefore, for each individual training run, a read-only versioned copy of the resulting trained model is automatically saved.
- Executes a validation step.
$ cat << EOF > ./ai-training-run.py
# Kubeflow Pipeline Definition: AI Training Run

import kfp.dsl as dsl
import kfp.onprem as onprem
import kfp.components as comp
from kubernetes import client as k8s_client

# Define function that triggers the creation of a NetApp snapshot
def netappSnapshot(
    ontapClusterMgmtHostname: str,
    pvName: str,
    verifySSLCert: bool = True
) -> str :
    # Install netapp_ontap package
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'netapp_ontap'])

    # Import needed functions/classes
    from netapp_ontap import config as netappConfig
    from netapp_ontap.host_connection import HostConnection as NetAppHostConnection
    from netapp_ontap.resources import Volume, Snapshot
    from datetime import datetime
    import json

    # Retrieve ONTAP cluster admin account details from mounted K8s secrets
    usernameSecret = open('/mnt/secret/username', 'r')
    ontapClusterAdminUsername = usernameSecret.read().strip()
    passwordSecret = open('/mnt/secret/password', 'r')
    ontapClusterAdminPassword = passwordSecret.read().strip()

    # Configure connection to ONTAP cluster/instance
    netappConfig.CONNECTION = NetAppHostConnection(
        host = ontapClusterMgmtHostname,
        username = ontapClusterAdminUsername,
        password = ontapClusterAdminPassword,
        verify = verifySSLCert
    )

    # Convert pv name to ONTAP volume name
    # The following will not work if you specified a custom storagePrefix when creating your
    # Trident backend. If you specified a custom storagePrefix, you will need to update this
    # code to match your prefix.
    volumeName = 'trident_%s' % pvName.replace("-", "_")
    print('\npv name: ', pvName)
    print('ONTAP volume name: ', volumeName)

    # Create snapshot; print API response
    volume = Volume.find(name = volumeName)
    timestamp = datetime.today().strftime("%Y%m%d_%H%M%S")
    snapshot = Snapshot.from_dict({
        'name': 'kfp_%s' % timestamp,
        'comment': 'Snapshot created by a Kubeflow pipeline',
        'volume': volume.to_dict()
    })
    response = snapshot.post()
    print("\nAPI Response:")
    print(response.http_response.text)

    # Retrieve snapshot details
    snapshot.get()

    # Convert snapshot details to JSON string and print
    snapshotDetails = snapshot.to_dict()
    print("\nSnapshot Details:")
    print(json.dumps(snapshotDetails, indent=2))

    # Return name of newly created snapshot
    return snapshotDetails['name']

# Convert netappSnapshot function to Kubeflow Pipeline ContainerOp named 'NetappSnapshotOp'
NetappSnapshotOp = comp.func_to_container_op(netappSnapshot, base_image='python:3')

# Define Kubeflow Pipeline
@dsl.pipeline(
    # Define pipeline metadata
    name="AI Training Run",
    description="Template for executing an AI training run with built-in training dataset traceability and trained model versioning"
)
def ai_training_run(
    # Define variables that the user can set in the pipelines UI; set default values
    ontap_cluster_mgmt_hostname: str = "10.61.188.40",
    ontap_cluster_admin_acct_k8s_secret: str = "ontap-cluster-mgmt-account",
    ontap_api_verify_ssl_cert: bool = True,
    dataset_volume_pvc_existing: str = "dataset-vol",
    dataset_volume_pv_existing: str = "pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12",
    trained_model_volume_pvc_existing: str = "kfp-model-vol",
    trained_model_volume_pv_existing: str = "pvc-236e893b-63b4-40d3-963b-e709b9b2816b",
    execute_data_prep_step__yes_or_no: str = "yes",
    data_prep_step_container_image: str = "ubuntu:bionic",
    data_prep_step_command: str = "<insert command here>",
    data_prep_step_dataset_volume_mountpoint: str = "/mnt/dataset",
    train_step_container_image: str = "nvcr.io/nvidia/tensorflow:19.12-tf1-py3",
    train_step_command: str = "<insert command here>",
    train_step_dataset_volume_mountpoint: str = "/mnt/dataset",
    train_step_model_volume_mountpoint: str = "/mnt/model",
    validation_step_container_image: str = "nvcr.io/nvidia/tensorflow:19.12-tf1-py3",
    validation_step_command: str = "<insert command here>",
    validation_step_dataset_volume_mountpoint: str = "/mnt/dataset",
    validation_step_model_volume_mountpoint: str = "/mnt/model"
) :
    # Set GPU limits; Due to SDK limitations, this must be hardcoded
    train_step_num_gpu = 0
    validation_step_num_gpu = 0

    # Pipeline Steps:

    # Execute data prep step
    with dsl.Condition(execute_data_prep_step__yes_or_no == "yes") :
        data_prep = dsl.ContainerOp(
            name="data-prep",
            image=data_prep_step_container_image,
            command=["sh", "-c"],
            arguments=[data_prep_step_command]
        )
        # Mount dataset volume/pvc
        data_prep.apply(
            onprem.mount_pvc(dataset_volume_pvc_existing, 'dataset', data_prep_step_dataset_volume_mountpoint)
        )

    # Create a snapshot of the dataset volume/pvc for traceability
    dataset_snapshot = NetappSnapshotOp(
        ontap_cluster_mgmt_hostname,
        dataset_volume_pv_existing,
        ontap_api_verify_ssl_cert
    )
    # Mount k8s secret containing ONTAP cluster admin account details
    dataset_snapshot.add_pvolumes({
        '/mnt/secret': k8s_client.V1Volume(
            name='ontap-cluster-admin',
            secret=k8s_client.V1SecretVolumeSource(
                secret_name=ontap_cluster_admin_acct_k8s_secret
            )
        )
    })
    # State that snapshot should be created after the data prep job completes
    dataset_snapshot.after(data_prep)

    # Execute training step
    train = dsl.ContainerOp(
        name="train-model",
        image=train_step_container_image,
        command=["sh", "-c"],
        arguments=[train_step_command]
    )
    # Mount dataset volume/pvc
    train.apply(
        onprem.mount_pvc(dataset_volume_pvc_existing, 'datavol', train_step_dataset_volume_mountpoint)
    )
    # Mount model volume/pvc
    train.apply(
        onprem.mount_pvc(trained_model_volume_pvc_existing, 'modelvol', train_step_model_volume_mountpoint)
    )
    # Request that GPUs be allocated to training job pod
    if train_step_num_gpu > 0 :
        train.set_gpu_limit(train_step_num_gpu, 'nvidia')
    # State that training job should be executed after dataset volume snapshot is taken
    train.after(dataset_snapshot)

    # Create a snapshot of the model volume/pvc for model versioning
    model_snapshot = NetappSnapshotOp(
        ontap_cluster_mgmt_hostname,
        trained_model_volume_pv_existing,
        ontap_api_verify_ssl_cert
    )
    # Mount k8s secret containing ONTAP cluster admin account details
    model_snapshot.add_pvolumes({
        '/mnt/secret': k8s_client.V1Volume(
            name='ontap-cluster-admin',
            secret=k8s_client.V1SecretVolumeSource(
                secret_name=ontap_cluster_admin_acct_k8s_secret
            )
        )
    })
    # State that snapshot should be created after the training job completes
    model_snapshot.after(train)

    # Execute inference validation job
    inference_validation = dsl.ContainerOp(
        name="validate-model",
        image=validation_step_container_image,
        command=["sh", "-c"],
        arguments=[validation_step_command]
    )
    # Mount dataset volume/pvc
    inference_validation.apply(
        onprem.mount_pvc(dataset_volume_pvc_existing, 'datavol', validation_step_dataset_volume_mountpoint)
    )
    # Mount model volume/pvc
    inference_validation.apply(
        onprem.mount_pvc(trained_model_volume_pvc_existing, 'modelvol', validation_step_model_volume_mountpoint)
    )
    # Request that GPUs be allocated to pod
    if validation_step_num_gpu > 0 :
        inference_validation.set_gpu_limit(validation_step_num_gpu, 'nvidia')
    # State that inference validation job should be executed after model volume snapshot is taken
    inference_validation.after(model_snapshot)

if __name__ == "__main__" :
    import kfp.compiler as compiler
    compiler.Compiler().compile(ai_training_run, __file__ + ".yaml")
EOF
$ python3 ai-training-run.py
$ ls ai-training-run.py.yaml
ai-training-run.py.yaml
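The steps that follow upload and run the compiled pipeline through the Kubeflow UI. If you prefer to drive Kubeflow Pipelines from Python, the following is a minimal sketch that uses the kfp SDK client to do the same thing; the endpoint shown is a placeholder, and multi-user Kubeflow deployments may require additional authentication arguments when the client is constructed.

# Sketch: upload and run the compiled pipeline package with the kfp SDK client.
# The host value is a placeholder for your Kubeflow Pipelines endpoint.
import kfp

client = kfp.Client(host="http://<kubeflow-pipelines-endpoint>")

# Upload the compiled package so that it appears on the pipeline administration page
client.upload_pipeline(
    pipeline_package_path="ai-training-run.py.yaml",
    pipeline_name="AI Training Run"
)

# Alternatively, start a run directly from the package, overriding selected parameters
client.create_run_from_pipeline_package(
    "ai-training-run.py.yaml",
    run_name="ai-training-run-example",
    arguments={
        "dataset_volume_pvc_existing": "dataset-vol",
        "dataset_volume_pv_existing": "pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12",
        "trained_model_volume_pvc_existing": "kfp-model-vol",
        "trained_model_volume_pv_existing": "pvc-236e893b-63b4-40d3-963b-e709b9b2816b"
    }
)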
6. From the Kubeflow central dashboard, click Pipelines in the main menu to navigate to the Kubeflow Pipelines administration page.
7. Click Upload Pipeline to upload your pipeline definition.
8. Choose the .yaml archive containing your pipeline definition that you created in step 5, give your pipeline a name, and click Upload.

9. You should now see your new pipeline in the list of pipelines on the pipeline administration page. Click your pipeline’s name to view it.
10. Review your pipeline to confirm that it looks correct.
11. Click Create run to run your pipeline.
12. You are now presented with a screen from which you can start a pipeline run. Create a name for the run, enter a description, choose an experiment to file the run under, and choose whether you want to initiate a one-off run or schedule a recurring run.
13. Define parameters for the run, and then click Start.

In the following example, the default values are accepted for most parameters. Details for the volume that was imported into the kubeflow namespace in step 2 are entered for dataset_volume_pvc_existing and dataset_volume_pv_existing. Details for the volume that was imported into the kubeflow namespace in step 3 are entered for trained_model_volume_pvc_existing and trained_model_volume_pv_existing. Non-AI-related commands are entered for the data_prep_step_command, train_step_command, and validation_step_command parameters in order to plainly demonstrate the functionality of the pipeline. Note that you defined the default values for the parameters within your pipeline definition (see step 5).
14. You are now presented with a screen listing all runs that fall under the specific experiment. Click the name of the run that you just started to view it. At this point, the run is likely still in progress.
15. Confirm that the run completed successfully. When the run is complete, every stage of the pipeline shows a green checkmark icon.
16. Click a specific stage, and then click Logs to view output for that stage.
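In addition to reviewing the stage logs, you can confirm that the traceability and versioning Snapshot copies were created by listing the snapshots on your dataset and trained model volumes. The following is a minimal sketch that uses the same netapp_ontap library that the pipeline itself installs; the hostname, credentials, and volume name are placeholders, and the volume name follows the trident_<pv name with underscores> convention shown in the pipeline definition.

# Sketch: list the kfp_* snapshots that the pipeline created on a volume.
# Replace the hostname, credentials, and volume name with your own values.
from netapp_ontap import config
from netapp_ontap.host_connection import HostConnection
from netapp_ontap.resources import Volume, Snapshot

config.CONNECTION = HostConnection(
    host="<ontap-cluster-mgmt-hostname>",
    username="<cluster admin username>",
    password="<cluster admin password>",
    verify=False  # set to True if your ONTAP SSL certificate should be verified
)

volume = Volume.find(name="trident_pvc_43b12235_f32e_4dc4_a7b8_88e90d935a12")
for snapshot in Snapshot.get_collection(volume.uuid):
    if snapshot.name.startswith("kfp_"):
        print(snapshot.name)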