Como ya hemos descrito en post anteriores, las buenas prácticas de MLOps permiten la automatización de las operaciones de entrenamiento, despliegue y monitorización que forman parte del ciclo de vida de los modelos. A la hora de poder utilizarlas es común tener que adaptarse a las peculiaridades de cada framework, como por ejemplo Sagemaker. Aunque simplifican en gran medida el proceso de desarrollo y despliegue de aplicaciones basadas en Machine Learning (ML), no obstante, a veces puede suponer una limitación por la falta de flexibilidad que eso implica. Para poder construir un modelo basado en ML se suelen definir una serie de operaciones básicas, cada una de las cuales se suele encapsular en contenedores independientes que ejecutan cada una de las operaciones básicas, como se puede observar en la siguiente figura.

Sagemaker Pipelines 1

A lo largo de este post veremos cómo resolver este tipo de problemas para poder ejecutar cualquier modelo de ML, en Sagemaker Pipelines, de la manera más sencilla y flexible posible.

Limitaciones de Sagemaker Pipelines

AWS Sagemaker ofrece una librería con herramientas para la construcción y despliegue de modelos y que estos puedan ejecutarse de manera eficiente en su cloud. Además, Sagemaker Pipelines incluye un SDK para la definición de las canalizaciones de ML que permite la construcción de operaciones del ciclo de vida de los modelos mediante la utilización de imágenes preconstruidas de Sagemaker (Sklearn, Tensorflow, Spark...), las cuales ya tienen instaladas todas las dependencias necesarias para la utilización de los modelos. No obstante, es muy común que los proyectos basados en ML no utilicen únicamente modelos que utilizan las tecnologías disponibles en el repositorio de imágenes de Sagemaker, incrementando la complejidad de desplegar modelos ya existentes o que usen tecnologías no estandarizadas en Sagemaker. Esto puede ser aún mucho más complejo cuando se utilizan diferentes lenguajes en el desarrollo de modelos de ML y que no están soportados por Sagemaker (R, Julia, ...).

Para poder “resolver” los dos problemas anteriores de manera simultánea, Sagemaker Pipelines nos ofrece la posibilidad de crear nuestras imágenes personalizadas con todas las dependencias necesarias, así como el código del modelo. De esta manera, las imágenes que ejecutan las diferentes operaciones serán autocontenidas, por lo que no requerirán que Sagemaker Pipeline gestione las dependencias. Además, cada proceso de la canalización consistirá en la ejecución de la imagen Docker, haciendo todos los procesos de la canalización independientes al lenguaje y al conjunto de modelos disponibles en el SDK de Sagemaker.

Construye tu propia imagen

El hecho de construir imágenes Docker autocontenidas, que incluyan todas las dependencias necesarias y el código correspondiente a cada una de las operaciones de la canalización, permite que estas imágenes sean migrables a cualquier otro entorno, ya que no dependen de Sagemaker, puesto que el framework es la herramienta para orquestar y poder ejecutarlas. Es decir, la utilización de contenedores independientes hace que las operaciones de la canalización ya no dependan específicamente del framework.

Sagemaker Pipelines 2

Para describir como construir un proceso MLOps sencillo mediante la utilización de contenedores vamos a crear una canalización como la que se muestra en la imagen anterior que estará formada por tres operaciones: (1) entrenamiento, (2) creación del modelo; e (3) inferencia.

Paso 1 - Definición de la imagen

Para que Sagemaker Pipelines sea capaz de ejecutar correctamente la imagen en cada una de las operaciones de la canalización, es necesario que se definan una serie de comandos del sistema. De este modo, cada proceso de la canalización ejecutará la imagen con el comando y variables de entorno correspondientes a su ejecución. Los comandos necesarios son:

Además de estos dos comandos, para canalizaciones más complejas, se pueden definir comandos customizados para cada proceso (por ejemplo “preprocess” o “explain”). Por ejemplo, para un proyecto genérico de ML en Python, con la siguiente estructura:

   ├── ml_project
   ├── serve/
   │     ├── main.py
   │     └── config.json
   ├── train/
   │     ├── main.py
   │     └── config.json
   ├── Dockerfile
   └── requirements.txt

donde el fichero Dockerfile se podría definir con la siguiente configuración:

FROM python:3.8

