Trigger a Cloud Sync Replication Update

Contributors: netapp-dorianh, kevin-hoke

The example DAG outlined in this section implements a workflow that triggers an update for an existing NetApp Cloud Sync relationship and then monitors the update until it completes. Cloud Sync can replicate data to and from a variety of file and object storage platforms. Potential use cases include the following:

  • Replicating newly acquired sensor data gathered at the edge back to the core data center or to the cloud to be used for AI/ML model training or retraining.

  • Replicating a newly trained or newly updated model from the core data center to the edge or to the cloud to be deployed as part of an inferencing application.

  • Copying data from an S3 data lake to a high-performance AI/ML training environment for use in the training of an AI/ML model.

  • Copying data from a Hadoop data lake (through Hadoop NFS Gateway) to a high-performance AI/ML training environment for use in the training of an AI/ML model.

  • Saving a new version of a trained model to an S3 or Hadoop data lake for permanent storage.

  • Copying NFS-accessible data from a legacy or non-NetApp system of record to a high-performance AI/ML training environment for use in the training of an AI/ML model.

Prerequisites

For this DAG to function correctly, you must complete the following prerequisites.

  1. You must have created a connection in Airflow for the NetApp Cloud Sync API.

    To manage connections in Airflow, navigate to Admin > Connections in the Airflow web service UI. The example screenshot that follows shows the creation of a connection for the Cloud Sync API. The following values are required:

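    • Conn ID. Unique name for the connection. This value must match the airflowConnectionName parameter in the DAG definition (cloud_sync in the example).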

    • Password. Your Cloud Sync API refresh token.

[Screenshot not available: creating the Cloud Sync API connection under Admin > Connections]

  2. You must have already initiated the Cloud Sync relationship that you want to trigger an update for. To initiate a relationship, visit cloudsync.netapp.com.
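As an alternative to the UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID> (conn ID in upper case) whose value is a connection URI. The following is a minimal sketch, assuming that mechanism is available in your Airflow version, that builds the variable name and URI for a connection whose password is the Cloud Sync refresh token. The helper cloud_sync_connection_env is hypothetical, and the http scheme and cloudsync.netapp.com host are illustrative choices only; the example DAG reads just the password field.

```python
from urllib.parse import quote


def cloud_sync_connection_env(conn_id: str, refresh_token: str) -> tuple:
    """Build (variable name, connection URI) for an Airflow connection holding the token.

    Hypothetical helper. Assumes Airflow resolves connections from
    environment variables named AIRFLOW_CONN_<CONN_ID in upper case>.
    Only the password field matters to the example DAG; the host is unused.
    """
    env_name = "AIRFLOW_CONN_" + conn_id.upper()
    # Percent-encode the token so characters such as '/', ':', '@' or '+'
    # cannot break URI parsing; Airflow decodes the password when reading it.
    uri = "http://:" + quote(refresh_token, safe="") + "@cloudsync.netapp.com"
    return env_name, uri


# Example for the conn ID used by the example DAG.
# "<your-refresh-token>" is a placeholder, not a real token.
name, value = cloud_sync_connection_env("cloud_sync", "<your-refresh-token>")
print(name)  # prints: AIRFLOW_CONN_CLOUD_SYNC
print(f"export {name}='{value}'")
```

The variable must be set in the environment of the Airflow scheduler and workers, not only in an interactive shell. A connection defined this way is not stored in the metadata database, so it does not appear under Admin > Connections.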

DAG Definition

The Python code excerpt that follows contains the definition for the example DAG. Before executing this example DAG, modify the parameter values in the DEFINE PARAMETERS section (the Airflow connection name and the Cloud Sync relationship ID) to match your environment.

# Airflow DAG Definition: Replicate Data - Cloud Sync
#
# Steps:
#   1. Trigger NetApp Cloud Sync update
from airflow.utils.dates import days_ago
from airflow.secrets import get_connections
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
##### DEFINE PARAMETERS: Modify parameter values in this section to match your environment #####
## Define default args for DAG
replicate_data_cloud_sync_dag_default_args = {
    'owner': 'NetApp'
}
## Define DAG details
replicate_data_cloud_sync_dag = DAG(
    dag_id='replicate_data_cloud_sync',
    default_args=replicate_data_cloud_sync_dag_default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['data-movement']
)
## Define Cloud Sync details (change values as necessary to match your environment)
# Cloud Sync refresh token details
airflowConnectionName = 'cloud_sync'  # Name of the Airflow connection that contains your Cloud Sync refresh token
# Cloud Sync relationship details (existing Cloud Sync relationship for which you want to trigger an update)
relationshipId = '5ed00996ca85650009a83db2'
################################################################################################
## Function for triggering an update for a specific Cloud Sync relationship
def netappCloudSyncUpdate(**kwargs) :
    # Parse args
    printResponse = False # Default value
    keepCheckingUntilComplete = True # Default value
    for key, value in kwargs.items() :
        if key == 'relationshipId' :
            relationshipId = value
        elif key == 'printResponse' :
            printResponse = value
        elif key == 'keepCheckingUntilComplete' :
            keepCheckingUntilComplete = value
        elif key == 'airflowConnectionName' :
            airflowConnectionName = value

    # Install requests module
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'requests'])
    # Import needed modules
    import requests, json, time
    ## API response error class; objects of this class will be raised when an API response is not as expected
    class APIResponseError(Exception) :
        '''Error that will be raised when an API response is not as expected'''
        pass
    ## Generic function for printing an API response
    def printAPIResponse(response: requests.Response) :
        print("API Response:")
        print("Status Code: ", response.status_code)
        print("Header: ", response.headers)
        if response.text :
            print("Body: ", response.text)
    ## Function for obtaining access token and account ID for calling Cloud Sync API
    def netappCloudSyncAuth(refreshToken: str) :
        ## Step 1: Obtain limited time access token using refresh token
        # Define parameters for API call
        url = "https://netapp-cloud-account.auth0.com/oauth/token"
        headers = {
            "Content-Type": "application/json"
        }
        data = {
            "grant_type": "refresh_token",
            "refresh_token": refreshToken,
            "client_id": "Mu0V1ywgYteI6w1MbD15fKfVIUrNXGWC"
        }
        # Call API to obtain access token
        response = requests.post(url = url, headers = headers, data = json.dumps(data))
        # Parse response to retrieve access token
        try :
            responseBody = json.loads(response.text)
            accessToken = responseBody["access_token"]
        except (ValueError, KeyError, IndexError, TypeError) :
            errorMessage = "Error obtaining access token from Cloud Sync API"
            raise APIResponseError(errorMessage, response)
        ## Step 2: Obtain account ID
        # Define parameters for API call
        url = "https://cloudsync.netapp.com/api/accounts"
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + accessToken
        }
        # Call API to obtain account ID
        response = requests.get(url = url, headers = headers)
        # Parse response to retrieve account ID
        try :
            responseBody = json.loads(response.text)
            accountId = responseBody[0]["accountId"]
        except (ValueError, KeyError, IndexError, TypeError) :
            errorMessage = "Error obtaining account ID from Cloud Sync API"
            raise APIResponseError(errorMessage, response)
        # Return access token and account ID
        return accessToken, accountId
    ## Function for monitoring the progress of the latest update for a specific Cloud Sync relationship
    def netappCloudSyncMonitor(refreshToken: str, relationshipId: str, keepCheckingUntilComplete: bool = True, printProgress: bool = True, printResponses: bool = False) :
        # Step 1: Obtain access token and account ID for accessing Cloud Sync API
        try :
            accessToken, accountId = netappCloudSyncAuth(refreshToken = refreshToken)
        except APIResponseError as err:
            if printResponses :
                errorMessage = err.args[0]
                response = err.args[1]
                print(errorMessage)
                printAPIResponse(response)
            raise
        # Step 2: Obtain status of the latest update; optionally, keep checking until the latest update has completed
        while True :
            # Define parameters for API call
            url = "https://cloudsync.netapp.com/api/relationships-v2/%s" % (relationshipId)
            headers = {
                "Accept": "application/json",
                "x-account-id": accountId,
                "Authorization": "Bearer " + accessToken
            }
            # Call API to obtain status of latest update
            response = requests.get(url = url, headers = headers)
            # Print API response
            if printResponses :
                printAPIResponse(response)
            # Parse response to retrieve status of latest update
            try :
                responseBody = json.loads(response.text)
                latestActivityType = responseBody["activity"]["type"]
                latestActivityStatus = responseBody["activity"]["status"]
            except (ValueError, KeyError, IndexError, TypeError) :
                errorMessage = "Error retrieving status of latest update from Cloud Sync API"
                raise APIResponseError(errorMessage, response)

            # End execution if the latest update is complete
            if latestActivityType == "Sync" and latestActivityStatus == "DONE" :
                if printProgress :
                    print("Success: Cloud Sync update is complete.")
                break
            # Print message re: progress
            if printProgress :
                print("Cloud Sync update is not yet complete.")
            # End execution if calling program doesn't want to monitor until the latest update has completed
            if not keepCheckingUntilComplete :
                break
            # Sleep for 60 seconds before checking progress again
            print("Checking again in 60 seconds...")
            time.sleep(60)
    # Retrieve Cloud Sync refresh token from Airflow connection
    connections = get_connections(conn_id = airflowConnectionName)
    cloudSyncConnection = connections[0]    # Assumes that you only have one connection with the specified conn_id configured in Airflow
    refreshToken = cloudSyncConnection.password

    # Step 1: Obtain access token and account ID for accessing Cloud Sync API
    try :
        accessToken, accountId = netappCloudSyncAuth(refreshToken = refreshToken)
    except APIResponseError as err:
        errorMessage = err.args[0]
        response = err.args[1]
        print(errorMessage)
        if printResponse :
            printAPIResponse(response)
        raise
    # Step 2: Trigger Cloud Sync update
    # Define parameters for API call
    url = "https://cloudsync.netapp.com/api/relationships/%s/sync" % (relationshipId)
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "x-account-id": accountId,
        "Authorization": "Bearer " + accessToken
    }
    # Call API to trigger update
    print("Triggering Cloud Sync update.")
    response = requests.put(url = url, headers = headers)
    # Check for API response status code of 202; if not 202, raise error
    if response.status_code != 202 :
        errorMessage = "Error calling Cloud Sync API to trigger update."
        if printResponse :
            print(errorMessage)
            printAPIResponse(response)
        raise APIResponseError(errorMessage, response)
    # Print API response
    if printResponse :
        print("Note: Status Code 202 denotes that update was successfully triggered.")
        printAPIResponse(response)

    print("Checking progress.")
    netappCloudSyncMonitor(refreshToken = refreshToken, relationshipId = relationshipId, keepCheckingUntilComplete = keepCheckingUntilComplete, printResponses = printResponse)
# Define DAG steps/workflow
with replicate_data_cloud_sync_dag as dag :
    # Define step to trigger a NetApp Cloud Sync update
    trigger_cloud_sync = PythonOperator(
        task_id='trigger-cloud-sync',
        python_callable=netappCloudSyncUpdate,
        op_kwargs={
            'airflowConnectionName': airflowConnectionName,
            'relationshipId': relationshipId
        },
        dag=dag
    )
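The monitoring loop in netappCloudSyncMonitor polls every 60 seconds with no upper bound, so if the latest update never reaches a Sync activity with status DONE, the task keeps running. The following is a minimal sketch of a bounded variant of that loop. It keeps the DAG's completion condition but stops with an error after a deadline. The names wait_for_sync_done and CloudSyncTimeoutError, and the six-hour default limit, are hypothetical choices, not part of the Cloud Sync API or the original DAG. The status lookup is passed in as a function so that the loop can be exercised without network access.

```python
import time
from typing import Callable, Tuple


class CloudSyncTimeoutError(Exception):
    """Raised when the latest update is not complete before the deadline (hypothetical)."""


def wait_for_sync_done(
    get_activity: Callable[[], Tuple[str, str]],
    poll_interval_s: float = 60.0,
    max_wait_s: float = 6 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll get_activity() until it returns ("Sync", "DONE"); return the poll count.

    get_activity is supplied by the caller and returns the (activity type,
    activity status) pair for the relationship. Raises CloudSyncTimeoutError
    when the next poll would fall after the deadline.
    """
    deadline = clock() + max_wait_s
    polls = 0
    while True:
        activity_type, activity_status = get_activity()
        polls += 1
        # Same completion condition as the example DAG
        if activity_type == "Sync" and activity_status == "DONE":
            return polls
        # Give up rather than sleeping past the deadline
        if clock() + poll_interval_s > deadline:
            raise CloudSyncTimeoutError(
                f"Cloud Sync update not complete after {max_wait_s} s "
                f"(last activity: {activity_type}/{activity_status})"
            )
        sleep(poll_interval_s)


# Example with simulated responses and a fake clock, so nothing actually sleeps
fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds  # advance the simulated clock instead of waiting


responses = iter([("Sync", "RUNNING"), ("Sync", "RUNNING"), ("Sync", "DONE")])
polls = wait_for_sync_done(lambda: next(responses), sleep=fake_sleep, clock=lambda: fake_now[0])
print(polls, fake_now[0])  # prints: 3 120.0
```

In the DAG, get_activity could wrap the existing GET call to the relationships-v2 endpoint and return responseBody["activity"]["type"] and responseBody["activity"]["status"]. Because the timeout raises an exception, Airflow marks the task as failed instead of leaving it running indefinitely.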