Skip to main content
NetApp Solutions
本繁體中文版使用機器翻譯,譯文僅供參考,若與英文版本牴觸,應以英文版本為準。

第 2 部分:運用 AWS FSx for NetApp ONTAP ( FSxN )做為 SageMaker 模型訓練的資料來源

貢獻者
  • 作者: *
    NetApp 資深資料與應用科學家 Jian Jian ( Ken )

簡介

本教學課程提供電腦視覺分類專案的實用範例、提供建置 ML 模型的實際經驗、這些模型將 FSxN 作為 SageMaker 環境中的資料來源。此專案著重於使用深層學習架構 PyTorch 、根據輪胎影像來分類輪胎品質。它強調使用 FSxN 做為 Amazon SageMaker 資料來源的機器學習模型開發。

什麼是 FSxN

Amazon FSX for NetApp ONTAP 確實是 AWS 提供的完全託管儲存解決方案。它利用 NetApp 的 ONTAP 檔案系統來提供可靠且高效能的儲存設備。支援 NFS 、 SMB 和 iSCSI 等傳輸協定、可從不同的運算執行個體和容器進行無縫存取。此服務旨在提供卓越的效能、確保快速且有效率的資料作業。它也提供高可用度和耐用度、確保資料保持可存取和保護的狀態。此外、 Amazon FSX for NetApp ONTAP 的儲存容量可擴充、可讓您根據自己的需求輕鬆調整。

先決條件

網路環境

錯誤:網路環境

FSxN ( Amazon FSx for NetApp ONTAP )是 AWS 儲存服務。其中包括在 NetApp ONTAP 系統上執行的檔案系統、以及連接到它的 AWS 託管系統虛擬機器( SVM )。在所提供的圖表中、 AWS 管理的 NetApp ONTAP 伺服器位於 VPC 之外。SVM 是 SageMaker 和 NetApp ONTAP 系統之間的中介、可接收來自 SageMaker 的作業要求、並將其轉送至基礎儲存設備。若要存取 FSxN 、 SageMaker 必須與 FSxN 部署位於同一個 VPC 內。此組態可確保 SageMaker 和 FSxN 之間的通訊和資料存取。

資料存取

在實際案例中、資料科學家通常會利用儲存在 FSxN 中的現有資料來建置機器學習模型。不過、為了進行示範、由於 FSxN 檔案系統在建立後一開始是空的、因此必須手動上傳訓練資料。這可以透過將 FSxN 當作 Volume 安裝到 SageMaker 來達成。檔案系統成功掛載後、您可以將資料集上傳至掛載位置、以便在 SageMaker 環境中訓練您的模型。此方法可讓您在與 SageMaker 合作進行模型開發與訓練時、善用 FSxN 的儲存容量與功能。

資料讀取程序包括將 FSxN 設定為私有 S3 儲存區。若要瞭解詳細的組態指示、請參閱 "第 1 部分:將 AWS FSx for NetApp ONTAP ( FSxN )整合為私有 S3 儲存區、並整合至 AWS SageMaker"

整合概述

錯誤:訓練工作流程

使用 FSxN 中的訓練資料在 SageMaker 中建立深度學習模型的工作流程、可歸納為三個主要步驟:資料載入程式定義、模型訓練和部署。在高層級、這些步驟是 MLOps 管道的基礎。不過、每個步驟都包含數個詳細的子步驟、以便全面實作。這些子步驟涵蓋各種工作、例如資料預先處理、資料集分割、模型組態、超參數調整、模型評估、 和模型部署。這些步驟可確保在 SageMaker 環境中、使用 FSxN 的訓練資料、建立及部署深度學習模型的完整且有效的程序。

逐步整合

資料載入器

為了訓練具有資料的 PyTorch 深度學習網路、我們建立了一個資料載入器、以利資料的輸入。資料載入程式不僅會定義批次大小、也會決定在批次中讀取及預先處理每筆記錄的程序。藉由設定資料載入器、我們可以分批處理資料處理、以便訓練深度學習網路。

資料載入器包含 3 個部分。

預處理功能

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224)),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