# Install al the required dependencies
COPY ./requirements.txt /home
WORKDIR /home
RUN  pip install -r requirements.txt

# Copy the folder path into the container
COPY ./ /home

# Create commands
RUN echo "#!/bin/bash\n/usr/local/bin/python -u /home/train/main.py" > /usr/bin/train
RUN echo "#!/bin/bash\n/usr/local/bin/python -u /home/serve/main.py" > /usr/bin/serve

RUN chmod +x /usr/bin/train
RUN chmod +x /usr/bin/serve

Cómo observamos en el Dockerfile, primero se instalan todas las dependencias en la imagen, luego se copia todo el código del proyecto y finalmente se crean los comandos de entrenamiento (train) e inferencia (serve). Estos comandos se definen como el resultado de la ejecución de un script “train/main.py” y “serve/main.py” respectivamente.

Paso 2 - Construcción del script de entrenamiento

Para poder construir un proceso de entrenamiento personalizado dentro de una canalización de Sagemaker Pipelines, es necesario crear un conjunto de scripts de código que ejecutarán el proceso de entrenamiento dentro de la imagen. Este conjunto de scripts debería ser desarrollado mediante la utilización de una estructura específica y estar siempre coordinados a partir de un script maestro denominado main.py que será el script principal de ejecución. A continuación, se presenta un ejemplo del código de un proceso de entrenamiento mediante un script denominado main.py.

import os
import json
import joblib
import argparse

import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier

if __name__=="__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--target", type=str, default="Species")
    parser.add_argument("--n-estimators", type=int, default=10)
    parser.add_argument("--min-samples-leaf", type=int, default=3)

    parser.add_argument("--train-dir", type=str, default='/opt/ml/input/data/train')
    parser.add_argument("--test-dir", type=str, default='/opt/ml/input/data/test')
    parser.add_argument("--model-dir", type=str, default='/opt/ml/model')

    args, _ = parser.parse_known_args()

    # Leemos los datos
    df_train = pd.read_csv(os.path.join(args.train_dir, 'train.csv'))
    X_train = df_train.loc[:, df_train.columns != args.target].values
    y_train = df_train.loc[:, df_train.columns == args.target].values.ravel()

    # Entrenamiento
    model = RandomForestClassifier(
        n_estimators=args.n_estimators,
        min_samples_leaf=args.min_samples_leaf
    ).fit(X_train, y_train)

    # Testeo
    df_test = pd.read_csv(os.path.join(args.test_dir, 'test.csv'))
    X_test = df_test.loc[:, df_test.columns != args.target].values
    y_test = df_test.loc[:, df_test.columns == args.target].values.ravel()
    print("Accuracy=" + str(accuracy_score(y_test, model.predict(X_test))))

    # Guardamos el modelo
    path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(model, path)

Como vemos, los parámetros del modelo se especifican por medio de variables del parser del script y los paths de lectura de datos y almacenamiento del modelo apuntan a los paths donde Sagemaker monta los directorios de lectura y escritura respectivamente.

Paso 3 - Construcción del script de inferencia

Para poder ejecutar el proceso personalizado de inferencia de nuestro modelo es necesario también construir el código de inferencia. Para que pueda ser ejecutable por Sagemaker en sus procesos de inferencia, independientemente de que sea una predicción batch u online, es necesario construir una API con las siguientes características:

A continuación, se presenta un ejemplo del script que desplegará el API para el proceso de inferencia. Este script, al igual, que en el caso de entrenamiento, ha sido denominado main.py:

import csv
import pickle
from io import StringIO

import flask
import joblib
import pandas as pd

# cargamos modelo
model = joblib.load('/opt/ml/model/model.joblib')

app = flask.Flask(__name__)

@app.route("/ping", methods=["GET"])
def ping():
    return flask.Response(response="\n", status=200,
        mimetype="application/json")

@app.route("/invocations", methods=["POST"])
def predict():

    # Convert from CSV to pandas
    if flask.request.content_type == "text/csv":
        data = flask.request.data.decode("utf-8")
        s = StringIO(data)
        X = pd.read_csv(s).values[:,:-1]

        # Do the prediction
        predictions = pd.DataFrame(
            model.predict_proba(X), columns=model.classes_)

        # Convert from numpy back to CSV
        out = StringIO()
        predictions.to_csv(out, header=False, index=False)
        result = out.getvalue()

        return flask.Response(
            response=result,
            status=200,
            mimetype="text/csv")

