Skip to main content
NetApp Solutions
本繁體中文版使用機器翻譯,譯文僅供參考,若與英文版本牴觸,應以英文版本為準。

第 2 部分:運用 AWS Amazon FSX for NetApp ONTAP ( FSX ONTAP )做為 SageMaker 模型訓練的資料來源

貢獻者

本文是使用 Amazon FSX for NetApp ONTAP ( FSX ONTAP )來訓練 SageMaker 中的 PyTorch 機型的教學課程、特別是針對輪胎品質分類專案。

  • 作者: *
    NetApp 資深資料與應用科學家 Jian Jian ( Ken )

簡介

本教學課程提供電腦視覺分類專案的實用範例、提供建置 ML 模型的實際經驗、這些模型將 FSX ONTAP 作為 SageMaker 環境中的資料來源。此專案著重於使用深層學習架構 PyTorch 、根據輪胎影像來分類輪胎品質。它強調使用 FSX ONTAP 作為 Amazon SageMaker 資料來源的機器學習模型開發。

什麼是 FSX ONTAP

Amazon FSX ONTAP 確實是 AWS 提供的完全託管儲存解決方案。它利用 NetApp 的 ONTAP 檔案系統來提供可靠且高效能的儲存設備。支援 NFS 、 SMB 和 iSCSI 等傳輸協定、可從不同的運算執行個體和容器進行無縫存取。此服務旨在提供卓越的效能、確保快速且有效率的資料作業。它也提供高可用度和耐用度、確保資料保持可存取和保護的狀態。此外、 Amazon FSX ONTAP 的儲存容量可擴充、可讓您根據自己的需求輕鬆調整。

先決條件

網路環境

網路環境

FSX ONTAP ( Amazon FSX ONTAP )是 AWS 儲存服務。其中包括在 NetApp ONTAP 系統上執行的檔案系統、以及連接到它的 AWS 託管系統虛擬機器( SVM )。在所提供的圖表中、 AWS 管理的 NetApp ONTAP 伺服器位於 VPC 之外。SVM 是 SageMaker 和 NetApp ONTAP 系統之間的中介、可接收來自 SageMaker 的作業要求、並將其轉送至基礎儲存設備。若要存取 FSX ONTAP 、 SageMaker 必須與 FSX ONTAP 部署位於同一個 VPC 內。此組態可確保 SageMaker 和 FSX ONTAP 之間的通訊和資料存取。

資料存取

在實際案例中、資料科學家通常會利用儲存在 FSX ONTAP 中的現有資料來建置機器學習模型。不過、為了展示目的、由於 FSX ONTAP 檔案系統在建立後一開始是空的、因此必須手動上傳訓練資料。將 FSX ONTAP 當作 Volume 安裝至 SageMaker 即可達成此目標。檔案系統成功掛載後、您可以將資料集上傳至掛載位置、以便在 SageMaker 環境中訓練您的模型。此方法可讓您在與 SageMaker 合作進行模型開發與訓練時、善用 FSX ONTAP 的儲存容量與功能。

資料讀取程序包括將 FSX ONTAP 設定為私有 S3 儲存區。若要瞭解詳細的組態指示、請參閱"第 1 部分:將 Amazon FSX for NetApp ONTAP ( FSX ONTAP )整合為私有 S3 儲存區、並整合至 AWS SageMaker"

整合概述

訓練工作流程

使用 FSX ONTAP 中的訓練資料在 SageMaker 中建立深度學習模型的工作流程、可歸納為三個主要步驟:資料 Loader 定義、模型訓練和部署。在高層級、這些步驟是 MLOps 管道的基礎。不過、每個步驟都包含數個詳細的子步驟、以便全面實作。這些子步驟涵蓋各種工作、例如資料預先處理、資料集分割、模型組態、超參數調整、模型評估、 和模型部署。這些步驟可確保在 SageMaker 環境中、使用來自 FSX ONTAP 的訓練資料、建立及部署深度學習模型的完整有效程序。

逐步整合

資料載入器

為了訓練具有資料的 PyTorch 深度學習網路、我們建立了一個資料載入器、以利資料的輸入。資料載入程式不僅會定義批次大小、也會決定在批次中讀取及預先處理每筆記錄的程序。藉由設定資料載入器、我們可以分批處理資料處理、以便訓練深度學習網路。

資料載入器包含 3 個部分。

預處理功能

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224)),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

