Skip to main content
日本語は機械翻訳による参考訳です。内容に矛盾や不一致があった場合には、英語の内容が優先されます。

パート2 - SageMakerのモデルトレーニングのデータソースとしてAWS FSx for NetApp ONTAP(FSxN)を活用

共同作成者

作成者:
NetAppシニアデータ&アプリケーションサイエンティスト、Jian Jian(Ken)

はじめに

このチュートリアルでは、コンピュータビジョン分類プロジェクトの実践的な例を示し、SageMaker環境内でFSxNをデータソースとして使用するMLモデルを構築するための実践的な経験を提供します。このプロジェクトでは、ディープラーニングフレームワークであるPyTorchを使用して、タイヤの画像に基づいてタイヤの品質を分類することに焦点を当てています。Amazon SageMakerのデータソースとしてFSxNを使用した機械学習モデルの開発に重点を置いています。

FSxNとは

Amazon FSx for NetApp ONTAPは、AWSが提供するフルマネージドストレージ解決策です。ネットアップのONTAPファイルシステムを活用して、信頼性の高いハイパフォーマンスストレージを提供します。NFS、SMB、iSCSIなどのプロトコルをサポートしているため、さまざまなコンピューティングインスタンスやコンテナからシームレスにアクセスできます。このサービスは、卓越したパフォーマンスを提供し、高速かつ効率的なデータ運用を実現するように設計されています。また、高可用性とデータ保持性を実現し、データへのアクセスと保護を維持します。さらに、Amazon FSx for NetApp ONTAPのストレージ容量は拡張性に優れているため、ニーズに合わせて簡単に調整できます。

前提条件

ネットワーク環境

エラー:ネットワーク環境

FSxN(Amazon FSx for NetApp ONTAP)は、AWSのストレージサービスです。これには、NetApp ONTAPシステムで実行されているファイルシステムと、接続するAWSで管理されるSystem Virtual Machine(SVM)が含まれます。次の図では、AWSで管理されるNetApp ONTAPサーバがVPCの外部に配置されています。SVMはSageMakerとNetApp ONTAPシステムの仲介役として機能し、SageMakerからの処理要求を受け取り、基盤となるストレージに転送します。FSxNにアクセスするには、SageMakerをFSxN展開と同じVPC内に配置する必要があります。この構成により、SageMakerとFSxN間の通信とデータアクセスが保証されます。

データアクセス

実際のシナリオでは、データサイエンティストは通常、FSxNに保存されている既存のデータを利用して機械学習モデルを構築します。ただし、デモ目的では、FSxNファイルシステムは作成後に最初は空であるため、トレーニングデータを手動でアップロードする必要があります。これは、FSxNをボリュームとしてSageMakerにマウントすることで実現できます。ファイルシステムが正常にマウントされたら、データセットをマウントされた場所にアップロードして、SageMaker環境内でモデルをトレーニングするためにアクセスできるようにすることができます。このアプローチでは、SageMakerと連携してモデルの開発とトレーニングを行いながら、FSxNのストレージ容量と機能を活用できます。

データ読み取りプロセスでは、FSxNをプライベートS3バケットとして設定します。詳細な設定手順については、を参照してください。 "パート1 - AWS FSx for NetApp ONTAP(FSxN)をプライベートS3バケットとしてAWS SageMakerに統合する"

統合の概要

エラー:トレーニングワークフロー

FSxNのトレーニングデータを使用してSageMakerでディープラーニングモデルを構築するワークフローは、データローダーの定義、モデルのトレーニング、デプロイの3つの主なステップに要約できます。大まかに言えば、これらのステップはMLOpsパイプラインの基盤を形成します。ただし、各ステップには、包括的な実装のためのいくつかの詳細なサブステップが含まれています。これらのサブステップには、データの前処理、データセットの分割、モデルの構成、ハイパーパラメータの調整、モデルの評価など、さまざまなタスクが含まれます。 モデルの導入を支援します。これらの手順により、SageMaker環境内でFSxNからのトレーニングデータを使用してディープラーニングモデルを構築し、展開するための徹底的で効果的なプロセスが保証されます。

ステップバイステップの統合

データローダ

PyTorchディープラーニングネットワークをデータでトレーニングするために、データのフィードを容易にするためのデータローダーが作成されます。データローダーは、バッチサイズを定義するだけでなく、バッチ内の各レコードを読み取って前処理するための手順も決定します。データローダーを構成することで、データの処理をバッチで処理し、ディープラーニングネットワークのトレーニングを可能にします。

データローダーは3つの部分で構成されています。

前処理機能

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224)),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