app.run(host="0.0.0.0", port=8080, debug=True)

Paso 4 - Construcción de la canalización

Del mismo modo que con las imágenes predefinidas de Sagemaker es necesario utilizar el objeto de tipo Estimator que referencia a la imagen que hemos construido. La diferencia es que en vez de tener que usar un Estimator específico de los que ofrece el framework (Sklearn, Tensorflow, Pytorch), tenemos que usar el Estimator básico. Además, como argumento, en vez de pasarle la ruta al script que queremos que ejecute, tenemos que pasar como argumento la URI del contenedor que queremos utilizar, que es el que ejecuta nuestro código de entrenamiento bajo el comando train. A continuación, se presenta un fragmento de código del proceso de creación de la Pipeline utilizando las diferentes imágenes que hemos creado previamente:

from sagemaker.estimator import Estimator
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput

account = sesion.boto_session.client("sts").get_caller_identity()["Account"]
image = "{}.dkr.ecr.{}.amazonaws.com/sagemaker-decision-trees:latest".format(account, region)
estimator = Estimator(
    image_uri=image,
    role=role,
    instance_count=1,
    instance_type="ml.c4.2xlarge",
    output_path="s3://{}/output".format(bucket),
    sagemaker_session=sesion,
)

step_train = TrainingStep(
    name="TrainSklearnModel",
    estimator=estimator,
    inputs={
        "train": TrainingInput(s3_data=TRAIN_DATA_URI, content_type="text/csv"),
        "test": TrainingInput(s3_data=TEST_DATA_URI, content_type="text/csv"),
    },
)

Para que el modelo generado por el entrenamiento se pueda usar en la inferencia, es necesario añadir en el Pipeline un paso de creación de modelo. Para que tenga todas las dependencias, además, hay que pasarle al modelo de nuevo la URI. Esto permitirá que el Step de inferencia cargue de manera correcta el contenedor de la inferencia.

from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.step_collections import CreateModelStep

model=Model(
    image_uri=image,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=sesion
)

step_create_model = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs = CreateModelInput(instance_type="ml.m5.large")
)

Para poder crear un proceso de inferencia es necesario crear un objeto de tipo Transformer de tipo genérico, al igual que en el caso del entrenamiento A la operación (Step) de inferencia hay que pasarle el modelo generado anteriormente. Para ello usamos las propiedades del objeto CreateModelStep mediante la función step_create_model. para obtener la información del modelo.

from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.transformer import Transformer

output_data="s3://sagemaker-eu-west-1-827345860551/curso_sagemaker/output"

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    accept="text/csv",
    instance_count=1,
    output_path=output_data
)

step_transform = TransformStep(
    name="TransformStep", 
    transformer=transformer,
    inputs=TransformInput(data=s3_path, content_type="text/csv")
)

Paso 5 - Ejecución de la canalización

Una vez que tenemos definidas todas las operaciones (Steps) de la canalización, tenemos que instanciar el objeto Pipeline y ejecutarlo.

import json
from pprint import pprint
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="SklearnIrisPipeline",
    steps=[
        step_train,
        step_create_model,
        step_transform
    ],
)

pipeline.upsert(role_arn=role)
definition = json.loads(pipeline.definition())

 execution = pipeline.start()

Conclusiones

En este post hemos visto cómo poder crear y ejecutar un Pipeline de Sagemaker sin necesidad de usar sus propias imágenes, creando una imagen con nuestro propio modelo dockerizado. Esto nos permite crear imágenes para la productización de nuestros modelos completamente agnósticos a las dependencias o el SDK de Sagemaker.

Además, podemos emplear Sagemaker como herramienta de productización y orquestación de modelos para cualquier tipo de proyecto, independientemente del framework de ML que se utilice o del lenguaje de programación. Es decir, podemos desplegar mediante SageMaker Pipelines nuestros proyectos de ML de manera sencilla sin realizar un gran número de cambios en el código fuente de los diferentes procesos (pre-procesamiento, entrenamiento, inferencia, etc.) debido a que podemos construir contenedores independientes de los procesos estándar de Sagemaker.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete