Trigger an XCP Copy or Sync Operation

Contributors: netapp-dorianh, kevin-hoke

The example DAG outlined in this section implements a workflow that invokes NetApp XCP to quickly and reliably replicate data between NFS endpoints. Potential use cases include the following:

  • Replicating newly acquired sensor data gathered at the edge back to the core data center or to the cloud to be used for AI/ML model training or retraining.

  • Replicating a newly trained or newly updated model from the core data center to the edge or to the cloud to be deployed as part of an inferencing application.

  • Copying data from a Hadoop data lake (through the Hadoop NFS Gateway) to a high-performance AI/ML training environment for use in training an AI/ML model.

  • Copying NFS-accessible data from a legacy or non-NetApp system of record to a high-performance AI/ML training environment for use in training an AI/ML model.

Prerequisites

For this DAG to function correctly, you must complete the following prerequisites.

  1. You must have created a connection in Airflow for a host that is accessible via SSH and on which NetApp XCP is installed and configured. For details regarding how to install and configure NetApp XCP, refer to the NetApp XCP homepage and the official NetApp XCP documentation.

    To manage connections in Airflow, navigate to Admin > Connections in the Airflow web service UI. The example screenshot that follows shows the creation of a connection for a specific host on which NetApp XCP is installed and configured. The following values are required:

    • Conn ID. Unique name for the connection.

    • Conn Type. Must be set to SSH.

    • Host. The host name or IP address of the host.

    • Login. Username to use when accessing the host via SSH.

    • Password. Password to use when accessing the host via SSH.

[Screenshot: creating an SSH connection in Airflow for the host on which NetApp XCP is installed and configured.]
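Connections do not have to be created through the web UI. Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the Conn ID in upper case, whose value is a URI of the form ssh://login:password@host. The following is a minimal, stdlib-only sketch that builds that variable name and URI from the values listed above. The helper names are hypothetical, and the sketch assumes a host name or IPv4 address (IPv6 literals would need brackets).

```python
from typing import Optional
from urllib.parse import quote


def airflow_conn_env_var(conn_id: str) -> str:
    """Return the environment variable name Airflow checks for this Conn ID."""
    return 'AIRFLOW_CONN_' + conn_id.upper()


def ssh_conn_uri(host: str, login: str, password: str,
                 port: Optional[int] = None) -> str:
    """Build an Airflow connection URI whose scheme sets Conn Type to SSH.

    Login and password are percent-encoded so that characters such as
    '@', ':' or '/' inside them do not break URI parsing.
    """
    credentials = quote(login, safe='') + ':' + quote(password, safe='')
    netloc = credentials + '@' + host
    if port is not None:
        netloc += ':' + str(port)
    return 'ssh://' + netloc
```

For example, ssh_conn_uri('xcp-host.example.com', 'xcpuser', 'p@ss/word') returns ssh://xcpuser:p%40ss%2Fword@xcp-host.example.com (placeholder host and credentials). Exporting that value as AIRFLOW_CONN_XCP_HOST in the environment of the Airflow scheduler and workers should make it available under the Conn ID xcp_host that the example DAG uses. Connections defined this way are typically not listed under Admin > Connections.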

DAG Definition

The Python code excerpt that follows contains the definition for the example DAG. Before executing this example DAG in your environment, you must modify the parameter values in the DEFINE PARAMETERS section to match your environment.

# Airflow DAG Definition: Replicate Data - XCP
#
# Steps:
#   1. Invoke NetApp XCP copy or sync operation
from airflow.models import DAG
from airflow.utils.dates import days_ago
try:
    # Airflow 2.x: SSHOperator is provided by the apache-airflow-providers-ssh package
    from airflow.providers.ssh.operators.ssh import SSHOperator
except ImportError:
    # Airflow 1.10.x
    from airflow.contrib.operators.ssh_operator import SSHOperator
##### DEFINE PARAMETERS: Modify parameter values in this section to match your environment #####
## Define default args for DAG
replicate_data_xcp_dag_default_args = {
    'owner': 'NetApp'
}
## Define DAG details
replicate_data_xcp_dag = DAG(
    dag_id='replicate_data_xcp',
    default_args=replicate_data_xcp_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['data-movement']
)
## Define xcp operation details (change values as necessary to match your environment and desired operation)
# Define xcp operation to perform
xcpOperation = 'sync' # Must be 'copy' or 'sync'
# Define source and destination for copy operation
xcpCopySource = '192.168.200.41:/trident_pvc_957318e1_9b73_4e16_b857_dca7819dd263'
xcpCopyDestination = '192.168.200.41:/trident_pvc_9e7607c2_29c8_4dbf_9b08_551ba72d0273'
# Define catalog id for sync operation
xcpSyncId = 'autoname_copy_2020-10-06_16.37.44.963391'
## Define xcp host details (change values as necessary to match your environment)
xcpAirflowConnectionName = 'xcp_host' # Name of the Airflow connection of type 'ssh' that contains connection details for a host on which xcp is installed, configured, and accessible within $PATH
################################################################################################
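# Construct xcp command
if xcpOperation == 'copy':
    xcpCommand = 'xcp copy ' + xcpCopySource + ' ' + xcpCopyDestination
elif xcpOperation == 'sync':
    xcpCommand = 'xcp sync -id ' + xcpSyncId
else:
    # Fail when the DAG file is parsed instead of silently running a no-op command
    raise ValueError("xcpOperation must be 'copy' or 'sync', got {!r}".format(xcpOperation))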
# Define DAG steps/workflow
with replicate_data_xcp_dag as dag:
    # Define step to invoke a NetApp XCP copy or sync operation
    invoke_xcp = SSHOperator(
        task_id="invoke-xcp",
        command=xcpCommand,
        ssh_conn_id=xcpAirflowConnectionName
    )
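The command that the DAG sends over SSH depends only on the values in the DEFINE PARAMETERS section, so that logic can be pulled out into a plain function and checked without an Airflow installation or an XCP host. The sketch below mirrors the DAG's two cases, xcp copy <source> <destination> and xcp sync -id <catalog ID>. The function name is hypothetical, and quoting each argument with shlex.quote assumes that the login shell on the XCP host is a POSIX shell.

```python
import shlex
from typing import Optional


def build_xcp_command(operation: str,
                      source: Optional[str] = None,
                      destination: Optional[str] = None,
                      sync_id: Optional[str] = None) -> str:
    """Return the XCP command line for a 'copy' or 'sync' operation.

    'copy' needs an NFS source and destination (host:/export paths);
    'sync' needs the catalog ID of an earlier copy.
    """
    if operation == 'copy':
        if not source or not destination:
            raise ValueError("'copy' requires both a source and a destination")
        # shlex.quote leaves plain host:/export paths unchanged but
        # protects against spaces or shell metacharacters
        return 'xcp copy {} {}'.format(shlex.quote(source), shlex.quote(destination))
    if operation == 'sync':
        if not sync_id:
            raise ValueError("'sync' requires a catalog ID")
        return 'xcp sync -id {}'.format(shlex.quote(sync_id))
    raise ValueError("operation must be 'copy' or 'sync', got {!r}".format(operation))
```

With the example parameter values, build_xcp_command('sync', sync_id='autoname_copy_2020-10-06_16.37.44.963391') returns xcp sync -id autoname_copy_2020-10-06_16.37.44.963391. In the DAG file, the command could then be built with xcpCommand = build_xcp_command(xcpOperation, xcpCopySource, xcpCopyDestination, xcpSyncId), so that an unknown or incomplete operation raises ValueError when the file is parsed.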