上述程式碼片段示範使用 * torchsore.transformation* 模組來定義影像處理前轉換。在此轉乘中、會建立處理前物件以套用一系列的轉換。首先, ToTensor() 轉換會將影像轉換成張量表示法。隨後, Resize(224,224) 轉換會將影像調整為 24 x 224 像素的固定大小。最後、 * 標準化() * 轉換會減去平均值、並除以每個通道的標準差、以標準化張量值。用於正規化的平均和標準差值通常用於預先訓練的神經網路模型。整體而言、此程式碼會將影像資料轉換成張量、調整大小、以及將像素值正規化、以準備影像資料、以便進一步處理或輸入到預先訓練的模型中。

PyTorch Dataset Class

import torch
from io import BytesIO
from PIL import Image


class FSxNImageDataset(torch.utils.data.Dataset):
    def __init__(self, bucket, prefix='', preprocess=None):
        self.image_keys = [
            s3_obj.key
            for s3_obj in list(bucket.objects.filter(Prefix=prefix).all())
        ]
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_keys)

    def __getitem__(self, index):
        key = self.image_keys[index]
        response = bucket.Object(key)

        label = 1 if key[13:].startswith('defective') else 0

        image_bytes = response.get()['Body'].read()
        image = Image.open(BytesIO(image_bytes))
        if image.mode == 'L':
            image = image.convert('RGB')

        if self.preprocess is not None:
            image = self.preprocess(image)
        return image, label

此類別提供的功能可取得資料集中的記錄總數、並定義讀取每筆記錄資料的方法。在 getertis 函數中,代碼使用 boto3 S3 儲存區物件從 FSX ONTAP 擷取二進位資料。從 FSX ONTAP 存取資料的程式碼樣式類似於從 Amazon S3 讀取資料。隨後的說明將深入到私有 S3 物件 * 儲存庫 * 的建立程序。

將 FSX ONTAP 做為私有 S3 儲存庫

seed = 77                                                   # Random seed
bucket_name = '<Your ONTAP bucket name>'                    # The bucket name in ONTAP
aws_access_key_id = '<Your ONTAP bucket key id>'            # Please get this credential from ONTAP
aws_secret_access_key = '<Your ONTAP bucket access key>'    # Please get this credential from ONTAP
fsx_endpoint_ip = '<Your FSx ONTAP IP address>'                  # Please get this IP address from FSXN
import boto3

# Get session info
region_name = boto3.session.Session().region_name

# Initialize Fsxn S3 bucket object
# --- Start integrating SageMaker with FSXN ---
# This is the only code change we need to incorporate SageMaker with FSXN
s3_client: boto3.client = boto3.resource(
    's3',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    use_ssl=False,
    endpoint_url=f'http://{fsx_endpoint_ip}',
    config=boto3.session.Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
# s3_client = boto3.resource('s3')
bucket = s3_client.Bucket(bucket_name)
# --- End integrating SageMaker with FSXN ---

若要從 SageMaker 中的 FSX ONTAP 讀取資料、會建立使用 S3 傳輸協定指向 FSX ONTAP 儲存設備的處理常式。如此可將 FSX ONTAP 視為私有 S3 儲存區。處理常式組態包括指定 FSX ONTAP SVM 的 IP 位址、貯體名稱和必要的認證。有關獲取這些配置項目的詳細說明,請參閱上的文檔"第 1 部分:將 Amazon FSX for NetApp ONTAP ( FSX ONTAP )整合為私有 S3 儲存區、並整合至 AWS SageMaker"

在上述範例中、貯體物件用於產生 PyTorch 資料集物件。後續章節將進一步說明 DataSet 物件。

PyTorch Data Loader

from torch.utils.data import DataLoader
torch.manual_seed(seed)

# 1. Hyperparameters
batch_size = 64

# 2. Preparing for the dataset
dataset = FSxNImageDataset(bucket, 'dataset/tyre', preprocess=preprocess)

train, test = torch.utils.data.random_split(dataset, [1500, 356])

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

在所提供的範例中、會指定 64 個批次大小、表示每個批次將包含 64 個記錄。結合 PyTorch * Dataset* 課程、預處理功能和訓練批次大小、我們獲得訓練用的資料載入器。此資料載入器可協助在訓練階段中分批重複資料集的程序。

示範訓練

from torch import nn


class TyreQualityClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,64,(3,3)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64*(224-6)*(224-6),2)
        )
    def forward(self, x):
        return self.model(x)
import datetime

num_epochs = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TyreQualityClassifier()
fn_loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