上述程式碼片段示範使用 * torchsore.transformation* 模組來定義影像處理前轉換。在此轉乘中、會建立處理前物件以套用一系列的轉換。首先, ToTensor() 轉換會將影像轉換成張量表示法。隨後, Resize(224,224) 轉換會將影像調整為 24 x 224 像素的固定大小。最後、 * 標準化() * 轉換會減去平均值、並除以每個通道的標準差、以標準化張量值。用於正規化的平均和標準差值通常用於預先訓練的神經網路模型。整體而言、此程式碼會將影像資料轉換成張量、調整大小、以及將像素值正規化、以準備影像資料、以便進一步處理或輸入到預先訓練的模型中。

PyTorch Dataset Class

import torch
from io import BytesIO
from PIL import Image


class FSxNImageDataset(torch.utils.data.Dataset):
    def __init__(self, bucket, prefix='', preprocess=None):
        self.image_keys = [
            s3_obj.key
            for s3_obj in list(bucket.objects.filter(Prefix=prefix).all())
        ]
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_keys)

    def __getitem__(self, index):
        key = self.image_keys[index]
        response = bucket.Object(key)

        label = 1 if key[13:].startswith('defective') else 0

        image_bytes = response.get()['Body'].read()
        image = Image.open(BytesIO(image_bytes))
        if image.mode == 'L':
            image = image.convert('RGB')

        if self.preprocess is not None:
            image = self.preprocess(image)
        return image, label

此類別提供的功能可取得資料集中的記錄總數、並定義讀取每筆記錄資料的方法。在 getertis_ 函數中,代碼使用 boto3 S3 儲存區物件從 FSxN 擷取二進位資料。從 FSxN 存取資料的程式碼樣式類似於從 Amazon S3 讀取資料。隨後的說明將深入到私有 S3 物件 * 儲存庫 * 的建立程序。

FSxN 做為私有 S3 儲存庫

seed = 77                                                   # Random seed
bucket_name = '<Your ONTAP bucket name>'                    # The bucket name in ONTAP
aws_access_key_id = '<Your ONTAP bucket key id>'            # Please get this credential from ONTAP
aws_secret_access_key = '<Your ONTAP bucket access key>'    # Please get this credential from ONTAP
fsx_endpoint_ip = '<Your FSxN IP address>'                  # Please get this IP address from FSXN
import boto3

# Get session info
region_name = boto3.session.Session().region_name

# Initialize Fsxn S3 bucket object
# --- Start integrating SageMaker with FSXN ---
# This is the only code change we need to incorporate SageMaker with FSXN
s3_client: boto3.client = boto3.resource(
    's3',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    use_ssl=False,
    endpoint_url=f'http://{fsx_endpoint_ip}',
    config=boto3.session.Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
# s3_client = boto3.resource('s3')
bucket = s3_client.Bucket(bucket_name)
# --- End integrating SageMaker with FSXN ---

若要從 SageMaker 中的 FSxN 讀取資料、會建立一個處理常式、使用 S3 傳輸協定指向 FSxN 儲存設備。如此可將 FSxN 視為私有 S3 儲存區。處理常式組態包括指定 FSxN SVM 的 IP 位址、貯體名稱和必要的認證。如需取得這些組態項目的完整說明、請參閱上的文件 "第 1 部分:將 AWS FSx for NetApp ONTAP ( FSxN )整合為私有 S3 儲存區、並整合至 AWS SageMaker"

在上述範例中、貯體物件用於產生 PyTorch 資料集物件。後續章節將進一步說明 DataSet 物件。

PyTorch Data Loader

from torch.utils.data import DataLoader
torch.manual_seed(seed)

# 1. Hyperparameters
batch_size = 64

# 2. Preparing for the dataset
dataset = FSxNImageDataset(bucket, 'dataset/tyre', preprocess=preprocess)

train, test = torch.utils.data.random_split(dataset, [1500, 356])

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

在所提供的範例中、會指定 64 個批次大小、表示每個批次將包含 64 個記錄。結合 PyTorch * Dataset* 課程、預處理功能和訓練批次大小、我們獲得訓練用的資料載入器。此資料載入器可協助在訓練階段中分批重複資料集的程序。

示範訓練

from torch import nn


class TyreQualityClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,64,(3,3)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64*(224-6)*(224-6),2)
        )
    def forward(self, x):
        return self.model(x)
import datetime

num_epochs = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TyreQualityClassifier()
fn_loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


