Skip to main content
NetApp Solutions
简体中文版经机器翻译而成,仅供参考。如与英语版出现任何冲突,应以英语版为准。

第2部分—利用AWS Amazon FSx for NetApp ONTAP (FSx ONTAP)作为SageMaker模型训练的数据源

贡献者

本文是有关使用Amazon FSx for NetApp ONTAP (FSx ONTAP)在SageMaker中训练PyTorch模型的教程、专门针对轮胎质量分类项目。

作者:
Jian Jian (Ken)、NetApp高级数据和应用科学人员

简介

本教程提供了计算机视觉分类项目的一个实际示例、提供了构建ML模型的实践经验、该模型利用FSx ONTAP作为SageMaker环境中的数据源。该项目侧重于使用深度学习框架PyTorch、根据轮胎图像对轮胎质量进行分类。它侧重于使用FSx ONTAP作为Amazon SageMaker中的数据源来开发机器学习模型。

什么是FSx ONTAP

Amazon FSx ONTAP确实是由AWS提供的完全托管存储解决方案。它利用NetApp的ONTAP文件系统提供可靠的高性能存储。由于支持NFS、SMB和iSCSI等协议、因此可以从不同的计算实例和容器无缝访问。该服务旨在提供卓越的性能、确保快速高效的数据运营。它还提供高可用性和持久性、确保您的数据始终可访问并受到保护。此外、Amazon FSx ONTAP的存储容量可扩展、使您可以根据需要轻松调整。

前提条件

网络环境

网络环境

FSx ONTAP (Amazon FSx ONTAP)是一项AWS存储服务。它包括在NetApp ONTAP系统上运行的文件系统以及与其连接的AWS托管系统虚拟机(SVM)。在提供的图中、由AWS管理的NetApp ONTAP服务器位于VPC之外。SVM充当SageMaker和NetApp ONTAP系统之间的中介、接收来自SageMaker的操作请求并将其转发到底层存储。要访问FSx ONTAP、SageMaker必须与FSx ONTAP部署位于同一个VPC中。此配置可确保SageMaker和FSx ONTAP之间的通信和数据访问。

数据访问

在实际场景中、数据科学家通常会利用FSx ONTAP中存储的现有数据来构建其机器学习模型。但是、出于演示目的、由于FSx ONTAP文件系统在创建后最初为空、因此需要手动上传训练数据。这可以通过将FSx ONTAP作为卷挂载到SageMaker来实现。成功挂载文件系统后、您可以将数据集上传到挂载位置、以便在SageMaker环境中训练模型。通过这种方法、您可以利用FSx ONTAP的存储容量和功能、同时与SageMaker合作进行模型开发和训练。

数据读取过程涉及将FSx ONTAP配置为专用S3存储分段。要了解详细的配置说明、请参见"第1部分—将Amazon FSx for NetApp ONTAP (FSx ONTAP)作为私有S3存储分段集成到AWS SageMaker中"

集成概述

培训工作流

使用FSx ONTAP中的训练数据在SageMaker中构建深度学习模型的工作流可概括为三个主要步骤:Data Loader定义、模型训练和部署。总体而言、这些步骤构成了MLOps管道的基础。但是、每个步骤都涉及多个详细的子步骤、以实现全面实施。这些子步骤包括各种任务、例如数据预处理、数据集拆分、模型配置、超参数调整、模型评估、 和型号部署。这些步骤可确保在SageMaker环境中使用FSx ONTAP中的训练数据构建和部署深度学习模型的流程全面有效。

分步集成

数据加载程序

为了训练使用数据的PyTorch深度学习网络、我们创建了一个数据加载程序来促进数据馈送。数据加载程序不仅可以定义批大小、还可以确定用于读取和预处理批处理中每个记录的操作步骤。通过配置数据加载程序、我们可以处理批量数据处理、从而实现深度学习网络的训练。

数据加载程序由3个部分组成。

预处理功能

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224)),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

上述代码段演示了使用*torchVISION.transforms*模块的图像预处理转换的定义。在此图示中、创建预处理对象以应用一系列转换。首先,*ToTendor()*转换将图像转换为张图表示。随后,*Resize224224*转换将图像的大小调整为固定的224x224像素大小。最后,*NORMDE()*转换通过减去平均值并除以沿每个通道的标准偏差来使张量值标准化。用于标准化的平均值和标准偏差值通常用于经过预先训练的神经网络模型。总之、该代码通过将图像数据转换为张量、调整图像大小和使像素值标准化来准备图像数据、以便进一步处理或输入到预先训练的模型中。