上記のコードスニペットは、torchvision.transforms*モジュールを使用した画像前処理変換の定義を示しています。このturtorialでは、一連の変換を適用するためにプリプロセスオブジェクトが作成されます。まず、*ToTensor()*変換は画像をテンソル表現に変換する。その後、*Resize(224,224*変換により、画像のサイズが224x224ピクセルの固定サイズに変更されます。最後に、 Normalize()*変換は、平均を減算し、各チャンネルに沿った標準偏差で割ることによってテンソル値を正規化します。正規化に使用される平均値と標準偏差値は、事前にトレーニングされたニューラルネットワークモデルで一般的に使用されます。全体的に、このコードは、画像データをテンソルに変換し、サイズを変更し、ピクセル値を正規化することで、事前にトレーニングされたモデルにさらに処理または入力できるように準備します。

PyTorchデータセットクラス

import torch
from io import BytesIO
from PIL import Image


class FSxNImageDataset(torch.utils.data.Dataset):
    def __init__(self, bucket, prefix='', preprocess=None):
        self.image_keys = [
            s3_obj.key
            for s3_obj in list(bucket.objects.filter(Prefix=prefix).all())
        ]
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_keys)

    def __getitem__(self, index):
        key = self.image_keys[index]
        response = bucket.Object(key)

        label = 1 if key[13:].startswith('defective') else 0

        image_bytes = response.get()['Body'].read()
        image = Image.open(BytesIO(image_bytes))
        if image.mode == 'L':
            image = image.convert('RGB')

        if self.preprocess is not None:
            image = self.preprocess(image)
        return image, label

このクラスは、データセット内のレコードの総数を取得する機能を提供し、各レコードのデータを読み取る方法を定義します。getItem*関数内で、コードはboto3 S3バケットオブジェクトを使用してFSxNからバイナリデータを取得します。FSxNからデータにアクセスするためのコードスタイルは、Amazon S3からデータを読み取るのと似ています。以降の説明では、プライベートS3オブジェクト Bucket *の作成プロセスについて詳しく説明します。

プライベートS3リポジトリとしてのFSxN

seed = 77                                                   # Random seed
bucket_name = '<Your ONTAP bucket name>'                    # The bucket name in ONTAP
aws_access_key_id = '<Your ONTAP bucket key id>'            # Please get this credential from ONTAP
aws_secret_access_key = '<Your ONTAP bucket access key>'    # Please get this credential from ONTAP
fsx_endpoint_ip = '<Your FSxN IP address>'                  # Please get this IP address from FSXN
import boto3

# Get session info
region_name = boto3.session.Session().region_name

# Initialize Fsxn S3 bucket object
# --- Start integrating SageMaker with FSXN ---
# This is the only code change we need to incorporate SageMaker with FSXN
s3_client: boto3.client = boto3.resource(
    's3',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    use_ssl=False,
    endpoint_url=f'http://{fsx_endpoint_ip}',
    config=boto3.session.Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
# s3_client = boto3.resource('s3')
bucket = s3_client.Bucket(bucket_name)
# --- End integrating SageMaker with FSXN ---

SageMakerでFSxNからデータを読み取るために、S3プロトコルを使用してFSxNストレージを指すハンドラが作成されます。これにより、FSxNをプライベートS3バケットとして扱うことができます。ハンドラの設定では、FSxN SVMのIPアドレス、バケット名、および必要なクレデンシャルを指定します。これらの設定項目の入手方法については、次のWebサイトにあるドキュメントを参照してください。 "パート1 - AWS FSx for NetApp ONTAP(FSxN)をプライベートS3バケットとしてAWS SageMakerに統合する"

前述の例では、Bucketオブジェクトを使用してPyTorchデータセットオブジェクトをインスタンス化しています。データセットオブジェクトについては、次のセクションで詳しく説明します。

PyTorchデータローダ

from torch.utils.data import DataLoader
torch.manual_seed(seed)

# 1. Hyperparameters
batch_size = 64

# 2. Preparing for the dataset
dataset = FSxNImageDataset(bucket, 'dataset/tyre', preprocess=preprocess)

train, test = torch.utils.data.random_split(dataset, [1500, 356])

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

この例では、64のバッチサイズが指定されています。これは、各バッチに64レコードが含まれることを示しています。PyTorch * Dataset *クラス、前処理関数、およびトレーニングバッチサイズを組み合わせることで、トレーニング用のデータローダーを取得します。このデータローダーは、トレーニングフェーズ中にデータセットをバッチで反復処理するプロセスを容易にします。

モデルトレーニング

from torch import nn


class TyreQualityClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,64,(3,3)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64*(224-6)*(224-6),2)
        )
    def forward(self, x):
        return self.model(x)
import datetime

num_epochs = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TyreQualityClassifier()
fn_loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


