Create a Kubeflow Pipeline to Trigger a SnapMirror Volume Replication Update

You can define and execute a new Kubeflow pipeline that takes advantage of NetApp SnapMirror data replication technology to replicate the contents of a volume between different ONTAP clusters.

This pipeline can be used to replicate data of any type between ONTAP clusters, whether those clusters are located at the same site or at different sites or in different regions. Potential use cases include:

  • Replicating newly-acquired sensor data gathered at the edge back to the core data center or to the cloud to be used for AI/ML model training or retraining.

  • Replicating a newly-trained or newly-updated model from the core data center to the edge or to the cloud to be deployed as part of an inferencing application.

For more information about Kubeflow pipelines, see the official Kubeflow documentation. Note that the example pipeline shown in this section works only with volumes that reside on ONTAP storage systems or software-defined ONTAP instances.

To create a new Kubeflow pipeline to trigger a SnapMirror volume replication update, perform the following steps:

The exercises in this section assume that you have already initiated an asynchronous SnapMirror relationship between the source volume and the destination volume according to standard configuration instructions. For details, refer to the official NetApp documentation. A rough sketch of a typical setup sequence is shown below.
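If you still need to establish the relationship, the following is a minimal sketch of a typical setup sequence run from the destination cluster's CLI. It assumes that cluster and SVM peering are already configured and that the destination volume already exists as a data-protection (DP) volume; it reuses the example SVM and volume names from the pipeline defaults shown later in this section, and the cluster prompt, policy, and schedule are placeholders that you should adjust for your environment.

    destination-cluster::> snapmirror create -source-path ailab:sm -destination-path ai221_data:sm_dest -type XDP -policy MirrorAllSnapshots -schedule hourly
    destination-cluster::> snapmirror initialize -destination-path ai221_data:sm_dest
    destination-cluster::> snapmirror show -destination-path ai221_data:sm_dest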
  1. If you have not already done so, create a Kubernetes secret containing the username and password of the cluster admin account for the ONTAP cluster on which your destination volume resides. This secret must be created in the kubeflow namespace because that is the namespace in which pipelines are executed. Replace username and password with your actual credentials when executing these commands, and use the output of the base64 commands in your secret definition accordingly. After the secret is created, you can verify its contents as shown in the sketch that follows the example.

    $ echo -n 'username' | base64
    dXNlcm5hbWU=
    $ echo -n 'password' | base64
    cGFzc3dvcmQ=
    $ cat << EOF > ./secret-ontap-cluster-mgmt-account.yaml
    apiVersion: v1
    kind: Secret
    metadata:
      name: ontap-cluster-mgmt-account
      namespace: kubeflow
    data:
      username: dXNlcm5hbWU=
      password: cGFzc3dvcmQ=
    EOF
    $ kubectl create -f ./secret-ontap-cluster-mgmt-account.yaml
    secret/ontap-cluster-mgmt-account created
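    To confirm that the secret exists in the kubeflow namespace and that its contents decode back to the expected values, you can optionally run the following checks. The decoded output shown corresponds to the placeholder value used above; your output reflects your actual credentials.

    $ kubectl get secret ontap-cluster-mgmt-account -n kubeflow
    $ kubectl get secret ontap-cluster-mgmt-account -n kubeflow -o jsonpath='{.data.username}' | base64 --decode
    username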
  2. If you have not already done so, install the Kubeflow Pipelines SDK. See the official Kubeflow documentation for installation instructions; a minimal installation sketch follows.
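    The following is a minimal sketch that installs the SDK with pip and prints the installed version. The pipeline definition in the next step uses KFP v1 SDK interfaces (for example, kfp.components.func_to_container_op), so consult the Kubeflow documentation for the SDK version that matches your Kubeflow deployment and pin it accordingly.

    $ python3 -m pip install kfp
    $ python3 -c "import kfp; print(kfp.__version__)"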

  3. Define your Kubeflow pipeline in Python using the Kubeflow Pipelines SDK.

    Pipeline Steps:

    1. Trigger a replication update for the specified asynchronous SnapMirror relationship.

      The example commands below create a pipeline definition that accepts the following parameters at run time. Modify the pipeline definition as needed for your specific process.

      Run-time Parameters:

      • ontap_cluster_mgmt_hostname: Hostname or IP address of the ONTAP cluster on which the destination volume resides.

      • ontap_cluster_admin_acct_k8s_secret: Name of the Kubernetes secret that was created in step 1.

      • ontap_api_verify_ssl_cert: Denotes whether to verify your cluster’s SSL certificate when communicating with the ONTAP API (yes/no).

      • source_svm: Name of the SVM on which the source volume resides.

      • source_volume: Name of the source volume (the volume that you are replicating from) on the source cluster.

      • destination_svm: Name of the SVM on which the destination volume resides.

      • destination_volume: Name of the destination volume (the volume that you are replicating to) on the destination cluster.

        $ cat << EOF > ./replicate-data-snapmirror.py
        # Kubeflow Pipeline Definition: Replicate data - SnapMirror
        import kfp.dsl as dsl
        import kfp.components as comp
        from kubernetes import client as k8s_client
        # Define function that triggers a SnapMirror replication update
        def netappSnapMirrorUpdate(
            ontapClusterMgmtHostname: str,
            sourceSvm: str,
            sourceVolume: str,
            destinationSvm: str,
            destinationVolume: str,
            verifySSLCert: str = 'no'
        ) -> int :
            # Install required Python modules (ansible and netapp-lib)
            import sys, subprocess
            print("Installing required Python modules:\n")
            subprocess.run([sys.executable, '-m', 'pip', 'install', 'ansible', 'netapp-lib'])
        
            # Retrieve ONTAP cluster admin account details from mounted K8s secrets
            usernameSecret = open('/mnt/secret/username', 'r')
            ontapClusterAdminUsername = usernameSecret.read().strip()
            passwordSecret = open('/mnt/secret/password', 'r')
            ontapClusterAdminPassword = passwordSecret.read().strip()
        
            # Define Ansible playbook for triggering SnapMirror update
            snapMirrorPlaybookContent = """
        ---
        - name: "Trigger SnapMirror Update"
          hosts: localhost
          tasks:
          - name: update snapmirror
            na_ontap_snapmirror:
              state: present
              source_path: '%s:%s'
              destination_path: '%s:%s'
              hostname: '%s'
              username: '%s'
              password: '%s'
              https: 'yes'
              validate_certs: '%s'""" % (sourceSvm, sourceVolume, destinationSvm, destinationVolume, ontapClusterMgmtHostname,
                ontapClusterAdminUsername, ontapClusterAdminPassword, verifySSLCert)
            print("\nCreating Ansible playbook:")
            print(snapMirrorPlaybookContent, "\n")
            snapMirrorPlaybookFile = open("/root/snapmirror-update.yaml", "w")
            snapMirrorPlaybookFile.write(snapMirrorPlaybookContent)
            snapMirrorPlaybookFile.close()
            # Trigger SnapMirror update
            print("Executing Ansible playbook to trigger SnapMirror update:\n")
            try :
                subprocess.run(['ansible-playbook', '/root/snapmirror-update.yaml'])
            except Exception as e :
                print(str(e).strip())
                raise
            # Return success code
            return 0
        # Convert netappSnapMirrorUpdate function to Kubeflow Pipeline ContainerOp named 'NetappSnapMirrorUpdateOp'
        NetappSnapMirrorUpdateOp = comp.func_to_container_op(netappSnapMirrorUpdate, base_image='python:3')
        # Define Kubeflow Pipeline
        @dsl.pipeline(
            name="Replicate Data",
            description="Template for triggering a NetApp SnapMirror update in order to replicate data across environments"
        )
        def replicate_data(
            # Define variables that the user can set in the pipelines UI; set default values
            ontap_cluster_mgmt_hostname: str = "10.61.188.40",
            ontap_cluster_admin_acct_k8s_secret: str = "ontap-cluster-mgmt-account",
            ontap_api_verify_ssl_cert: str = "yes",
            source_svm: str = "ailab",
            source_volume: str = "sm",
            destination_svm: str = "ai221_data",
            destination_volume: str = "sm_dest"
        ) :
            # Pipeline Steps:
            # Trigger SnapMirror replication
            replicate = NetappSnapMirrorUpdateOp(
                ontap_cluster_mgmt_hostname,
                source_svm,
                source_volume,
                destination_svm,
                destination_volume,
                ontap_api_verify_ssl_cert
            )
            # Mount k8s secret containing ONTAP cluster admin account details
            replicate.add_pvolumes({
                '/mnt/secret': k8s_client.V1Volume(
                    name='ontap-cluster-admin',
                    secret=k8s_client.V1SecretVolumeSource(
                        secret_name=ontap_cluster_admin_acct_k8s_secret
                    )
                )
            })
        if __name__ == '__main__' :
            import kfp.compiler as compiler
            compiler.Compiler().compile(replicate_data, __file__ + '.yaml')
        EOF
        $ python3 replicate-data-snapmirror.py
        $ ls replicate-data-snapmirror.py.yaml
        replicate-data-snapmirror.py.yaml
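      If you prefer to upload and trigger the compiled pipeline programmatically instead of through the Kubeflow Pipelines UI steps referenced in the next step, the following is a minimal sketch that uses the KFP v1 SDK client. The helper script name, the endpoint URL, and the run name are placeholders; adjust them, along with any authentication that your Kubeflow deployment requires, to match your environment.

        $ cat << EOF > ./run-replicate-data-pipeline.py
        # Hypothetical helper script: upload and run the compiled pipeline with the KFP v1 SDK
        import kfp
        # Placeholder endpoint: point this at your Kubeflow Pipelines API (for example, a port-forwarded ml-pipeline service)
        client = kfp.Client(host='http://localhost:8080')
        # Submit a one-off run of the compiled package, overriding a subset of the pipeline's run-time parameters
        client.create_run_from_pipeline_package(
            pipeline_file='replicate-data-snapmirror.py.yaml',
            arguments={
                'ontap_cluster_mgmt_hostname': '10.61.188.40',
                'source_svm': 'ailab',
                'source_volume': 'sm',
                'destination_svm': 'ai221_data',
                'destination_volume': 'sm_dest'
            },
            run_name='replicate-data-snapmirror-manual-run'
        )
        EOF
        $ python3 ./run-replicate-data-pipeline.py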
  4. Follow steps 6 through 17 from the section Create a Kubeflow Pipeline to Execute an End-to-End AI Training Workflow with Built-in Traceability and Versioning in this document.

    Be sure to use the pipeline definition that was created in the previous step (step 3) of this section instead of the pipeline definition that was created in Create a Kubeflow Pipeline to Execute an End-to-End AI Training Workflow with Built-in Traceability and Versioning.
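    After the pipeline run completes, you can optionally confirm on the destination cluster that the SnapMirror transfer succeeded. The following is a rough sketch using the same placeholder cluster prompt and the example destination path from the pipeline defaults; a healthy relationship typically reports a Snapmirrored state, an Idle status, and a recently reset lag time.

    destination-cluster::> snapmirror show -destination-path ai221_data:sm_dest -fields state,status,lag-time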