PyTorch数据集类

import torch
from io import BytesIO
from PIL import Image


class FSxNImageDataset(torch.utils.data.Dataset):
    def __init__(self, bucket, prefix='', preprocess=None):
        self.image_keys = [
            s3_obj.key
            for s3_obj in list(bucket.objects.filter(Prefix=prefix).all())
        ]
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_keys)

    def __getitem__(self, index):
        key = self.image_keys[index]
        response = bucket.Object(key)

        label = 1 if key[13:].startswith('defective') else 0

        image_bytes = response.get()['Body'].read()
        image = Image.open(BytesIO(image_bytes))
        if image.mode == 'L':
            image = image.convert('RGB')

        if self.preprocess is not None:
            image = self.preprocess(image)
        return image, label

此类提供了获取数据集中记录总数的功能,并定义了读取每个记录的数据的方法。在*_gottim*函数中,代码利用boto3 S3存储分段对象从FSx ONTAP中检索二进制数据。从FSx ONTAP访问数据的代码模式类似于从Amazon S3读取数据。后面的说明将深入介绍私有S3对象*bket*的创建过程。

FSx ONTAP作为私有S3存储库

seed = 77                                                   # Random seed
bucket_name = '<Your ONTAP bucket name>'                    # The bucket name in ONTAP
aws_access_key_id = '<Your ONTAP bucket key id>'            # Please get this credential from ONTAP
aws_secret_access_key = '<Your ONTAP bucket access key>'    # Please get this credential from ONTAP
fsx_endpoint_ip = '<Your FSx ONTAP IP address>'                  # Please get this IP address from FSXN
import boto3

# Get session info
region_name = boto3.session.Session().region_name

# Initialize Fsxn S3 bucket object
# --- Start integrating SageMaker with FSXN ---
# This is the only code change we need to incorporate SageMaker with FSXN
s3_client: boto3.client = boto3.resource(
    's3',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    use_ssl=False,
    endpoint_url=f'http://{fsx_endpoint_ip}',
    config=boto3.session.Config(
        signature_version='s3v4',
        s3={'addressing_style': 'path'}
    )
)
# s3_client = boto3.resource('s3')
bucket = s3_client.Bucket(bucket_name)
# --- End integrating SageMaker with FSXN ---

要从SageMaker中的FSx ONTAP读取数据、需要创建一个使用S3协议指向FSx ONTAP存储的处理程序。这样、FSx ONTAP便可视为私有S3存储分段。处理程序配置包括指定FSx ONTAP SVM的IP地址、分段名称和所需凭据。有关获取这些配置项的完整说明,请参阅上的文档"第1部分—将Amazon FSx for NetApp ONTAP (FSx ONTAP)作为私有S3存储分段集成到AWS SageMaker中"

在上述示例中、b分 段对象用于实例化PyTorch DataSet对象。数据集对象将在后续章节中进一步说明。

PyTorch数据加载程序

from torch.utils.data import DataLoader
torch.manual_seed(seed)

# 1. Hyperparameters
batch_size = 64

# 2. Preparing for the dataset
dataset = FSxNImageDataset(bucket, 'dataset/tyre', preprocess=preprocess)

train, test = torch.utils.data.random_split(dataset, [1500, 356])

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

在提供的示例中、指定的批大小为64、表示每个批将包含64条记录。通过将PyTorch *DataT*类、预处理功能和训练批大小相结合,我们可以获得训练所需的数据加载程序。此数据加载程序有助于在训练阶段批量迭代数据集。

模型训练

from torch import nn


class TyreQualityClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,32,(3,3)),
            nn.ReLU(),
            nn.Conv2d(32,64,(3,3)),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64*(224-6)*(224-6),2)
        )
    def forward(self, x):
        return self.model(x)
import datetime

num_epochs = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TyreQualityClassifier()
fn_loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