model.to(device)
for epoch in range(num_epochs):
    for idx, (X, y) in enumerate(data_loader):
        X = X.to(device)
        y = y.to(device)

        y_hat = model(X)

        loss = fn_loss(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Current Time: {current_time} - Epoch [{epoch+1}/{num_epochs}]- Batch [{idx + 1}] - Loss: {loss}", end='\r')

此程式碼可實作標準的 PyTorch 訓練程序。它定義了一個稱為 TireQualityClassifier 的神經網路模型、使用卷積層和線性層來分類輪胎品質。訓練循環會反覆循環資料批次、計算遺失、並使用反向傳播和最佳化來更新模型參數。此外、它會列印目前時間、時期、批次和遺失、以供監控。

建構部署模式

部署

import io
import os
import tarfile
import sagemaker

# 1. Save the PyTorch model to memory
buffer_model = io.BytesIO()
traced_model = torch.jit.script(model)
torch.jit.save(traced_model, buffer_model)

# 2. Upload to AWS S3
sagemaker_session = sagemaker.Session()
bucket_name_default = sagemaker_session.default_bucket()
model_name = f'tyre_quality_classifier.pth'

# 2.1. Zip PyTorch model into tar.gz file
buffer_zip = io.BytesIO()
with tarfile.open(fileobj=buffer_zip, mode="w:gz") as tar:
    # Add PyTorch pt file
    file_name = os.path.basename(model_name)
    file_name_with_extension = os.path.split(file_name)[-1]
    tarinfo = tarfile.TarInfo(file_name_with_extension)
    tarinfo.size = len(buffer_model.getbuffer())
    buffer_model.seek(0)
    tar.addfile(tarinfo, buffer_model)

# 2.2. Upload the tar.gz file to S3 bucket
buffer_zip.seek(0)
boto3.resource('s3') \
    .Bucket(bucket_name_default) \
    .Object(f'pytorch/{model_name}.tar.gz') \
    .put(Body=buffer_zip.getvalue())

此程式碼會將 PyTorch 模型儲存至 * Amazon S2* 、因為 SageMaker 需要將模型儲存在 S3 中以進行部署。將模型上傳至 * Amazon S2* 、即可讓 SageMaker 存取、讓部署模型的部署和推斷得以實現。

import time
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer


class TyreQualitySerializer(IdentitySerializer):
    CONTENT_TYPE = 'application/x-torch'

    def serialize(self, data):
        transformed_image = preprocess(data)
        tensor_image = torch.Tensor(transformed_image)

        serialized_data = io.BytesIO()
        torch.save(tensor_image, serialized_data)
        serialized_data.seek(0)
        serialized_data = serialized_data.read()

        return serialized_data


class TyreQualityPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=TyreQualitySerializer(),
            deserializer=JSONDeserializer(),
        )

sagemaker_model = PyTorchModel(
    model_data=f's3://{bucket_name_default}/pytorch/{model_name}.tar.gz',
    role=sagemaker.get_execution_role(),
    framework_version='2.0.1',
    py_version='py310',
    predictor_cls=TyreQualityPredictor,
    entry_point='inference.py',
    source_dir='code',
)

timestamp = int(time.time())
pytorch_endpoint_name = '{}-{}-{}'.format('tyre-quality-classifier', 'pt', timestamp)
sagemaker_predictor = sagemaker_model.deploy(
    initial_instance_count=1,
    instance_type='ml.p3.2xlarge',
    endpoint_name=pytorch_endpoint_name
)

此程式碼有助於在 SageMaker 上部署 PyTorch 模型。它定義了自訂序列化器 * TireQualitySerializer* 、可將輸入資料預先處理並序列化為 PyTorch Tensor 。TireQualityPredictor 類是一種自定義的謂詞,它使用定義的序列化器和 JSONDeserializer 。程式碼也會建立一個 * PyTorchModel* 物件、以指定模型的 S3 位置、 IAM 角色、架構版本和推斷的進入點。程式碼會產生時間戳記、並根據模型和時間戳記來建構端點名稱。最後、使用部署方法來部署模型、指定執行個體數、執行個體類型和產生的端點名稱。如此一來、即可部署並存取 PyTorch 模型、以供 SageMaker 的推斷。

推斷

image_object = list(bucket.objects.filter('dataset/tyre'))[0].get()
image_bytes = image_object['Body'].read()

with Image.open(with Image.open(BytesIO(image_bytes)) as image:
    predicted_classes = sagemaker_predictor.predict(image)

    print(predicted_classes)

這是使用已部署端點進行推斷的範例。