Create a Kubeflow Pipeline to Execute an End-to-End AI Training Workflow with Built-in Traceability and Versioning

To define and execute a new Kubeflow Pipeline that takes advantage of NetApp Snapshot technology to integrate rapid, efficient dataset and model versioning and traceability into an end-to-end AI/ML model training workflow, perform the following tasks. For more information about Kubeflow pipelines, see the official Kubeflow documentation. Note that the example pipeline shown in this section works only with volumes that reside on ONTAP storage systems or software-defined ONTAP instances.

  1. Create a Kubernetes secret containing the username and password of the cluster admin account for the ONTAP cluster on which your volumes reside. This secret must be created in the kubeflow namespace because pipelines are executed in that namespace. Note that you must replace username and password with the actual username and password of your cluster admin account when executing these commands, and you must use the output of the base64 commands as the username and password values in your secret definition.

    $ echo -n 'username' | base64
    dXNlcm5hbWU=
    $ echo -n 'password' | base64
    cGFzc3dvcmQ=
    $ cat << EOF > ./secret-ontap-cluster-mgmt-account.yaml
    apiVersion: v1
    kind: Secret
    metadata:
      name: ontap-cluster-mgmt-account
      namespace: kubeflow
    data:
      username: dXNlcm5hbWU=
      password: cGFzc3dvcmQ=
    EOF
    $ kubectl create -f ./secret-ontap-cluster-mgmt-account.yaml
    secret/ontap-cluster-mgmt-account created
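
    To confirm that the secret was created correctly, you can optionally retrieve it and decode the stored username. The following commands are a minimal verification sketch that assumes the secret name used in the preceding example; the decoded value should match the username that you encoded.

    $ kubectl -n kubeflow get secret ontap-cluster-mgmt-account
    $ kubectl -n kubeflow get secret ontap-cluster-mgmt-account -o jsonpath='{.data.username}' | base64 --decode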
  2. If the volume containing the data that you plan to use to train your model is not already tied to a PVC in the kubeflow namespace, you must import it into that namespace by using the Trident volume import functionality. The volume must be imported into the kubeflow namespace because pipelines are executed in that namespace.

    If your dataset volume is already tied to a PVC in the kubeflow namespace, then you can skip this step. If you do not yet have a dataset volume, then you must provision one and then transfer your data to it. See the section Provision a New Volume for an example showing how to provision a new volume with Trident.

    The example commands that follow show the importing of an existing FlexVol volume, named dataset_vol, into the kubeflow namespace. For more information about PVCs, see the official Kubernetes documentation. For more information about the volume import functionality, see the Trident documentation. For a detailed example showing the importing of a volume using Trident, see the section Import an Existing Volume.

    $ cat << EOF > ./pvc-import-dataset-vol-kubeflow.yaml
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: dataset-vol
      namespace: kubeflow
    spec:
      accessModes:
        - ReadWriteMany
      storageClassName: ontap-ai-flexvols-retain
    EOF
    $ tridentctl import volume ontap-ai-flexvols dataset_vol -f ./pvc-import-dataset-vol-kubeflow.yaml -n trident
    +------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
    |                   NAME                   |  SIZE  |      STORAGE CLASS       | PROTOCOL |             BACKEND UUID             | STATE  | MANAGED |
    +------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
    | pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12 | 10 TiB | ontap-ai-flexvols-retain | file     | 2942d386-afcf-462e-bf89-1d2aa3376a7b | online | true    |
    +------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
    $ kubectl get pvc -n kubeflow
    NAME                                      STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS               AGE
    imagenet-benchmark-job-gblgq-kfpresults   Bound    pvc-a4e32212-d65c-11e9-a043-00505681a82d   1Gi        RWX            ontap-ai-flexvols-retain   2d19h
    katib-mysql                               Bound    pvc-b07f293e-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
    dataset-vol                               Bound    pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12   10Ti       RWX            ontap-ai-flexvols-retain   8s
    metadata-mysql                            Bound    pvc-b0f3f032-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
    minio-pv-claim                            Bound    pvc-b22727ee-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
    mysql-pv-claim                            Bound    pvc-b2429afd-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
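
    The PV name that backs the imported PVC (shown in the VOLUME column above) is needed later as the dataset_volume_pv_existing pipeline parameter. The following command is a convenience sketch, assuming the PVC name used in the preceding example, that retrieves just that value.

    $ kubectl -n kubeflow get pvc dataset-vol -o jsonpath='{.spec.volumeName}'
    pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12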
  3. If the volume on which you wish to store your trained model is not already tied to a PVC in the kubeflow namespace, you must import it into that namespace by using the Trident volume import functionality. The volume must be imported into the kubeflow namespace because pipelines are executed in that namespace.

    If your trained model volume is already tied to a PVC in the kubeflow namespace, then you can skip this step. If you do not yet have a trained model volume, then you must provision one. See the section Provision a New Volume for an example showing how to provision a new volume with Trident.

    The example commands that follow show the importing of an existing FlexVol volume, named kfp_model_vol, into the kubeflow namespace. For more information about PVCs, see the official Kubernetes documentation. For more information about the volume import functionality, see the Trident documentation. For a detailed example showing the importing of a volume using Trident, see the section Import an Existing Volume.

    $ cat << EOF > ./pvc-import-kfp-model-vol-kubeflow.yaml
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: kfp-model-vol
      namespace: kubeflow
    spec:
      accessModes:
        - ReadWriteMany
      storageClassName: ontap-ai-flexvols-retain
    EOF
    $ tridentctl import volume ontap-ai-flexvols kfp_model_vol -f ./pvc-import-kfp-model-vol-kubeflow.yaml -n trident
    +------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
    |                   NAME                   |  SIZE  |      STORAGE CLASS       | PROTOCOL |             BACKEND UUID             | STATE  | MANAGED |
    +------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
    | pvc-236e893b-63b4-40d3-963b-e709b9b2816b | 10 TiB | ontap-ai-flexvols-retain | file     | 2942d386-afcf-462e-bf89-1d2aa3376a7b | online | true    |
    +------------------------------------------+--------+--------------------------+----------+--------------------------------------+--------+---------+
    $ kubectl get pvc -n kubeflow
    NAME                                      STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS               AGE
    imagenet-benchmark-job-gblgq-kfpresults   Bound    pvc-a4e32212-d65c-11e9-a043-00505681a82d   1Gi        RWX            ontap-ai-flexvols-retain   2d19h
    katib-mysql                               Bound    pvc-b07f293e-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
    kfp-model-vol                             Bound    pvc-236e893b-63b4-40d3-963b-e709b9b2816b   10Ti       RWX            ontap-ai-flexvols-retain   8s
    metadata-mysql                            Bound    pvc-b0f3f032-d028-11e9-9b9d-00505681a82d   10Gi       RWO            ontap-ai-flexvols-retain   10d
    minio-pv-claim                            Bound    pvc-b22727ee-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
    mysql-pv-claim                            Bound    pvc-b2429afd-d028-11e9-9b9d-00505681a82d   20Gi       RWO            ontap-ai-flexvols-retain   10d
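
    Likewise, the PV name that backs this PVC (the VOLUME column above) is needed later as the trained_model_volume_pv_existing pipeline parameter. The following command, again assuming the PVC name from the preceding example, retrieves just that value.

    $ kubectl -n kubeflow get pvc kfp-model-vol -o jsonpath='{.spec.volumeName}'
    pvc-236e893b-63b4-40d3-963b-e709b9b2816b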
  4. If you have not already done so, you must install the Kubeflow Pipelines SDK. See the official Kubeflow documentation for installation instructions.
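
    For reference, the SDK is typically installed with pip, as in the following example. The exact command and supported SDK version can differ depending on your environment, so treat this as an illustrative sketch rather than a substitute for the official installation instructions.

    $ python3 -m pip install kfp --upgrade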

  5. Define your Kubeflow Pipeline in Python by using the Kubeflow Pipelines SDK. The example commands that follow create a pipeline definition that accepts the run-time parameters listed below and then executes the pipeline steps that follow the parameter list. Modify the pipeline definition as needed for your specific process.

    Run-time parameters:

    • ontap_cluster_mgmt_hostname: Hostname or IP address of the ONTAP cluster on which your dataset and model volumes are stored.

    • ontap_cluster_admin_acct_k8s_secret: Name of the Kubernetes secret that was created in step 1.

    • ontap_api_verify_ssl_cert: Denotes whether to verify your cluster’s SSL certificate when communicating with the ONTAP API (True/False).

    • dataset_volume_pvc_existing: Name of the Kubernetes PersistentVolumeClaim (PVC) in the kubeflow namespace that is tied to the volume that contains the data that you want to use to train your model.

    • dataset_volume_pv_existing: Name of the Kubernetes PersistentVolume (PV) object that corresponds to the dataset volume PVC. To get the name of the PV, you can run kubectl -n kubeflow get pvc. The name of the PV that corresponds to a given PVC can be found in the VOLUME column.

    • trained_model_volume_pvc_existing: Name of the Kubernetes PersistentVolumeClaim (PVC) in the kubeflow namespace that is tied to the volume on which you want to store your trained model.

    • trained_model_volume_pv_existing: Name of the Kubernetes PersistentVolume (PV) object that corresponds to the trained model volume PVC. To get the name of the PV, you can run kubectl -n kubeflow get pvc. The name of the PV that corresponds to a given PVC can be found in the VOLUME column.

    • execute_data_prep_step__yes_or_no: Denotes whether you wish to execute a data prep step as part of this particular pipeline execution (yes/no).

    • data_prep_step_container_image: Container image in which you wish to execute your data prep step.

    • data_prep_step_command: Command that you want to execute as your data prep step.

    • data_prep_step_dataset_volume_mountpoint: Mountpoint at which you want to mount your dataset volume for your data prep step.

    • train_step_container_image: Container image in which you wish to execute your training step.

    • train_step_command: Command that you want to execute as your training step.

    • train_step_dataset_volume_mountpoint: Mountpoint at which you want to mount your dataset volume for your training step.

    • train_step_model_volume_mountpoint: Mountpoint at which you want to mount your model volume for your training step.

    • validation_step_container_image: Container image in which you wish to execute your validation step.

    • validation_step_command: Command that you want to execute as your validation step.

    • validation_step_dataset_volume_mountpoint: Mountpoint at which you want to mount your dataset volume for your validation step.

    • validation_step_model_volume_mountpoint: Mountpoint at which you want to mount your model volume for your validation step.

      Pipeline Steps:

      1. Optional: Executes a data prep step.

      2. Triggers the creation of a Snapshot copy, using NetApp Snapshot technology, of your dataset volume.

        This Snapshot copy is created for traceability purposes. Each time that this pipeline workflow is executed, a Snapshot copy is created. Therefore, as long as the Snapshot copy is not deleted, it is always possible to trace a specific training run back to the exact training dataset that was used for that run.

      3. Executes a training step.

      4. Triggers the creation of a Snapshot copy, using NetApp Snapshot technology, of your trained model volume.

        This Snapshot copy is created for versioning purposes. Each time that this pipeline workflow is executed, a Snapshot copy is created. Therefore, for each individual training run, a read-only versioned copy of the resulting trained model is automatically saved.

      5. Executes a validation step.

        $ cat << EOF > ./ai-training-run.py
        # Kubeflow Pipeline Definition: AI Training Run
        import kfp.dsl as dsl
        import kfp.onprem as onprem
        import kfp.components as comp
        from kubernetes import client as k8s_client
        # Define function that triggers the creation of a NetApp snapshot
        def netappSnapshot(
            ontapClusterMgmtHostname: str,
            pvName: str,
            verifySSLCert: bool = True
        ) -> str :
            # Install netapp_ontap package
            import sys, subprocess
            subprocess.run([sys.executable, '-m', 'pip', 'install', 'netapp_ontap'])
        
            # Import needed functions/classes
            from netapp_ontap import config as netappConfig
            from netapp_ontap.host_connection import HostConnection as NetAppHostConnection
            from netapp_ontap.resources import Volume, Snapshot
            from datetime import datetime
            import json
            # Retrieve ONTAP cluster admin account details from mounted K8s secrets
            usernameSecret = open('/mnt/secret/username', 'r')
            ontapClusterAdminUsername = usernameSecret.read().strip()
            passwordSecret = open('/mnt/secret/password', 'r')
            ontapClusterAdminPassword = passwordSecret.read().strip()
        
            # Configure connection to ONTAP cluster/instance
            netappConfig.CONNECTION = NetAppHostConnection(
                host = ontapClusterMgmtHostname,
                username = ontapClusterAdminUsername,
                password = ontapClusterAdminPassword,
                verify = verifySSLCert
            )
        
            # Convert pv name to ONTAP volume name
            # The following will not work if you specified a custom storagePrefix when creating your
            #   Trident backend. If you specified a custom storagePrefix, you will need to update this
            #   code to match your prefix.
            volumeName = 'trident_%s' % pvName.replace("-", "_")
            print('\npv name: ', pvName)
            print('ONTAP volume name: ', volumeName)
            # Create snapshot; print API response
            volume = Volume.find(name = volumeName)
            timestamp = datetime.today().strftime("%Y%m%d_%H%M%S")
            snapshot = Snapshot.from_dict({
                'name': 'kfp_%s' % timestamp,
                'comment': 'Snapshot created by a Kubeflow pipeline',
                'volume': volume.to_dict()
            })
            response = snapshot.post()
            print("\nAPI Response:")
            print(response.http_response.text)
            # Retrieve snapshot details
            snapshot.get()
            # Convert snapshot details to JSON string and print
            snapshotDetails = snapshot.to_dict()
            print("\nSnapshot Details:")
            print(json.dumps(snapshotDetails, indent=2))
            # Return name of newly created snapshot
            return snapshotDetails['name']
        # Convert netappSnapshot function to Kubeflow Pipeline ContainerOp named 'NetappSnapshotOp'
        NetappSnapshotOp = comp.func_to_container_op(netappSnapshot, base_image='python:3')
        # Define Kubeflow Pipeline
        @dsl.pipeline(
            # Define pipeline metadata
            name="AI Training Run",
            description="Template for executing an AI training run with built-in training dataset traceability and trained model versioning"
        )
        def ai_training_run(
            # Define variables that the user can set in the pipelines UI; set default values
            ontap_cluster_mgmt_hostname: str = "10.61.188.40",
            ontap_cluster_admin_acct_k8s_secret: str = "ontap-cluster-mgmt-account",
            ontap_api_verify_ssl_cert: bool = True,
            dataset_volume_pvc_existing: str = "dataset-vol",
            dataset_volume_pv_existing: str = "pvc-43b12235-f32e-4dc4-a7b8-88e90d935a12",
            trained_model_volume_pvc_existing: str = "kfp-model-vol",
            trained_model_volume_pv_existing: str = "pvc-236e893b-63b4-40d3-963b-e709b9b2816b",
            execute_data_prep_step__yes_or_no: str = "yes",
            data_prep_step_container_image: str = "ubuntu:bionic",
            data_prep_step_command: str = "<insert command here>",
            data_prep_step_dataset_volume_mountpoint: str = "/mnt/dataset",
            train_step_container_image: str = "nvcr.io/nvidia/tensorflow:19.12-tf1-py3",
            train_step_command: str = "<insert command here>",
            train_step_dataset_volume_mountpoint: str = "/mnt/dataset",
            train_step_model_volume_mountpoint: str = "/mnt/model",
            validation_step_container_image: str = "nvcr.io/nvidia/tensorflow:19.12-tf1-py3",
            validation_step_command: str = "<insert command here>",
            validation_step_dataset_volume_mountpoint: str = "/mnt/dataset",
            validation_step_model_volume_mountpoint: str = "/mnt/model"
        ) :
            # Set GPU limits; Due to SDK limitations, this must be hardcoded
            train_step_num_gpu = 0
            validation_step_num_gpu = 0
            # Pipeline Steps:
            # Execute data prep step
            with dsl.Condition(execute_data_prep_step__yes_or_no == "yes") :
                data_prep = dsl.ContainerOp(
                    name="data-prep",
                    image=data_prep_step_container_image,
                    command=["sh", "-c"],
                    arguments=[data_prep_step_command]
                )
                # Mount dataset volume/pvc
                data_prep.apply(
                    onprem.mount_pvc(dataset_volume_pvc_existing, 'dataset', data_prep_step_dataset_volume_mountpoint)
                )
            # Create a snapshot of the dataset volume/pvc for traceability
            dataset_snapshot = NetappSnapshotOp(
                ontap_cluster_mgmt_hostname,
                dataset_volume_pv_existing,
                ontap_api_verify_ssl_cert
            )
            # Mount k8s secret containing ONTAP cluster admin account details
            dataset_snapshot.add_pvolumes({
                '/mnt/secret': k8s_client.V1Volume(
                    name='ontap-cluster-admin',
                    secret=k8s_client.V1SecretVolumeSource(
                        secret_name=ontap_cluster_admin_acct_k8s_secret
                    )
                )
            })
            # State that snapshot should be created after the data prep job completes
            dataset_snapshot.after(data_prep)
            # Execute training step
            train = dsl.ContainerOp(
                name="train-model",
                image=train_step_container_image,
                command=["sh", "-c"],
                arguments=[train_step_command]
            )
            # Mount dataset volume/pvc
            train.apply(
                onprem.mount_pvc(dataset_volume_pvc_existing, 'datavol', train_step_dataset_volume_mountpoint)
            )
            # Mount model volume/pvc
            train.apply(
                onprem.mount_pvc(trained_model_volume_pvc_existing, 'modelvol', train_step_model_volume_mountpoint)
            )
            # Request that GPUs be allocated to training job pod
            if train_step_num_gpu > 0 :
                train.set_gpu_limit(train_step_num_gpu, 'nvidia')
            # State that training job should be executed after dataset volume snapshot is taken
            train.after(dataset_snapshot)
            # Create a snapshot of the model volume/pvc for model versioning
            model_snapshot = NetappSnapshotOp(
                ontap_cluster_mgmt_hostname,
                trained_model_volume_pv_existing,
                ontap_api_verify_ssl_cert
            )
            # Mount k8s secret containing ONTAP cluster admin account details
            model_snapshot.add_pvolumes({
                '/mnt/secret': k8s_client.V1Volume(
                    name='ontap-cluster-admin',
                    secret=k8s_client.V1SecretVolumeSource(
                        secret_name=ontap_cluster_admin_acct_k8s_secret
                    )
                )
            })
            # State that snapshot should be created after the training job completes
            model_snapshot.after(train)
            # Execute inference validation job
            inference_validation = dsl.ContainerOp(
                name="validate-model",
                image=validation_step_container_image,
                command=["sh", "-c"],
                arguments=[validation_step_command]
            )
            # Mount dataset volume/pvc
            inference_validation.apply(
                onprem.mount_pvc(dataset_volume_pvc_existing, 'datavol', validation_step_dataset_volume_mountpoint)
            )
            # Mount model volume/pvc
            inference_validation.apply(
                onprem.mount_pvc(trained_model_volume_pvc_existing, 'modelvol', validation_step_model_volume_mountpoint)
            )
            # Request that GPUs be allocated to pod
            if validation_step_num_gpu > 0 :
                inference_validation.set_gpu_limit(validation_step_num_gpu, 'nvidia')
            # State that inference validation job should be executed after model volume snapshot is taken
            inference_validation.after(model_snapshot)
        if __name__ == "__main__" :
            import kfp.compiler as compiler
            compiler.Compiler().compile(ai_training_run, __file__ + ".yaml")
        EOF
        $ python3 ai-training-run.py
        $ ls ai-training-run.py.yaml
        ai-training-run.py.yaml
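
        Steps 6 through 8 below upload the compiled pipeline definition through the Kubeflow central dashboard. As an alternative, the Kubeflow Pipelines SDK client can upload the same .yaml file programmatically. The following is a minimal sketch under the assumption that the Kubeflow Pipelines API endpoint is reachable at the placeholder address shown; adjust the host value and any authentication requirements for your deployment.

        $ cat << EOF > ./upload-pipeline.py
        # Hypothetical helper script: upload the compiled pipeline with the KFP SDK client
        import kfp
        # Replace the host value with your Kubeflow Pipelines endpoint (placeholder/assumption)
        client = kfp.Client(host='http://<kubeflow-pipelines-endpoint>/pipeline')
        client.upload_pipeline(
            pipeline_package_path='ai-training-run.py.yaml',
            pipeline_name='ai-training-run'
        )
        EOF
        $ python3 upload-pipeline.py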
  6. From the Kubeflow central dashboard, click Pipelines in the main menu to navigate to the Kubeflow Pipelines administration page.

  7. Click Upload Pipeline to upload your pipeline definition.

  8. Choose the .yaml file containing the pipeline definition that you created in step 5, give your pipeline a name, and click Upload.

  9. You should now see your new pipeline in the list of pipelines on the pipeline administration page. Click your pipeline’s name to view it.

  10. Review your pipeline to confirm that it looks correct.

  11. Click Create run to run your pipeline.

  12. You are now presented with a screen from which you can start a pipeline run. Create a name for the run, enter a description, choose an experiment to file the run under, and choose whether you want to initiate a one-off run or schedule a recurring run.

  13. Define parameters for the run, and then click Start.

    In the following example, the default values are accepted for most parameters. Details for the volume that was imported into the kubeflow namespace in step 2 are entered for dataset_volume_pvc_existing and dataset_volume_pv_existing. Details for the volume that was imported into the kubeflow namespace in step 3 are entered for trained_model_volume_pvc_existing and trained_model_volume_pv_existing. Non-AI-related commands are entered for the data_prep_step_command, train_step_command, and validation_step_command parameters in order to plainly demonstrate the functionality of the pipeline. Note that you defined the default values for the parameters within your pipeline definition (see step 5).
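
    As an illustration only, simple placeholder values such as the following could be entered for the three command parameters. These are assumptions for demonstration purposes, not the values used in the original run; they merely list the contents of the mounted volumes (using the default mountpoints from the pipeline definition) and write a dummy model file.

    data_prep_step_command:   "echo 'data prep placeholder' && ls /mnt/dataset"
    train_step_command:       "echo 'training placeholder' && ls /mnt/dataset && touch /mnt/model/placeholder-model.txt"
    validation_step_command:  "echo 'validation placeholder' && ls /mnt/dataset && ls /mnt/model"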

  14. You are now presented with a screen listing all runs that fall under the specific experiment. Click the name of the run that you just started to view it.

    At this point, the run is likely still in progress.

  15. Confirm that the run completed successfully. When the run is complete, every stage of the pipeline shows a green checkmark icon.
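
    If you prefer to monitor the run from the command line instead of the UI, you can also inspect the underlying workflow objects and pods in the kubeflow namespace. The following commands are a general sketch and assume that your Kubeflow Pipelines installation uses Argo Workflows as its execution engine, which is the default.

    $ kubectl -n kubeflow get workflows
    $ kubectl -n kubeflow get pods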

  16. Click a specific stage, and then click Logs to view output for that stage.
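
    To confirm the traceability and versioning benefit, you can also verify from the ONTAP CLI that the pipeline created the expected Snapshot copies. The following commands are a hedged example; they assume the default Trident volume naming convention (a trident_ prefix plus the PV name with dashes converted to underscores), which is the same convention that the pipeline definition in step 5 assumes, as well as a placeholder admin account and hostname.

    $ ssh admin@<ontap_cluster_mgmt_hostname> volume snapshot show -volume trident_pvc_43b12235_f32e_4dc4_a7b8_88e90d935a12 -snapshot "kfp_*"
    $ ssh admin@<ontap_cluster_mgmt_hostname> volume snapshot show -volume trident_pvc_236e893b_63b4_40d3_963b_e709b9b2816b -snapshot "kfp_*"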
