第 2 部分 - 利用 AWS Amazon FSx for NetApp ONTAP (FSx ONTAP) 作为 SageMaker 模型训练的数据源
本文是关于使用Amazon FSx for NetApp ONTAP (FSx ONTAP) 在 SageMaker 中训练 PyTorch 模型的教程,具体针对轮胎质量分类项目。
简介
本教程提供了一个计算机视觉分类项目的实际示例,提供了在 SageMaker 环境中利用 FSx ONTAP作为数据源构建 ML 模型的实践经验。该项目专注于使用深度学习框架 PyTorch 根据轮胎图像对轮胎质量进行分类。它强调使用 FSx ONTAP作为 Amazon SageMaker 中的数据源来开发机器学习模型。
什么是 FSx ONTAP
Amazon FSx ONTAP确实是 AWS 提供的完全托管的存储解决方案。它利用 NetApp 的ONTAP文件系统提供可靠且高性能的存储。通过支持 NFS、SMB 和 iSCSI 等协议,它允许从不同的计算实例和容器进行无缝访问。该服务旨在提供卓越的性能,确保快速高效的数据操作。它还具有高可用性和耐用性,确保您的数据保持可访问和受保护。此外, Amazon FSx ONTAP的存储容量是可扩展的,您可以根据需要轻松调整。
前提条件
网络环境
FSx ONTAP (Amazon FSx ONTAP)是一项 AWS 存储服务。它包括在NetApp ONTAP系统上运行的文件系统和连接到它的 AWS 管理的系统虚拟机 (SVM)。在提供的图中,AWS 管理的NetApp ONTAP服务器位于 VPC 外部。 SVM作为SageMaker和NetApp ONTAP系统之间的中介,接收来自SageMaker的操作请求并转发到底层存储。要访问 FSx ONTAP,SageMaker 必须放置在与 FSx ONTAP部署相同的 VPC 内。此配置可确保 SageMaker 和 FSx ONTAP之间的通信和数据访问。
数据访问
在现实场景中,数据科学家通常利用 FSx ONTAP中存储的现有数据来构建他们的机器学习模型。但是,出于演示目的,由于 FSx ONTAP文件系统在创建后最初是空的,因此需要手动上传训练数据。这可以通过将 FSx ONTAP作为卷安装到 SageMaker 来实现。文件系统成功挂载后,您可以将数据集上传到挂载位置,以便在 SageMaker 环境中训练模型。这种方法允许您利用 FSx ONTAP的存储容量和功能,同时与 SageMaker 进行模型开发和训练。
数据读取过程涉及将 FSx ONTAP配置为私有 S3 存储桶。详细配置说明请参考"第 1 部分 - 将Amazon FSx for NetApp ONTAP (FSx ONTAP) 作为私有 S3 存储桶集成到 AWS SageMaker"
集成概述
使用 FSx ONTAP中的训练数据在 SageMaker 中构建深度学习模型的工作流程可以概括为三个主要步骤:数据加载器定义、模型训练和部署。从高层次来看,这些步骤构成了 MLOps 管道的基础。然而,为了全面实施,每个步骤都涉及几个详细的子步骤。这些子步骤涵盖数据预处理、数据集拆分、模型配置、超参数调整、模型评估和模型部署等各种任务。这些步骤确保在 SageMaker 环境中使用来自 FSx ONTAP的训练数据构建和部署深度学习模型的流程全面有效。
逐步集成
数据Loader
为了使用数据训练 PyTorch 深度学习网络,创建了一个数据加载器以方便数据的输入。数据加载器不仅定义批次大小,还确定读取和预处理批次中每个记录的过程。通过配置数据加载器,我们可以批量处理数据,从而实现深度学习网络的训练。
数据加载器由3部分组成。
预处理函数
from torchvision import transforms
preprocess = transforms.Compose([
transforms.ToTensor(),
transforms.Resize((224,224)),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
上面的代码片段演示了使用 torchvision.transforms 模块定义图像预处理转换。在本教程中,创建预处理对象来应用一系列转换。首先,ToTensor() 转换将图像转换为张量表示。随后,Resize224,224 转换将图像调整为固定大小 224x224 像素。最后,Normalize() 转换通过减去平均值并除以每个通道的标准差来对张量值进行归一化。用于标准化的平均值和标准差值通常用于预训练的神经网络模型。总的来说,这段代码通过将图像数据转换为张量、调整其大小以及规范化像素值来准备进一步处理或输入到预先训练的模型中。
PyTorch 数据集类
import torch
from io import BytesIO
from PIL import Image
class FSxNImageDataset(torch.utils.data.Dataset):
def __init__(self, bucket, prefix='', preprocess=None):
self.image_keys = [
s3_obj.key
for s3_obj in list(bucket.objects.filter(Prefix=prefix).all())
]
self.preprocess = preprocess
def __len__(self):
return len(self.image_keys)
def __getitem__(self, index):
key = self.image_keys[index]
response = bucket.Object(key)
label = 1 if key[13:].startswith('defective') else 0
image_bytes = response.get()['Body'].read()
image = Image.open(BytesIO(image_bytes))
if image.mode == 'L':
image = image.convert('RGB')
if self.preprocess is not None:
image = self.preprocess(image)
return image, label
该类提供获取数据集中记录总数的功能,并定义读取每条记录数据的方法。在 getitem 函数中,代码利用 boto3 S3 bucket 对象从 FSx ONTAP检索二进制数据。从 FSx ONTAP访问数据的代码样式类似于从 Amazon S3 读取数据。后续讲解深入探讨私有 S3 对象 bucket 的创建过程。
FSx ONTAP作为私有 S3 存储库
seed = 77 # Random seed
bucket_name = '<Your ONTAP bucket name>' # The bucket name in ONTAP
aws_access_key_id = '<Your ONTAP bucket key id>' # Please get this credential from ONTAP
aws_secret_access_key = '<Your ONTAP bucket access key>' # Please get this credential from ONTAP
fsx_endpoint_ip = '<Your FSx ONTAP IP address>' # Please get this IP address from FSXN
import boto3
# Get session info
region_name = boto3.session.Session().region_name
# Initialize Fsxn S3 bucket object
# --- Start integrating SageMaker with FSXN ---
# This is the only code change we need to incorporate SageMaker with FSXN
s3_client: boto3.client = boto3.resource(
's3',
region_name=region_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
use_ssl=False,
endpoint_url=f'http://{fsx_endpoint_ip}',
config=boto3.session.Config(
signature_version='s3v4',
s3={'addressing_style': 'path'}
)
)
# s3_client = boto3.resource('s3')
bucket = s3_client.Bucket(bucket_name)
# --- End integrating SageMaker with FSXN ---
为了从 SageMaker 中的 FSx ONTAP读取数据,需要创建一个使用 S3 协议指向 FSx ONTAP存储的处理程序。这使得 FSx ONTAP可以被视为私有 S3 存储桶。处理程序配置包括指定 FSx ONTAP SVM 的 IP 地址、存储桶名称和必要的凭据。有关获取这些配置项的详细说明,请参阅以下文档:"第 1 部分 - 将Amazon FSx for NetApp ONTAP (FSx ONTAP) 作为私有 S3 存储桶集成到 AWS SageMaker" 。
在上面的例子中,bucket对象用于实例化PyTorch数据集对象。数据集对象将在后续章节中进一步解释。
PyTorch 数据Loader
from torch.utils.data import DataLoader
torch.manual_seed(seed)
# 1. Hyperparameters
batch_size = 64
# 2. Preparing for the dataset
dataset = FSxNImageDataset(bucket, 'dataset/tyre', preprocess=preprocess)
train, test = torch.utils.data.random_split(dataset, [1500, 356])
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
在提供的示例中,指定批次大小为 64,表示每个批次将包含 64 条记录。通过结合 PyTorch Dataset 类、预处理函数和训练批次大小,我们获得了用于训练的数据加载器。该数据加载器有助于在训练阶段分批迭代数据集的过程。
模型训练
from torch import nn
class TyreQualityClassifier(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
nn.Conv2d(3,32,(3,3)),
nn.ReLU(),
nn.Conv2d(32,32,(3,3)),
nn.ReLU(),
nn.Conv2d(32,64,(3,3)),
nn.ReLU(),
nn.Flatten(),
nn.Linear(64*(224-6)*(224-6),2)
)
def forward(self, x):
return self.model(x)
import datetime
num_epochs = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TyreQualityClassifier()
fn_loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
model.to(device)
for epoch in range(num_epochs):
for idx, (X, y) in enumerate(data_loader):
X = X.to(device)
y = y.to(device)
y_hat = model(X)
loss = fn_loss(y_hat, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Current Time: {current_time} - Epoch [{epoch+1}/{num_epochs}]- Batch [{idx + 1}] - Loss: {loss}", end='\r')
此代码实现了标准的 PyTorch 训练流程。它定义了一个名为*TyreQualityClassifier*的神经网络模型,使用卷积层和线性层来对轮胎质量进行分类。训练循环迭代数据批次,计算损失,并使用反向传播和优化更新模型的参数。此外,它还打印当前时间、纪元、批次和损失以供监控目的。
模型部署
部署
import io
import os
import tarfile
import sagemaker
# 1. Save the PyTorch model to memory
buffer_model = io.BytesIO()
traced_model = torch.jit.script(model)
torch.jit.save(traced_model, buffer_model)
# 2. Upload to AWS S3
sagemaker_session = sagemaker.Session()
bucket_name_default = sagemaker_session.default_bucket()
model_name = f'tyre_quality_classifier.pth'
# 2.1. Zip PyTorch model into tar.gz file
buffer_zip = io.BytesIO()
with tarfile.open(fileobj=buffer_zip, mode="w:gz") as tar:
# Add PyTorch pt file
file_name = os.path.basename(model_name)
file_name_with_extension = os.path.split(file_name)[-1]
tarinfo = tarfile.TarInfo(file_name_with_extension)
tarinfo.size = len(buffer_model.getbuffer())
buffer_model.seek(0)
tar.addfile(tarinfo, buffer_model)
# 2.2. Upload the tar.gz file to S3 bucket
buffer_zip.seek(0)
boto3.resource('s3') \
.Bucket(bucket_name_default) \
.Object(f'pytorch/{model_name}.tar.gz') \
.put(Body=buffer_zip.getvalue())
代码将 PyTorch 模型保存到 Amazon S3,因为 SageMaker 要求将模型存储在 S3 中以便部署。通过将模型上传到 Amazon S3,SageMaker 就可以访问它,从而允许对已部署的模型进行部署和推理。
import time
from sagemaker.pytorch import PyTorchModel
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer
class TyreQualitySerializer(IdentitySerializer):
CONTENT_TYPE = 'application/x-torch'
def serialize(self, data):
transformed_image = preprocess(data)
tensor_image = torch.Tensor(transformed_image)
serialized_data = io.BytesIO()
torch.save(tensor_image, serialized_data)
serialized_data.seek(0)
serialized_data = serialized_data.read()
return serialized_data
class TyreQualityPredictor(Predictor):
def __init__(self, endpoint_name, sagemaker_session):
super().__init__(
endpoint_name,
sagemaker_session=sagemaker_session,
serializer=TyreQualitySerializer(),
deserializer=JSONDeserializer(),
)
sagemaker_model = PyTorchModel(
model_data=f's3://{bucket_name_default}/pytorch/{model_name}.tar.gz',
role=sagemaker.get_execution_role(),
framework_version='2.0.1',
py_version='py310',
predictor_cls=TyreQualityPredictor,
entry_point='inference.py',
source_dir='code',
)
timestamp = int(time.time())
pytorch_endpoint_name = '{}-{}-{}'.format('tyre-quality-classifier', 'pt', timestamp)
sagemaker_predictor = sagemaker_model.deploy(
initial_instance_count=1,
instance_type='ml.p3.2xlarge',
endpoint_name=pytorch_endpoint_name
)
此代码有助于在 SageMaker 上部署 PyTorch 模型。它定义了一个自定义序列化器 TyreQualitySerializer,它将输入数据预处理并序列化为 PyTorch 张量。 TyreQualityPredictor 类是一个自定义预测器,它利用定义的序列化器和 JSONDeserializer。该代码还创建了一个 PyTorchModel 对象来指定模型的 S3 位置、IAM 角色、框架版本和推理的入口点。代码生成时间戳并根据模型和时间戳构建端点名称。最后,使用 deploy 方法部署模型,指定实例数量、实例类型和生成的端点名称。这使得 PyTorch 模型可以在 SageMaker 上部署并进行推理。
推理
image_object = list(bucket.objects.filter('dataset/tyre'))[0].get()
image_bytes = image_object['Body'].read()
with Image.open(with Image.open(BytesIO(image_bytes)) as image:
predicted_classes = sagemaker_predictor.predict(image)
print(predicted_classes)
这是使用已部署端点进行推理的示例。