Implement an End-to-End AI Training Workflow with Built-in Traceability and Versioning


The example DAG outlined in this section implements an end-to-end AI/ML model training workflow that uses NetApp Snapshot technology to build in rapid, storage-efficient dataset and model versioning and traceability.

Prerequisites

For this DAG to function correctly, you must complete the following prerequisites:

  1. You must have created a connection in Airflow for your ONTAP system.

    To manage connections in Airflow, navigate to Admin > Connections in the Airflow web UI. When you create a connection for your ONTAP system, the following values are required (a scripted alternative to the web UI is sketched after this prerequisite list):

    • Conn ID. Unique name for the connection.

    • Host. The host name or IP address of the ONTAP cluster on which your dataset and model volumes are stored.

    • Login. Username of the cluster admin account for the ONTAP cluster on which your volumes reside.

    • Password. Password of the cluster admin account for the ONTAP cluster on which your volumes reside.


  2. There must be an existing PersistentVolumeClaim (PVC) in the airflow namespace that is tied to the volume that contains the data that you want to use to train your model.

  3. There must be an existing PersistentVolumeClaim (PVC) in the airflow namespace that is tied to the volume on which you want to store your trained model. A minimal sketch for creating these PVCs programmatically also follows this list.
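
If you prefer to create the ONTAP connection from code rather than through the web UI, the following is a minimal sketch that adds it directly to the Airflow metadata database. The host, login, and password values are placeholders for your own ONTAP system, and the conn_type of http is arbitrary here because the example DAG only reads the connection's host, login, and password fields.

# Create the 'ontap_ai' Airflow connection from code (alternative to the Admin > Connections UI)
from airflow import settings
from airflow.models import Connection

ontap_connection = Connection(
    conn_id='ontap_ai',              # Conn ID: unique name for the connection
    conn_type='http',                # Arbitrary type; the example DAG only uses host/login/password
    host='cluster1.example.com',     # Placeholder: your ONTAP cluster management hostname or IP
    login='admin',                   # Placeholder: cluster admin username
    password='changeme'              # Placeholder: cluster admin password
)
session = settings.Session()
session.add(ontap_connection)
session.commit()
session.close()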
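
For the PVC prerequisites, you would typically create the claims with kubectl and a manifest that references your Trident-backed storage class. The following is an equivalent minimal sketch that uses the Kubernetes Python client; the storage class name ontap-ai, the 10Gi capacity, and the ReadWriteMany access mode are assumptions that you must adjust to match your environment.

# Create the dataset PVC in the 'airflow' namespace
# (repeat with name='airflow-model-vol' for the model volume claim used in the example DAG)
from kubernetes import client, config

config.load_kube_config()   # Or config.load_incluster_config() when running inside the cluster
dataset_pvc = client.V1PersistentVolumeClaim(
    metadata=client.V1ObjectMeta(name='dataset-vol', namespace='airflow'),
    spec=client.V1PersistentVolumeClaimSpec(
        access_modes=['ReadWriteMany'],    # Assumption: adjust to your access requirements
        storage_class_name='ontap-ai',     # Assumption: your Trident-backed storage class
        resources=client.V1ResourceRequirements(requests={'storage': '10Gi'})   # Assumption: size
    )
)
client.CoreV1Api().create_namespaced_persistent_volume_claim(namespace='airflow', body=dataset_pvc)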

DAG Definition

The Python code excerpt that follows contains the definition for the example DAG. Before executing this DAG, you must modify the parameter values in the DEFINE PARAMETERS section to match your environment.

# Airflow DAG Definition: AI Training Run
#
# Steps:
#   1. Data prep job
#   2. Dataset snapshot (for traceability)
#   3. Training job
#   4. Model snapshot (for versioning/baselining)
#   5. Inference validation job
from airflow.utils.dates import days_ago
from airflow.secrets import get_connections
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.pod import Resources
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
##### DEFINE PARAMETERS: Modify parameter values in this section to match your environment #####
## Define default args for DAG
ai_training_run_dag_default_args = {
    'owner': 'NetApp'
}
## Define DAG details
ai_training_run_dag = DAG(
    dag_id='ai_training_run',
    default_args=ai_training_run_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['training']
)
## Define volume details (change values as necessary to match your environment)
# ONTAP system details
airflowConnectionName = 'ontap_ai'  # Name of the Airflow connection that contains connection details for your ONTAP system's cluster admin account
verifySSLCert = False   # Denotes whether or not to verify the SSL cert when calling the ONTAP API
# Dataset volume
dataset_volume_mount = VolumeMount(
    'dataset-volume',
    mount_path='/mnt/dataset',
    sub_path=None,
    read_only=False
)
dataset_volume_config= {
    'persistentVolumeClaim': {
        'claimName': 'dataset-vol'
    }
}
dataset_volume = Volume(name='dataset-volume', configs=dataset_volume_config)
dataset_volume_pv_name = 'pvc-79e0855a-30a1-4f63-b34c-1029b1df49f6'
# Model volume
model_volume_mount = VolumeMount(
    'model-volume',
    mount_path='/mnt/model',
    sub_path=None,
    read_only=False
)
model_volume_config= {
    'persistentVolumeClaim': {
        'claimName': 'airflow-model-vol'
    }
}
model_volume = Volume(name='model-volume', configs=model_volume_config)
model_volume_pv_name = 'pvc-b3e7cb62-2694-45a3-a56d-9fad6b1262e4'
## Define job details (change values as needed)
# Data prep step
data_prep_step_container_image = "ubuntu:bionic"
data_prep_step_command = ["echo", "'No data prep command entered'"] # Replace this echo command with the data prep command that you wish to execute
data_prep_step_resources = {} # Hint: To request that 1 GPU be allocated to the job pod, change to: {'limit_gpu': 1}
# Training step
train_step_container_image = "nvcr.io/nvidia/tensorflow:20.07-tf1-py3"
train_step_command = ["echo", "'No training command entered'"] # Replace this echo command with the training command that you wish to execute
train_step_resources = {} # Hint: To request that 1 GPU be allocated to the job pod, change to: {'limit_gpu': 1}
# Inference validation step
validate_step_container_image = "nvcr.io/nvidia/tensorflow:20.07-tf1-py3"
validate_step_command = ["echo", "'No inference validation command entered'"] # Replace this echo command with the inference validation command that you wish to execute
validate_step_resources = {} # Hint: To request that 1 GPU be allocated to the job pod, change to: {'limit_gpu': 1}
################################################################################################
# Define function that triggers the creation of a NetApp snapshot
def netappSnapshot(**kwargs) -> str :
    # Parse args
    pvName = kwargs['pvName']
    verifySSLCert = kwargs['verifySSLCert']
    airflowConnectionName = kwargs['airflowConnectionName']
    # Install netapp_ontap package
    import sys, subprocess
    result = subprocess.check_output([sys.executable, '-m', 'pip', 'install', '--user', 'netapp-ontap'])
    print(str(result).replace('\\n', '\n'))

    # Import needed functions/classes
    from netapp_ontap import config as netappConfig
    from netapp_ontap.host_connection import HostConnection as NetAppHostConnection
    from netapp_ontap.resources import Volume, Snapshot
    from datetime import datetime
    import json
    # Retrieve ONTAP cluster admin account details from Airflow connection
    connections = get_connections(conn_id = airflowConnectionName)
    ontapConnection = connections[0]    # Assumes that you only have one connection with the specified conn_id configured in Airflow
    ontapClusterAdminUsername = ontapConnection.login
    ontapClusterAdminPassword = ontapConnection.password
    ontapClusterMgmtHostname = ontapConnection.host

    # Configure connection to ONTAP cluster/instance
    netappConfig.CONNECTION = NetAppHostConnection(
        host = ontapClusterMgmtHostname,
        username = ontapClusterAdminUsername,
        password = ontapClusterAdminPassword,
        verify = verifySSLCert
    )

    # Convert pv name to ONTAP volume name
    # The following will not work if you specified a custom storagePrefix when creating your
    #   Trident backend. If you specified a custom storagePrefix, you will need to update this
    #   code to match your prefix.
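    #   Example: a PV named 'pvc-79e0855a-30a1-4f63-b34c-1029b1df49f6' maps to the ONTAP
    #   volume 'trident_pvc_79e0855a_30a1_4f63_b34c_1029b1df49f6' with the default prefix.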
    volumeName = 'trident_%s' % pvName.replace("-", "_")
    print('\npv name: ', pvName)
    print('ONTAP volume name: ', volumeName)
    # Create snapshot; print API response
    volume = Volume.find(name = volumeName)
    timestamp = datetime.today().strftime("%Y%m%d_%H%M%S")
    snapshot = Snapshot.from_dict({
        'name': 'airflow_%s' % timestamp,
        'comment': 'Snapshot created by an Apache Airflow DAG',
        'volume': volume.to_dict()
    })
    response = snapshot.post()
    print("\nAPI Response:")
    print(response.http_response.text)
    # Retrieve snapshot details
    snapshot.get()
    # Convert snapshot details to JSON string and print
    snapshotDetails = snapshot.to_dict()
    print("\nSnapshot Details:")
    print(json.dumps(snapshotDetails, indent=2))
    # Return name of newly created snapshot
    return snapshotDetails['name']
# Define DAG steps/workflow
with ai_training_run_dag as dag :
    # Define data prep step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    data_prep = KubernetesPodOperator(
        namespace='airflow',
        image=data_prep_step_container_image,
        cmds=data_prep_step_command,
        resources = data_prep_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-data-prep",
        task_id="data-prep",
        is_delete_operator_pod=True,
        hostnetwork=False
    )
    # Define step to take a snapshot of the dataset volume for traceability
    dataset_snapshot = PythonOperator(
        task_id='dataset-snapshot',
        python_callable=netappSnapshot,
        op_kwargs={
            'airflowConnectionName': airflowConnectionName,
            'pvName': dataset_volume_pv_name,
            'verifySSLCert': verifySSLCert
        },
        dag=dag
    )
    # State that the dataset snapshot should be created after the data prep job completes
    data_prep >> dataset_snapshot
    # Define training step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    train = KubernetesPodOperator(
        namespace='airflow',
        image=train_step_container_image,
        cmds=train_step_command,
        resources = train_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-train",
        task_id="train",
        is_delete_operator_pod=True,
        hostnetwork=False
    )
    # State that training job should be executed after dataset volume snapshot is taken
    dataset_snapshot >> train
    # Define step to take a snapshot of the model volume for versioning/baselining
    model_snapshot = PythonOperator(
        task_id='model-snapshot',
        python_callable=netappSnapshot,
        op_kwargs={
            'airflowConnectionName': airflowConnectionName,
            'pvName': model_volume_pv_name,
            'verifySSLCert': verifySSLCert
        },
        dag=dag
    )
    # State that the model snapshot should be created after the training job completes
    train >> model_snapshot
    # Define inference validation step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    validate = KubernetesPodOperator(
        namespace='airflow',
        image=validate_step_container_image,
        cmds=validate_step_command,
        resources = validate_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-validate",
        task_id="validate",
        is_delete_operator_pod=True,
        hostnetwork=False
    )
    # State that inference validation job should be executed after model volume snapshot is taken
    model_snapshot >> validate
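
Each snapshot task returns the name of the Snapshot copy that it creates, and Airflow stores that return value as an XCom. If you want to record the dataset and model snapshot names in your own experiment-tracking or logging system, the following is a minimal, optional sketch of an additional task that you could append to the end of the with ai_training_run_dag as dag block. The task ID record-snapshots, the function name recordSnapshotNames, and the print statements are illustrative placeholders rather than part of the example DAG above.

    # Optional: retrieve the names of the newly created dataset and model snapshots from XCom
    def recordSnapshotNames(**kwargs) :
        ti = kwargs['ti']
        datasetSnapshotName = ti.xcom_pull(task_ids='dataset-snapshot')
        modelSnapshotName = ti.xcom_pull(task_ids='model-snapshot')
        # Replace these print statements with calls to your experiment-tracking/logging system
        print('Dataset snapshot name: ', datasetSnapshotName)
        print('Model snapshot name: ', modelSnapshotName)
    # Define step to record snapshot names after the inference validation job completes
    record_snapshots = PythonOperator(
        task_id='record-snapshots',
        python_callable=recordSnapshotNames,
        provide_context=True,   # Required in Airflow 1.10.x so that 'ti' is included in kwargs
        dag=dag
    )
    validate >> record_snapshots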