Skip to main content
日本語は機械翻訳による参考訳です。内容に矛盾や不一致があった場合には、英語の内容が優先されます。

パート2 - SageMakerでのモデルトレーニングのデータソースとしてAWS Amazon FSx for NetApp ONTAP(FSx ONTAP)を活用

共同作成者

この記事は、特にタイヤ品質分類プロジェクトのために、SageMakerでPyTorchモデルをトレーニングするためにAmazon FSx for NetApp ONTAP(FSx ONTAP)を使用するためのチュートリアルです。

作成者:
NetAppシニアデータ&アプリケーションサイエンティスト、Jian Jian(Ken)

はじめに

このチュートリアルでは、コンピュータビジョン分類プロジェクトの実践的な例を紹介します。FSx ONTAPをSageMaker環境内のデータソースとして使用するMLモデルを構築するための実践的な経験を提供します。このプロジェクトでは、ディープラーニングフレームワークであるPyTorchを使用して、タイヤの画像に基づいてタイヤの品質を分類することに焦点を当てています。Amazon SageMakerのデータソースとしてFSx ONTAPを使用した機械学習モデルの開発に重点を置いています。

FSx ONTAPとは

Amazon FSx ONTAPは、AWSが提供するフルマネージドのストレージソリューションです。ネットアップのONTAPファイルシステムを活用して、信頼性の高いハイパフォーマンスストレージを提供します。NFS、SMB、iSCSIなどのプロトコルをサポートしているため、さまざまなコンピューティングインスタンスやコンテナからシームレスにアクセスできます。このサービスは、卓越したパフォーマンスを提供し、高速かつ効率的なデータ運用を実現するように設計されています。また、高可用性とデータ保持性を実現し、データへのアクセスと保護を維持します。さらに、Amazon FSx ONTAPのストレージ容量は拡張性に優れているため、ニーズに合わせて簡単に調整できます。

前提条件

ネットワーク環境

ネットワーク環境

FSx ONTAP(Amazon FSx ONTAP)は、AWSのストレージサービスです。これには、NetApp ONTAPシステムで実行されているファイルシステムと、接続するAWSで管理されるSystem Virtual Machine(SVM)が含まれます。次の図では、AWSで管理されるNetApp ONTAPサーバがVPCの外部に配置されています。SVMはSageMakerとNetApp ONTAPシステムの仲介役として機能し、SageMakerからの処理要求を受け取り、基盤となるストレージに転送します。FSx ONTAPにアクセスするには、SageMakerがFSx ONTAP環境と同じVPC内に配置されている必要があります。この構成により、SageMakerとFSx ONTAP間の通信とデータアクセスが保証されます。

データアクセス

実際のシナリオでは、データサイエンティストは通常、FSx ONTAPに保存されている既存のデータを利用して機械学習モデルを構築します。ただし、デモ目的では、FSx ONTAPファイルシステムは作成後に最初は空になるため、トレーニングデータを手動でアップロードする必要があります。これは、FSx ONTAPをボリュームとしてSageMakerにマウントすることで実現できます。ファイルシステムが正常にマウントされたら、データセットをマウントされた場所にアップロードして、SageMaker環境内でモデルをトレーニングするためにアクセスできるようにすることができます。このアプローチでは、FSx ONTAPのストレージ容量と機能を活用しながら、SageMakerと連携してモデルの開発とトレーニングを行うことができます。

データ読み取りプロセスでは、FSx ONTAPをプライベートS3バケットとして設定します。詳細な設定手順については、を参照してください。"パート1 - Amazon FSx for NetApp ONTAP(FSx ONTAP)をプライベートS3バケットとしてAWS SageMakerに統合する"

統合の概要

トレーニングワークフロー

FSx ONTAPのトレーニングデータを使用してSageMakerでディープラーニングモデルを構築するワークフローは、主に3つのステップ(Data Loaderの定義、モデルのトレーニング、導入)に要約できます。大まかに言えば、これらのステップはMLOpsパイプラインの基盤を形成します。ただし、各ステップには、包括的な実装のためのいくつかの詳細なサブステップが含まれています。これらのサブステップには、データの前処理、データセットの分割、モデルの構成、ハイパーパラメータの調整、モデルの評価など、さまざまなタスクが含まれます。 モデルの導入を支援します。これらの手順により、SageMaker環境内でFSx ONTAPのトレーニングデータを使用してディープラーニングモデルを構築し、導入するための徹底的で効果的なプロセスが保証されます。

ステップバイステップの統合

データローダ

PyTorchディープラーニングネットワークをデータでトレーニングするために、データのフィードを容易にするためのデータローダーが作成されます。データローダーは、バッチサイズを定義するだけでなく、バッチ内の各レコードを読み取って前処理するための手順も決定します。データローダーを構成することで、データの処理をバッチで処理し、ディープラーニングネットワークのトレーニングを可能にします。