model.to(device)
for epoch in range(num_epochs):
    for idx, (X, y) in enumerate(data_loader):
        X = X.to(device)
        y = y.to(device)

        y_hat = model(X)

        loss = fn_loss(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Current Time: {current_time} - Epoch [{epoch+1}/{num_epochs}]- Batch [{idx + 1}] - Loss: {loss}", end='\r')

本规范实施标准的PyTorch培训流程。它定义了一个名为*TireQualityClassifyer*的神经网络模型,该模型使用卷积层和线性层对轮胎质量进行分类。训练循环会迭代数据批处理、并使用反向传播和优化功能来确定损失、然后更新模型的参数。此外、它还会打印当前时间、时期、批处理和损失、以供监控。

模型部署

部署

import io
import os
import tarfile
import sagemaker

# 1. Save the PyTorch model to memory
buffer_model = io.BytesIO()
traced_model = torch.jit.script(model)
torch.jit.save(traced_model, buffer_model)

# 2. Upload to AWS S3
sagemaker_session = sagemaker.Session()
bucket_name_default = sagemaker_session.default_bucket()
model_name = f'tyre_quality_classifier.pth'

# 2.1. Zip PyTorch model into tar.gz file
buffer_zip = io.BytesIO()
with tarfile.open(fileobj=buffer_zip, mode="w:gz") as tar:
    # Add PyTorch pt file
    file_name = os.path.basename(model_name)
    file_name_with_extension = os.path.split(file_name)[-1]
    tarinfo = tarfile.TarInfo(file_name_with_extension)
    tarinfo.size = len(buffer_model.getbuffer())
    buffer_model.seek(0)
    tar.addfile(tarinfo, buffer_model)

# 2.2. Upload the tar.gz file to S3 bucket
buffer_zip.seek(0)
boto3.resource('s3') \
    .Bucket(bucket_name_default) \
    .Object(f'pytorch/{model_name}.tar.gz') \
    .put(Body=buffer_zip.getvalue())

此代码会将PyTorch模型保存到*Amazon S3*中,因为SageMaker要求将模型存储在S3中进行部署。通过将模型上传到*Amazon S3*,SageMaker便可访问模型,从而可以在已部署的模型上进行部署和引用。

import time
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer


class TyreQualitySerializer(IdentitySerializer):
    CONTENT_TYPE = 'application/x-torch'

    def serialize(self, data):
        transformed_image = preprocess(data)
        tensor_image = torch.Tensor(transformed_image)

        serialized_data = io.BytesIO()
        torch.save(tensor_image, serialized_data)
        serialized_data.seek(0)
        serialized_data = serialized_data.read()

        return serialized_data


class TyreQualityPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=TyreQualitySerializer(),
            deserializer=JSONDeserializer(),
        )

sagemaker_model = PyTorchModel(
    model_data=f's3://{bucket_name_default}/pytorch/{model_name}.tar.gz',
    role=sagemaker.get_execution_role(),
    framework_version='2.0.1',
    py_version='py310',
    predictor_cls=TyreQualityPredictor,
    entry_point='inference.py',
    source_dir='code',
)

timestamp = int(time.time())
pytorch_endpoint_name = '{}-{}-{}'.format('tyre-quality-classifier', 'pt', timestamp)
sagemaker_predictor = sagemaker_model.deploy(
    initial_instance_count=1,
    instance_type='ml.p3.2xlarge',
    endpoint_name=pytorch_endpoint_name
)

此代码有助于在SageMaker上部署PyTorch模型。它定义了一个自定义的串口器*TireQuality串 口器*,该串口器可将输入数据作为PyTorch张量进行预处理和串口处理。TireQuality谓 词*类是一个自定义的预测程序,它利用定义的串列器和*JSONDeseririter*。该代码还会创建一个*PyTorchModel*对象,用于指定模型的S3位置、IAM角色、框架版本和引用入口点。代码会生成时间戳并根据模型和时间戳构建端点名称。最后、使用Deploy方法部署模型、并指定实例计数、实例类型和生成的端点名称。这样、可以在SageMaker上部署PyTorch模型并可用于进行推入。

参考

image_object = list(bucket.objects.filter('dataset/tyre'))[0].get()
image_bytes = image_object['Body'].read()

with Image.open(with Image.open(BytesIO(image_bytes)) as image:
    predicted_classes = sagemaker_predictor.predict(image)

    print(predicted_classes)

这是使用已部署端点执行此假定的示例。