model.to(device)
for epoch in range(num_epochs):
    for idx, (X, y) in enumerate(data_loader):
        X = X.to(device)
        y = y.to(device)

        y_hat = model(X)

        loss = fn_loss(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Current Time: {current_time} - Epoch [{epoch+1}/{num_epochs}]- Batch [{idx + 1}] - Loss: {loss}", end='\r')

此程式碼可實作標準的 PyTorch 訓練程序。它定義了一個稱為 TireQualityClassifier 的神經網路模型、使用卷積層和線性層來分類輪胎品質。訓練循環會反覆循環資料批次、計算遺失、並使用反向傳播和最佳化來更新模型參數。此外、它會列印目前時間、時期、批次和遺失、以供監控。

建構部署模式

部署

import io
import os
import tarfile
import sagemaker

# 1. Save the PyTorch model to memory
buffer_model = io.BytesIO()
traced_model = torch.jit.script(model)
torch.jit.save(traced_model, buffer_model)

# 2. Upload to AWS S3
sagemaker_session = sagemaker.Session()
bucket_name_default = sagemaker_session.default_bucket()
model_name = f'tyre_quality_classifier.pth'

# 2.1. Zip PyTorch model into tar.gz file
buffer_zip = io.BytesIO()
with tarfile.open(fileobj=buffer_zip, mode="w:gz") as tar:
    # Add PyTorch pt file
    file_name = os.path.basename(model_name)
    file_name_with_extension = os.path.split(file_name)[-1]
    tarinfo = tarfile.TarInfo(file_name_with_extension)
    tarinfo.size = len(buffer_model.getbuffer())
    buffer_model.seek(0)
    tar.addfile(tarinfo, buffer_model)

# 2.2. Upload the tar.gz file to S3 bucket
buffer_zip.seek(0)
boto3.resource('s3') \
    .Bucket(bucket_name_default) \
    .Object(f'pytorch/{model_name}.tar.gz') \
    .put(Body=buffer_zip.getvalue())

此程式碼會將 PyTorch 模型儲存至 * Amazon S2* 、因為 SageMaker 需要將模型儲存在 S3 中以進行部署。將模型上傳至 * Amazon S2* 、即可讓 SageMaker 存取、讓部署模型的部署和推斷得以實現。

import time
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer


class TyreQualitySerializer(IdentitySerializer):
    CONTENT_TYPE = 'application/x-torch'

    def serialize(self, data):
        transformed_image = preprocess(data)
        tensor_image = torch.Tensor(transformed_image)

        serialized_data = io.BytesIO()
        torch.save(tensor_image, serialized_data)
        serialized_data.seek(0)
        serialized_data = serialized_data.read()

        return serialized_data


class TyreQualityPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=TyreQualitySerializer(),
            deserializer=JSONDeserializer(),
        )

sagemaker_model = PyTorchModel(
    model_data=f's3://{bucket_name_default}/pytorch/{model_name}.tar.gz',
    role=sagemaker.get_execution_role(),
    framework_version='2.0.1',
    py_version='py310',
    predictor_cls=TyreQualityPredictor,
    entry_point='inference.py',
    source_dir='code',
)

timestamp = int(time.time())
pytorch_endpoint_name = '{}-{}-{}'.format('tyre-quality-classifier', 'pt', timestamp)
sagemaker_predictor = sagemaker_model.deploy(
    initial_instance_count=1,
    instance_type='ml.p3.2xlarge',
    endpoint_name=pytorch_endpoint_name
)

此程式碼有助於在 SageMaker 上部署 PyTorch 模型。它定義了自訂序列化器 * TireQualitySerializer* 、可將輸入資料預先處理並序列化為 PyTorch Tensor 。TireQualityPredictor 類是一種自定義的謂詞,它使用定義的序列化器和 JSONDeserializer 。程式碼也會建立一個 * PyTorchModel* 物件、以指定模型的 S3 位置、 IAM 角色、架構版本和推斷的進入點。程式碼會產生時間戳記、並根據模型和時間戳記來建構端點名稱。最後、使用部署方法來部署模型、指定執行個體數、執行個體類型和產生的端點名稱。如此一來、即可部署並存取 PyTorch 模型、以供 SageMaker 的推斷。

推斷

image_object = list(bucket.objects.filter('dataset/tyre'))[0].get()
image_bytes = image_object['Body'].read()

with Image.open(with Image.open(BytesIO(image_bytes)) as image:
    predicted_classes = sagemaker_predictor.predict(image)

    print(predicted_classes)

這是使用已部署端點進行推斷的範例。