データローダーは3つの部分で構成されています。

前処理機能

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224)),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

上記のコードスニペットは、torchvision.transforms*モジュールを使用した画像前処理変換の定義を示しています。このturtorialでは、一連の変換を適用するためにプリプロセスオブジェクトが作成されます。まず、*ToTensor()*変換は画像をテンソル表現に変換する。その後、*Resize(224,224*変換により、画像のサイズが224x224ピクセルの固定サイズに変更されます。最後に、 Normalize()*変換は、平均を減算し、各チャンネルに沿った標準偏差で割ることによってテンソル値を正規化します。正規化に使用される平均値と標準偏差値は、事前にトレーニングされたニューラルネットワークモデルで一般的に使用されます。全体的に、このコードは、画像データをテンソルに変換し、サイズを変更し、ピクセル値を正規化することで、事前にトレーニングされたモデルにさらに処理または入力できるように準備します。

PyTorchデータセットクラス

import torch
from io import BytesIO
from PIL import Image


class FSxNImageDataset(torch.utils.data.Dataset):
    def __init__(self, bucket, prefix='', preprocess=None):
        self.image_keys = [
            s3_obj.key
            for s3_obj in list(bucket.objects.filter(Prefix=prefix).all())
        ]
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_keys)

    def __getitem__(self, index):
        key = self.image_keys[index]
        response = bucket.Object(key)

        label = 1 if key[13:].startswith('defective') else 0

        image_bytes = response.get()['Body'].read()
        image = Image.open(BytesIO(image_bytes))
        if image.mode == 'L':
            image = image.convert('RGB')

        if self.preprocess is not None:
            image = self.preprocess(image)
        return image, label

このクラスは、データセット内のレコードの総数を取得する機能を提供し、各レコードのデータを読み取る方法を定義します。getItem*関数内で、コードはboto3 S3バケットオブジェクトを利用して、FSx ONTAPからバイナリデータを取得します。FSx ONTAPからデータにアクセスするためのコードスタイルは、Amazon S3からデータを読み取るのと似ています。以降の説明では、プライベートS3オブジェクト Bucket *の作成プロセスについて詳しく説明します。

FSx ONTAPをプライベートS3リポジトリとして使用

seed = 77                                                   # Random seed
bucket_name = '<Your ONTAP bucket name>'                    # The bucket name in ONTAP
aws_access_key_id = '<Your ONTAP bucket key id>'            # Please get this credential from ONTAP
aws_secret_access_key = '<Your ONTAP bucket access key>'    # Please get this credential from ONTAP
fsx_endpoint_ip = '<Your FSx ONTAP IP address>'                  # Please get this IP address from FSXN
import boto3

# Get session info
region_name = boto3.session.Session().region_name

# Initialize Fsxn S3 bucket object
# --- Start integrating SageMaker with FSXN ---
# This is the only code change we need to incorporate SageMaker with FSXN
s3_client: boto3.client = boto3.resource(
    's3',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    use_ssl=False,
    endpoint_url=f'http://{fsx_endpoint_ip}',
    config=boto3.session.Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
# s3_client = boto3.resource('s3')
bucket = s3_client.Bucket(bucket_name)
# --- End integrating SageMaker with FSXN ---

SageMakerでFSx ONTAPからデータを読み取るために、S3プロトコルを使用してFSx ONTAPストレージを指すハンドラが作成されます。これにより、FSx ONTAPをプライベートS3バケットとして扱うことができます。ハンドラの設定には、FSx ONTAP SVMのIPアドレス、バケット名、および必要なクレデンシャルの指定が含まれます。これらの設定項目の入手方法については、のドキュメントを参照してください"パート1 - Amazon FSx for NetApp ONTAP(FSx ONTAP)をプライベートS3バケットとしてAWS SageMakerに統合する"

前述の例では、Bucketオブジェクトを使用してPyTorchデータセットオブジェクトをインスタンス化しています。データセットオブジェクトについては、次のセクションで詳しく説明します。

PyTorchデータローダ

from torch.utils.data import DataLoader
torch.manual_seed(seed)

# 1. Hyperparameters
batch_size = 64

# 2. Preparing for the dataset
dataset = FSxNImageDataset(bucket, 'dataset/tyre', preprocess=preprocess)

train, test = torch.utils.data.random_split(dataset, [1500, 356])

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

この例では、64のバッチサイズが指定されています。これは、各バッチに64レコードが含まれることを示しています。PyTorch * Dataset *クラス、前処理関数、およびトレーニングバッチサイズを組み合わせることで、トレーニング用のデータローダーを取得します。このデータローダーは、トレーニングフェーズ中にデータセットをバッチで反復処理するプロセスを容易にします。

モデルトレーニング

from torch import nn


class TyreQualityClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,64,(3,3)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64*(224-6)*(224-6),2)
        )
    def forward(self, x):
        return self.model(x)