model.to(device)
for epoch in range(num_epochs):
    for idx, (X, y) in enumerate(data_loader):
        X = X.to(device)
        y = y.to(device)

        y_hat = model(X)

        loss = fn_loss(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Current Time: {current_time} - Epoch [{epoch+1}/{num_epochs}]- Batch [{idx + 1}] - Loss: {loss}", end='\r')

このコードは標準のPyTorchトレーニングプロセスを実装しています。これは、畳み込み層と線形層を使用してタイヤの品質を分類する*TireQualityClassifier*と呼ばれるニューラルネットワークモデルを定義します。トレーニングループはデータバッチを繰り返し、損失を計算し、バックプロパゲーションと最適化を使用してモデルのパラメータを更新します。さらに、現在の時刻、エポック、バッチ、および損失を監視するために印刷します。

モデルの導入

導入

import io
import os
import tarfile
import sagemaker

# 1. Save the PyTorch model to memory
buffer_model = io.BytesIO()
traced_model = torch.jit.script(model)
torch.jit.save(traced_model, buffer_model)

# 2. Upload to AWS S3
sagemaker_session = sagemaker.Session()
bucket_name_default = sagemaker_session.default_bucket()
model_name = f'tyre_quality_classifier.pth'

# 2.1. Zip PyTorch model into tar.gz file
buffer_zip = io.BytesIO()
with tarfile.open(fileobj=buffer_zip, mode="w:gz") as tar:
    # Add PyTorch pt file
    file_name = os.path.basename(model_name)
    file_name_with_extension = os.path.split(file_name)[-1]
    tarinfo = tarfile.TarInfo(file_name_with_extension)
    tarinfo.size = len(buffer_model.getbuffer())
    buffer_model.seek(0)
    tar.addfile(tarinfo, buffer_model)

# 2.2. Upload the tar.gz file to S3 bucket
buffer_zip.seek(0)
boto3.resource('s3') \
    .Bucket(bucket_name_default) \
    .Object(f'pytorch/{model_name}.tar.gz') \
    .put(Body=buffer_zip.getvalue())

このコードはPyTorchモデルを* Amazon S3 に保存します。これは、SageMakerが展開するためにモデルをS3に格納する必要があるためです。モデルを Amazon S3 *にアップロードすることで、SageMakerからアクセスできるようになり、デプロイされたモデルでのデプロイと推論が可能になります。

import time
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer


class TyreQualitySerializer(IdentitySerializer):
    CONTENT_TYPE = 'application/x-torch'

    def serialize(self, data):
        transformed_image = preprocess(data)
        tensor_image = torch.Tensor(transformed_image)

        serialized_data = io.BytesIO()
        torch.save(tensor_image, serialized_data)
        serialized_data.seek(0)
        serialized_data = serialized_data.read()

        return serialized_data


class TyreQualityPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=TyreQualitySerializer(),
            deserializer=JSONDeserializer(),
        )

sagemaker_model = PyTorchModel(
    model_data=f's3://{bucket_name_default}/pytorch/{model_name}.tar.gz',
    role=sagemaker.get_execution_role(),
    framework_version='2.0.1',
    py_version='py310',
    predictor_cls=TyreQualityPredictor,
    entry_point='inference.py',
    source_dir='code',
)

timestamp = int(time.time())
pytorch_endpoint_name = '{}-{}-{}'.format('tyre-quality-classifier', 'pt', timestamp)
sagemaker_predictor = sagemaker_model.deploy(
    initial_instance_count=1,
    instance_type='ml.p3.2xlarge',
    endpoint_name=pytorch_endpoint_name
)

このコードは、SageMakerへのPyTorchモデルのデプロイを容易にします。これは、入力データをPyTorchテンソルとして前処理してシリアライズするカスタムシリアライザ*TireQualitySerializer*を定義します。TireQualityPredictor*クラスは、定義されたシリアライザと*JSONDeserializer*を利用するカスタムプレディクタです。コードはまた、モデルのS3の場所、IAMの役割、フレームワークのバージョン、推論のエントリポイントを指定する PyTorchModel *オブジェクトを作成します。コードはタイムスタンプを生成し、モデルとタイムスタンプに基づいてエンドポイント名を構築します。最後に、インスタンス数、インスタンスタイプ、生成されたエンドポイント名を指定して、deployメソッドを使用してモデルをデプロイします。これにより、PyTorchモデルをデプロイし、SageMakerで推論できるようになります。

推論

image_object = list(bucket.objects.filter('dataset/tyre'))[0].get()
image_bytes = image_object['Body'].read()

with Image.open(with Image.open(BytesIO(image_bytes)) as image:
    predicted_classes = sagemaker_predictor.predict(image)

    print(predicted_classes)

次の例では、導入したエンドポイントを使用して推論を実行しています。