Implement an End-to-End AI Training Workflow with Built-in Traceability and Versioning
The example DAG outlined in this section uses NetApp Snapshot technology to build rapid, storage-efficient dataset and model versioning and traceability into an end-to-end AI/ML model training workflow.
Prerequisites
For this DAG to function correctly, you must complete the following prerequisites:
- You must have created a connection in Airflow for your ONTAP system. To manage connections in Airflow, navigate to Admin > Connections in the Airflow web service UI. The example screenshot that follows shows the creation of a connection for a specific ONTAP system. The following values are required:
  - Conn ID. Unique name for the connection.
  - Host. The host name or IP address of the ONTAP cluster on which your dataset and model volumes are stored.
  - Login. Username of the cluster admin account for the ONTAP cluster on which your volumes reside.
  - Password. Password of the cluster admin account for the ONTAP cluster on which your volumes reside.
  If you prefer to create the connection from a script instead of the web UI, see the first sketch after this list.
- There must be an existing PersistentVolumeClaim (PVC) in the airflow namespace that is tied to the volume that contains the data that you want to use to train your model.
- There must be an existing PersistentVolumeClaim (PVC) in the airflow namespace that is tied to the volume on which you want to store your trained model. A sketch for confirming that both PVCs exist and are bound follows this list.
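The following is a minimal sketch of how the required Airflow connection could be created programmatically instead of through the web UI. The host and credential values are placeholders, and the conn_type value is arbitrary here because the example DAG reads only the host, login, and password fields of the connection; this is an illustration, not part of the example DAG.

# Hypothetical helper script: create the 'ontap_ai' connection in the Airflow metadata database
from airflow import settings
from airflow.models import Connection

# Placeholder values; replace with your ONTAP cluster details
ontap_conn = Connection(
    conn_id='ontap_ai',                  # Must match airflowConnectionName in the DAG
    conn_type='http',                    # Arbitrary; the DAG only reads host, login, and password
    host='cluster1.example.com',         # ONTAP cluster management hostname or IP address
    login='admin',                       # Cluster admin username
    password='<cluster-admin-password>'  # Cluster admin password
)

# Persist the connection; note that re-running this script creates a duplicate entry,
# and the example DAG assumes only one connection exists with this conn_id
session = settings.Session()
session.add(ontap_conn)
session.commit()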
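To confirm that the dataset and model PVCs exist before triggering the DAG, you can query the Kubernetes API. The following sketch is an assumption-laden pre-flight check, not part of the example DAG: it assumes the kubernetes Python client is installed, that your kubeconfig grants access to the airflow namespace, and that the PVC names match the claimName values used in the example DAG.

# Hypothetical pre-flight check: verify that the PVCs used by the DAG exist and are bound
from kubernetes import client, config

# Load credentials from your local kubeconfig (use config.load_incluster_config() inside a pod)
config.load_kube_config()
v1 = client.CoreV1Api()

# PVC names used by the example DAG; change these to match your environment
required_pvcs = ['dataset-vol', 'airflow-model-vol']

# List all PVCs in the 'airflow' namespace and record their phase and bound PV name
found = {}
for pvc in v1.list_namespaced_persistent_volume_claim(namespace='airflow').items:
    found[pvc.metadata.name] = (pvc.status.phase, pvc.spec.volume_name)

for name in required_pvcs:
    if name in found:
        print('%s: phase=%s, pv=%s' % (name, found[name][0], found[name][1]))
    else:
        print('%s: NOT FOUND in airflow namespace' % name)

The PV names that this check prints are the values that you would enter for dataset_volume_pv_name and model_volume_pv_name in the DEFINE PARAMETERS section of the DAG.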
DAG Definition
The Python code excerpt that follows contains the definition for the example DAG. Before executing this DAG in your environment, you must modify the parameter values in the DEFINE PARAMETERS section to match your environment.
# Airflow DAG Definition: AI Training Run
#
# Steps:
#   1. Data prep job
#   2. Dataset snapshot (for traceability)
#   3. Training job
#   4. Model snapshot (for versioning/baselining)
#   5. Inference validation job

from airflow.utils.dates import days_ago
from airflow.secrets import get_connections
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.pod import Resources
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount


##### DEFINE PARAMETERS: Modify parameter values in this section to match your environment #####

## Define default args for DAG
ai_training_run_dag_default_args = {
    'owner': 'NetApp'
}

## Define DAG details
ai_training_run_dag = DAG(
    dag_id='ai_training_run',
    default_args=ai_training_run_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['training']
)

## Define volume details (change values as necessary to match your environment)

# ONTAP system details
airflowConnectionName = 'ontap_ai' # Name of the Airflow connection that contains connection details for your ONTAP system's cluster admin account
verifySSLCert = False # Denotes whether or not to verify the SSL cert when calling the ONTAP API

# Dataset volume
dataset_volume_mount = VolumeMount(
    'dataset-volume',
    mount_path='/mnt/dataset',
    sub_path=None,
    read_only=False
)
dataset_volume_config = {
    'persistentVolumeClaim': {
        'claimName': 'dataset-vol'
    }
}
dataset_volume = Volume(name='dataset-volume', configs=dataset_volume_config)
dataset_volume_pv_name = 'pvc-79e0855a-30a1-4f63-b34c-1029b1df49f6'

# Model volume
model_volume_mount = VolumeMount(
    'model-volume',
    mount_path='/mnt/model',
    sub_path=None,
    read_only=False
)
model_volume_config = {
    'persistentVolumeClaim': {
        'claimName': 'airflow-model-vol'
    }
}
model_volume = Volume(name='model-volume', configs=model_volume_config)
model_volume_pv_name = 'pvc-b3e7cb62-2694-45a3-a56d-9fad6b1262e4'

## Define job details (change values as needed)

# Data prep step
data_prep_step_container_image = "ubuntu:bionic"
data_prep_step_command = ["echo", "'No data prep command entered'"] # Replace this echo command with the data prep command that you wish to execute
data_prep_step_resources = {} # Hint: To request that 1 GPU be allocated to job pod, change to: {'limit_gpu': 1}

# Training step
train_step_container_image = "nvcr.io/nvidia/tensorflow:20.07-tf1-py3"
train_step_command = ["echo", "'No training command entered'"] # Replace this echo command with the training command that you wish to execute
train_step_resources = {} # Hint: To request that 1 GPU be allocated to job pod, change to: {'limit_gpu': 1}

# Inference validation step
validate_step_container_image = "nvcr.io/nvidia/tensorflow:20.07-tf1-py3"
validate_step_command = ["echo", "'No inference validation command entered'"] # Replace this echo command with the inference validation command that you wish to execute
validate_step_resources = {} # Hint: To request that 1 GPU be allocated to job pod, change to: {'limit_gpu': 1}

################################################################################################


# Define function that triggers the creation of a NetApp snapshot
def netappSnapshot(**kwargs) -> str :
    # Parse args
    for key, value in kwargs.items() :
        if key == 'pvName' :
            pvName = value
        elif key == 'verifySSLCert' :
            verifySSLCert = value
        elif key == 'airflowConnectionName' :
            airflowConnectionName = value

    # Install netapp_ontap package
    import sys, subprocess
    result = subprocess.check_output([sys.executable, '-m', 'pip', 'install', '--user', 'netapp-ontap'])
    print(str(result).replace('\\n', '\n'))

    # Import needed functions/classes
    from netapp_ontap import config as netappConfig
    from netapp_ontap.host_connection import HostConnection as NetAppHostConnection
    from netapp_ontap.resources import Volume, Snapshot
    from datetime import datetime
    import json

    # Retrieve ONTAP cluster admin account details from Airflow connection
    connections = get_connections(conn_id = airflowConnectionName)
    ontapConnection = connections[0] # Assumes that you only have one connection with the specified conn_id configured in Airflow
    ontapClusterAdminUsername = ontapConnection.login
    ontapClusterAdminPassword = ontapConnection.password
    ontapClusterMgmtHostname = ontapConnection.host

    # Configure connection to ONTAP cluster/instance
    netappConfig.CONNECTION = NetAppHostConnection(
        host = ontapClusterMgmtHostname,
        username = ontapClusterAdminUsername,
        password = ontapClusterAdminPassword,
        verify = verifySSLCert
    )

    # Convert pv name to ONTAP volume name
    # The following will not work if you specified a custom storagePrefix when creating your
    #   Trident backend. If you specified a custom storagePrefix, you will need to update this
    #   code to match your prefix.
    volumeName = 'trident_%s' % pvName.replace("-", "_")
    print('\npv name: ', pvName)
    print('ONTAP volume name: ', volumeName)

    # Create snapshot; print API response
    volume = Volume.find(name = volumeName)
    timestamp = datetime.today().strftime("%Y%m%d_%H%M%S")
    snapshot = Snapshot.from_dict({
        'name': 'airflow_%s' % timestamp,
        'comment': 'Snapshot created by an Apache Airflow DAG',
        'volume': volume.to_dict()
    })
    response = snapshot.post()
    print("\nAPI Response:")
    print(response.http_response.text)

    # Retrieve snapshot details
    snapshot.get()

    # Convert snapshot details to JSON string and print
    snapshotDetails = snapshot.to_dict()
    print("\nSnapshot Details:")
    print(json.dumps(snapshotDetails, indent=2))

    # Return name of newly created snapshot
    return snapshotDetails['name']


# Define DAG steps/workflow
with ai_training_run_dag as dag :

    # Define data prep step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    data_prep = KubernetesPodOperator(
        namespace='airflow',
        image=data_prep_step_container_image,
        cmds=data_prep_step_command,
        resources = data_prep_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-data-prep",
        task_id="data-prep",
        is_delete_operator_pod=True,
        hostnetwork=False
    )

    # Define step to take a snapshot of the dataset volume for traceability
    dataset_snapshot = PythonOperator(
        task_id='dataset-snapshot',
        python_callable=netappSnapshot,
        op_kwargs={
            'airflowConnectionName': airflowConnectionName,
            'pvName': dataset_volume_pv_name,
            'verifySSLCert': verifySSLCert
        },
        dag=dag
    )
    # State that the dataset snapshot should be created after the data prep job completes
    data_prep >> dataset_snapshot

    # Define training step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    train = KubernetesPodOperator(
        namespace='airflow',
        image=train_step_container_image,
        cmds=train_step_command,
        resources = train_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-train",
        task_id="train",
        is_delete_operator_pod=True,
        hostnetwork=False
    )
    # State that training job should be executed after dataset volume snapshot is taken
    dataset_snapshot >> train

    # Define step to take a snapshot of the model volume for versioning/baselining
    model_snapshot = PythonOperator(
        task_id='model-snapshot',
        python_callable=netappSnapshot,
        op_kwargs={
            'airflowConnectionName': airflowConnectionName,
            'pvName': model_volume_pv_name,
            'verifySSLCert': verifySSLCert
        },
        dag=dag
    )
    # State that the model snapshot should be created after the training job completes
    train >> model_snapshot

    # Define inference validation step using Kubernetes Pod operator (https://airflow.apache.org/docs/stable/kubernetes.html#kubernetespodoperator)
    validate = KubernetesPodOperator(
        namespace='airflow',
        image=validate_step_container_image,
        cmds=validate_step_command,
        resources = validate_step_resources,
        volumes=[dataset_volume, model_volume],
        volume_mounts=[dataset_volume_mount, model_volume_mount],
        name="ai-training-run-validate",
        task_id="validate",
        is_delete_operator_pod=True,
        hostnetwork=False
    )
    # State that inference validation job should be executed after model volume snapshot is taken
    model_snapshot >> validate
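Because netappSnapshot runs inside a PythonOperator, Airflow stores its return value (the name of the newly created snapshot) as an XCom for each task instance. If you want to capture the dataset and model snapshot names together, for example to record them alongside your experiment metadata, a hypothetical downstream task such as the following could be appended to the DAG. The record-snapshots task and the recordSnapshotNames function are assumptions added for illustration, not part of the example DAG above.

# Hypothetical add-on: record the snapshot names created earlier in the DAG run
def recordSnapshotNames(**kwargs) -> None :
    ti = kwargs['ti']
    # Pull the return values that the snapshot tasks pushed to XCom
    dataset_snapshot_name = ti.xcom_pull(task_ids='dataset-snapshot')
    model_snapshot_name = ti.xcom_pull(task_ids='model-snapshot')
    print('Dataset snapshot: ', dataset_snapshot_name)
    print('Model snapshot: ', model_snapshot_name)

with ai_training_run_dag as dag :

    record_snapshots = PythonOperator(
        task_id='record-snapshots',
        python_callable=recordSnapshotNames,
        provide_context=True, # Required on Airflow 1.10.x so that 'ti' is passed in kwargs
        dag=dag
    )
    # Run after inference validation so that both snapshot names exist
    validate >> record_snapshots

You could use the same xcom_pull pattern from any other downstream task, for example to tag a model registry entry with the exact snapshot names that correspond to a given training run.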