import datetime

num_epochs = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TyreQualityClassifier()
fn_loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


model.to(device)
for epoch in range(num_epochs):
    for idx, (X, y) in enumerate(data_loader):
        X = X.to(device)
        y = y.to(device)

        y_hat = model(X)

        loss = fn_loss(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Current Time: {current_time} - Epoch [{epoch+1}/{num_epochs}]- Batch [{idx + 1}] - Loss: {loss}", end='\r')

このコードは標準のPyTorchトレーニングプロセスを実装しています。これは、畳み込み層と線形層を使用してタイヤの品質を分類する*TireQualityClassifier*と呼ばれるニューラルネットワークモデルを定義します。トレーニングループはデータバッチを繰り返し、損失を計算し、バックプロパゲーションと最適化を使用してモデルのパラメータを更新します。さらに、現在の時刻、エポック、バッチ、および損失を監視するために印刷します。

モデルの導入

導入

import io
import os
import tarfile
import sagemaker

# 1. Save the PyTorch model to memory
buffer_model = io.BytesIO()
traced_model = torch.jit.script(model)
torch.jit.save(traced_model, buffer_model)

# 2. Upload to AWS S3
sagemaker_session = sagemaker.Session()
bucket_name_default = sagemaker_session.default_bucket()
model_name = f'tyre_quality_classifier.pth'

# 2.1. Zip PyTorch model into tar.gz file
buffer_zip = io.BytesIO()
with tarfile.open(fileobj=buffer_zip, mode="w:gz") as tar:
    # Add PyTorch pt file
    file_name = os.path.basename(model_name)
    file_name_with_extension = os.path.split(file_name)[-1]
    tarinfo = tarfile.TarInfo(file_name_with_extension)
    tarinfo.size = len(buffer_model.getbuffer())
    buffer_model.seek(0)
    tar.addfile(tarinfo, buffer_model)

# 2.2. Upload the tar.gz file to S3 bucket
buffer_zip.seek(0)
boto3.resource('s3') \
    .Bucket(bucket_name_default) \
    .Object(f'pytorch/{model_name}.tar.gz') \
    .put(Body=buffer_zip.getvalue())

このコードはPyTorchモデルを* Amazon S3 に保存します。これは、SageMakerが展開するためにモデルをS3に格納する必要があるためです。モデルを Amazon S3 *にアップロードすることで、SageMakerからアクセスできるようになり、デプロイされたモデルでのデプロイと推論が可能になります。

import time
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer


class TyreQualitySerializer(IdentitySerializer):
    CONTENT_TYPE = 'application/x-torch'

    def serialize(self, data):
        transformed_image = preprocess(data)
        tensor_image = torch.Tensor(transformed_image)

        serialized_data = io.BytesIO()
        torch.save(tensor_image, serialized_data)
        serialized_data.seek(0)
        serialized_data = serialized_data.read()

        return serialized_data


class TyreQualityPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=TyreQualitySerializer(),
            deserializer=JSONDeserializer(),
        )

sagemaker_model = PyTorchModel(
    model_data=f's3://{bucket_name_default}/pytorch/{model_name}.tar.gz',
    role=sagemaker.get_execution_role(),
    framework_version='2.0.1',
    py_version='py310',
    predictor_cls=TyreQualityPredictor,
    entry_point='inference.py',
    source_dir='code',
)

timestamp = int(time.time())
pytorch_endpoint_name = '{}-{}-{}'.format('tyre-quality-classifier', 'pt', timestamp)
sagemaker_predictor = sagemaker_model.deploy(
    initial_instance_count=1,
    instance_type='ml.p3.2xlarge',
    endpoint_name=pytorch_endpoint_name
)

このコードは、SageMakerへのPyTorchモデルのデプロイを容易にします。これは、入力データをPyTorchテンソルとして前処理してシリアライズするカスタムシリアライザ*TireQualitySerializer*を定義します。TireQualityPredictor*クラスは、定義されたシリアライザと*JSONDeserializer*を利用するカスタムプレディクタです。コードはまた、モデルのS3の場所、IAMの役割、フレームワークのバージョン、推論のエントリポイントを指定する PyTorchModel *オブジェクトを作成します。コードはタイムスタンプを生成し、モデルとタイムスタンプに基づいてエンドポイント名を構築します。最後に、インスタンス数、インスタンスタイプ、生成されたエンドポイント名を指定して、deployメソッドを使用してモデルをデプロイします。これにより、PyTorchモデルをデプロイし、SageMakerで推論できるようになります。

推論

image_object = list(bucket.objects.filter('dataset/tyre'))[0].get()
image_bytes = image_object['Body'].read()

with Image.open(with Image.open(BytesIO(image_bytes)) as image:
    predicted_classes = sagemaker_predictor.predict(image)

    print(predicted_classes)

次の例では、導入したエンドポイントを使用して推論を実行しています。