Kubeflow Part 5: Model Registries—Combining MLflow and Kubeflow


We are running a #Kubeflow series where we are sharing our experiences and thoughts on building a Kubeflow-based ML pipeline architecture for production use. This is the fifth post in the series.

When a data scientist moves from experimenting on models to deploying them (whether by handing a model to another team, to other members of their own team, or by deploying it themselves), the trained models always come with strings attached. Models come with artifacts and other files needed to produce a result. Each model has its own set of inputs and outputs, as well as the steps required to preprocess whatever input the application receives. The datatypes of the inputs need to be taken into account, and the application needs to ingest the model’s output correctly.

This is where a model registry comes into play. A model registry is a central repository where model developers can publish production-ready models for easy access. It provides a way to collaborate with other teams and stakeholders, manage models’ lifecycles, and establish a testing/validation/deployment workflow.

In the last article, we completed the hyperparameter tuning of our anomalous log–detection model. These hyperparameters can be used to train a model, which can then be saved in Amazon S3 or a similar cloud blob storage service. However, this turns sharing into a process, and a decentralized one at that. Each data scientist’s or team’s models may sit in their own buckets, so sharing them with other teams may require permissions those teams do not already have, and each data scientist may have their own way of working and sharing information and artifacts.

This is the benefit of a model registry’s centralized storage. The model registry provides a central storage unit that holds models and their artifacts for easy retrieval by an application. Without the model registry, the model artifacts would be stored in files that are difficult to track, and saved to whatever source code repository or blob storage location is established. A model registry simplifies this process through model versioning.

A model registry is something many larger organizations will want to implement, and something even smaller organizations may want to have in place from an early stage. In this article, we focus on creating a model registry that can be connected to Kubeflow: the platform we used for it, how to connect it to Kubeflow and get it working, and how to use the relevant APIs in application code and in training and inference pipelines.

MLflow

MLflow is an open source platform for managing the ML lifecycle, including experimentation, reproducibility, deployment, and a central model registry. MLflow currently offers four components: Tracking, Projects, Models, and a Model Registry. The scope of this article is limited to the last of these.

The MLflow Model Registry component is a centralized model store, set of APIs, and UI, to collaboratively manage the full lifecycle of an MLflow Model. It provides model lineage (which MLflow experiment and run produced the model), model versioning, stage transitions (for example from staging to production), and annotations.

It is important to note that MLflow is not Kubernetes-native, and does not come pre-packaged with a Kubeflow install. It is an entirely separate platform that will need to be combined with the install of Kubeflow being used, and that process will be part of this article.

The MLflow Model Registry hinges on the following concepts.

  1. Model. An MLflow Model is created from an experiment or a run, and must be logged by one of MLflow’s mlflow.<model_flavor>.log_model() methods. MLflow is not framework-agnostic; a model written and created through PyTorch must use MLflow’s PyTorch-flavor APIs, and one created through scikit-learn must use MLflow’s scikit-learn–flavor APIs. MLflow currently supports 20 different flavors, including those corresponding to the most commonly used ML frameworks.
  2. Registered Model. Once a model has been logged, it can be registered with the Model Registry. A registered model has a unique name and contains versions, associated transitional stages, model lineage, and other metadata. It is associated with an experiment.
  3. Model Version. Each registered model can have one or many versions. When a new model is added to the Model Registry, it is added as version 1. Each new model registered to the same model name increments the version number. Each model version will have its associated artifacts and input/output metadata, which allows data scientists to trace the model lineage.
  4. Model Stage. Each model version can be assigned one stage at a given time. MLflow provides labels for commonly used stages, namely, “Staging,” “Production,” and “Archived.” If only one model version occupies a given stage (as is typically the case for “Production”), application code can simply refer to the stage rather than a specific version. Models can be transitioned from one stage to another, either through the UI or programmatically (see the short sketch after this list).
  5. Annotations and Descriptions. The MLflow Registered Model and each model version can be annotated using Markdown. This includes a description, and any information that may be relevant to teams sharing the model—other data science teams or deployment teams—such as algorithm descriptions, datasets employed, or methodology.
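
Before moving on, here is a small illustration of the last two concepts: stage transitions and annotations can also be performed programmatically through the MLflow client API. This is a minimal sketch; the model name, version, and tracking URI are illustrative and assume a registry that is already running.

from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri='http://<mlflow_tracking_uri>')

# Promote version 1 of a registered model from "Staging" to "Production".
client.transition_model_version_stage(
    name='sequence-anomaly-detection-model',
    version=1,
    stage='Production',
)

# Annotate the same model version with a Markdown description.
client.update_model_version(
    name='sequence-anomaly-detection-model',
    version=1,
    description='Single-layer ANN trained on the anomalous-log dataset.',
)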

These concepts will be demonstrated through an example later in the article; first, however, since MLflow is not directly available with a Kubeflow install, let’s see how MLflow can be installed in the cluster and connected to the Kubeflow UI.

Installing MLflow in a Kubeflow Cluster

The first step is simply deploying MLflow on Kubernetes. This is performed in the same manner as any other deployment. The Dockerfile looks a little like the following.

FROM python:3.7-slim-buster


RUN pip3 install --upgrade pip && \
    pip3 install mlflow==1.20.2 boto3 psycopg2-binary


ENTRYPOINT ["mlflow", "server"]

Code Segment 1. The Dockerfile to Deploy MLflow on K8s

We deployed MLflow on AWS with Amazon S3 as the artifact store and a PostgreSQL backend store (hence the boto3 and psycopg2-binary dependencies in the Dockerfile). If this does not reflect your deployment setup, adjust the installed dependencies accordingly.

The next steps are creating a deployment.yaml and a service.yaml.

apiVersion: v1
kind: ServiceAccount
metadata:
  name: <sa_name>
  namespace: <sa_namespace>
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: <deployment_name>
  namespace: <deployment_namespace>
  labels:
    app: <deployment_label>
spec:
  replicas: 1
  selector:
    matchLabels:
      app: <deployment_label>
  template:
    metadata:
      labels:
        app: <deployment_label>
    spec:
      serviceAccountName: <sa_name>
      containers:
        - name: <container_name>
          image: <repo_name>/<container_name>:latest
          imagePullPolicy: Always
          command: ["/bin/bash", "-c"]
          args:
            ["mlflow server --host 0.0.0.0 --default-artifact-root ${MLFLOW_S3_ENDPOINT_URL} --backend-store-uri ${MLFLOW_TRACKING_URI}"]
          ports:
            - containerPort: 5000

Code Segment 2. deployment.yaml

Ensure that the environment variables referenced above (MLFLOW_S3_ENDPOINT_URL and MLFLOW_TRACKING_URI) are passed to the container through a ConfigMap, a Secret, or some other means.

apiVersion: v1
kind: Service
metadata:
  name: <service_name>
  namespace: <service_namespace>
spec:
  selector:
    app: <deployment_label>
  ports:
    - protocol: TCP
      port: 5000
      targetPort: 5000

Code Segment 3. service.yaml

This is enough to deploy MLflow in the cluster. From here, the pipeline code can be changed to use the Python API to register models, create new model versions, and load models from the MLflow model registry.
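
To quickly verify the deployment from a notebook or a pod inside the cluster, the MLflow client can be pointed at the new service. This is just a sketch, so substitute the service name and namespace used above; the call simply returns an empty list at this stage.

import mlflow
from mlflow.tracking import MlflowClient

# Point the client at the in-cluster MLflow service and list the registered models.
mlflow.set_tracking_uri('http://<service_name>.<service_namespace>.svc.cluster.local:5000')
print(MlflowClient().list_registered_models())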

One further convenience, however, is adding an MLflow tab to the Kubeflow UI, so that the MLflow UI is accessible through Kubeflow itself. To achieve this, a virtual service needs to be defined that makes the MLflow service available via the Istio ingress gateway.

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: <virtualservice_name>
  namespace: <virtualservice_namespace>
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mlflow/
    rewrite:
      uri: /
    route:
    - destination:
        host: mlflow-service.mlflow.svc.cluster.local
        port:
          number: 5000

Code Segment 4. Deploying the Virtual Service

Finally, a small change needs to be made to the central dashboard config map.

kubectl edit cm centraldashboard-config -n kubeflow
# add this under the other menu items
{
  "type": "item",
  "link": "/mlflow/",
  "text": "MLflow",
  "icon": "icons:cached"
}

Code Segment 5. Editing the Central Dashboard Config Map to Add an MLflow Tab

Restarting the central dashboard deployment will result in the tab being added.

kubectl rollout restart deploy centraldashboard -n kubeflow

Code Segment 6. Restarting the Dashboard Deployment

Changing Pipeline Code to Use the Model Registry

The model registry is used in both the training and the inference phases. In the training phase, the model is logged and registered to the registry; in the inference phase, the model is read from the registry and used in the normal way.

Thus, there are at least two places where we need to compare the original code with the updated code. Let’s look at both.

The Training Pipeline

def model_training(sequence_json: comp.InputPath(), model_art: comp.OutputBinaryFile(bytes), parameters_json: comp.OutputPath()):
    ...
    # Saving the model into local and an artifact.
    checkpoint = {'state_dict': model.state_dict()}
    torch.save(checkpoint, model_art)
    torch.save(checkpoint, '/tmp/checkpoint.pth')


    # Uploading the local file into S3.
    path_bucket = '<bucket_name>'
    path_to_move_file = ''


    boto3.client('s3').upload_file(
        '/tmp/checkpoint.pth',
        path_bucket, 'checkpoint.pth'
    )

Code Segment 7. The Original Training Component Code

When training the model, the way we saved the model before using a model registry was in an S3 bucket. To ensure the model could be used for evaluation in the same pipeline, we also saved it to a Kubeflow output artifact—recall that each Component in a Pipeline is executed in an independent container.

When considering the updates required to the code to utilize the model registry, we also had to consider the options that MLflow provides. During the training process, a model can be logged and then registered with the MLflow Model Registry. In addition to the model itself, MLflow provides the option to log relevant metrics, hyperparameters, input and output schemas, and code files. With this in mind, to leverage all of the options available in MLflow, we registered the model in a separate Component, and made minor changes to the training and validation Components.

In the training Component, the following was added, so as to dump the hyperparameter information into a JSON artifact.

with open(parameters_json, 'w') as f:
    json.dump({'lr': learning_rate, 'batch_size': batch_size, 'epochs': epochs}, f)

Code Segment 8. Added Code to the Training Component

Note that in the function signature, parameters_json was annotated as a kfp.components.OutputPath; kfp.components was imported as comp. This ensures that the hyperparameters used to train our model are available to our new registering Component.

In the model validation Component, a similar JSON artifact is created, this time to log the metric information of the validation, e.g., the accuracy value on the validation set of the trained model. In our Component, this looked like the following.

def model_evaluating(
    sequence_json: comp.InputPath(), model_art: comp.InputBinaryFile(bytes),
    metrics_json: comp.OutputPath(), mlpipeline_ui_metadata_path: comp.OutputPath()
):
    ...
    correct, test_loss = test_loop(valid_dataloader, model, loss_fn)
    with open(metrics_json, 'w') as f:
        json.dump({'accuracy': correct, 'loss': test_loss}, f)

Code Segment 9. The Test Loop in the Validation Component, and the Added Code

Our test loop function returned the accuracy (correct) and loss (test_loss) values for that loop, and we dumped this information into a JSON artifact—note that metrics_json was defined in the function signature as comp.OutputPath.

Along with the model artifact itself, denoted as model_art in code segment 7, this is the information we will take into the model-registration Component.

def register_model(
    sequence_json: comp.InputPath(), model_art: comp.InputBinaryFile(bytes),
    metrics_json: comp.InputPath(), parameters_json: comp.InputPath()
):
    import numpy as np
    import mlflow
    import torch
    import json


    # Reading the preprocessed data from the artifact.
    with open(sequence_json, 'r') as f:
        sequences = json.load(f)


    # Setting up Dataset and DataLoader for torch model.
    train_size = 9000
    X_valid = torch.Tensor([sequence[:-1] for sequence in sequences[train_size:] if sequence[-1] > -1])
    y_valid = torch.Tensor([sequence[-1] for sequence in sequences[train_size:] if sequence[-1] > -1]).long()


    # Model class: single-layer A.N.N.
    class SimpleNN(torch.nn.Module):
        def __init__(self, input_size, num_keys):
            super(SimpleNN, self).__init__()
            self.fc = torch.nn.Linear(input_size, num_keys)


        def forward(self, x):
            out = self.fc(x)
            return out


    # Write the model definition to a standalone module so it can be shipped
    # alongside the logged model via code_paths; the file must be importable
    # on its own, so it gets its own torch import and no extra indentation.
    import textwrap
    with open('/tmp/model_class.py', 'w') as f:
        f.write(textwrap.dedent('''\
            import torch


            class SimpleNN(torch.nn.Module):
                def __init__(self, input_size, num_keys):
                    super(SimpleNN, self).__init__()
                    self.fc = torch.nn.Linear(input_size, num_keys)

                def forward(self, x):
                    out = self.fc(x)
                    return out
        '''))


    checkpoint = torch.load(model_art, map_location=torch.device('cpu'))['state_dict']
    model = SimpleNN(10, 11)
    model.load_state_dict(checkpoint)


    with open(metrics_json) as f:
        metrics = json.load(f)
    with open(parameters_json) as f:
        parameters = json.load(f)
    tracking_uri = 'http://mlflow.kubeflow'
    experiment_name = 'sequence-anomaly-detection-experiment'


    mlflow.tracking.set_tracking_uri(tracking_uri)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run() as run:
        for metric in metrics:
            mlflow.log_metric(metric, metrics[metric])
        mlflow.log_params(parameters)


        input_size = list(X_valid.size())
        input_size[0] = -1
        input_dtype = np.array(X_valid).dtype
        output_size = list(y_valid.size())
        output_size[0] = -1
        output_dtype = np.array(y_valid).dtype
        input_schema = mlflow.types.schema.Schema([
            mlflow.types.schema.TensorSpec(input_dtype, input_size)
        ])
        output_schema = mlflow.types.schema.Schema([
            mlflow.types.schema.TensorSpec(output_dtype, output_size)
        ])
        signature = mlflow.models.signature.ModelSignature(inputs=input_schema, outputs=output_schema)
        mlflow.pytorch.log_model(
            model,
            'models',
            registered_model_name='sequence-anomaly-detection-model',
            signature=signature,
            code_paths=['/tmp/model_class.py']
        )

Code Segment 10. The Entire Model-Registration Component

Code Segment 10 contains the entire Component we used to register our model. Let us go through it in sections to explain the process.

Let’s start from the point where we actually log our model.

mlflow.pytorch.log_model(
    model,
    'models',
    registered_model_name='sequence-anomaly-detection-model',
    signature=signature,
    code_paths=['/tmp/model_class.py']
)

Code Segment 11. Logging the Model

Firstly, note that we are using the PyTorch flavor of MLflow’s log_model function. The function only requires the PyTorch model object and the artifact path, but we provide some additional information. We wish to consolidate all the models created from this Pipeline into multiple versions of a single MLflow Model, and for this we provide the registered_model_name under which the versions must be filed.

The Model Signature

Next, we provide the signature for the model. The signature is assigned as follows.

input_size = list(X_valid.size())
input_size[0] = -1
input_dtype = np.array(X_valid).dtype
output_size = list(y_valid.size())
output_size[0] = -1
output_dtype = np.array(y_valid).dtype
input_schema = mlflow.types.schema.Schema([
    mlflow.types.schema.TensorSpec(input_dtype, input_size)
])
output_schema = mlflow.types.schema.Schema([
    mlflow.types.schema.TensorSpec(output_dtype, output_size)
])
signature = mlflow.models.signature.ModelSignature(inputs=input_schema, outputs=output_schema)

Code Segment 12. Defining the Model Signature

In the code above, X_valid is a torch.Tensor containing the input features of the validation split, and y_valid contains the corresponding target values. We assign their size() values to input_size and output_size respectively. Our preference was to set the first element of each size, which denotes the number of rows, to -1, indicating that the number of rows is unconstrained; the actual row count can also be kept as is.

We also get the data types, and simply provide the data types and sizes to an input and output Schema respectively, which are then passed to create an mlflow.models.signature.ModelSignature object.
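
As a lighter-weight alternative, MLflow can also infer the signature from sample inputs and outputs. The following sketch assumes the X_valid tensor and trained model from the registration Component above.

import numpy as np
from mlflow.models.signature import infer_signature

# Infer the input/output schemas from example data rather than building them by hand.
sample_inputs = np.array(X_valid)
sample_outputs = model(X_valid).detach().numpy()
signature = infer_signature(sample_inputs, sample_outputs)

Either approach yields a ModelSignature object that can be passed to log_model.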

The Code Paths

The other argument passed to the log_model function is the list of code paths: local file system paths to Python file dependencies (or directories containing them). These files are prepended to the system path when the model is loaded.

In our case, the only code we needed for use of the model was the model definition itself; we only saved the state dictionary. Thus, we wrote the code into the file /tmp/model_class.py, and passed that path into the code_paths argument.

Using the MLflow APIs

Next, let us discuss the remaining MLflow API calls. As discussed above, MLflow allows us to save information such as the hyperparameters used and the metric values logged. This is done within an MLflow Run, and multiple Runs can be consolidated under one MLflow Experiment.

To connect to the MLflow application, the tracking URI needs to be set. Since we wish to consolidate all potential Runs into a single Experiment, we also set the Experiment name.

tracking_uri = 'http://mlflow.kubeflow'
experiment_name = 'sequence-anomaly-detection-experiment'


mlflow.tracking.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)

Code Segment 13. Setting the Tracking URI and Experiment Name

The Run can then be started as a Python context manager. All of the model-signature and model-logging code above goes inside that context, along with any other logging statements relevant to the model.

with mlflow.start_run() as run:
    for metric in metrics:
        mlflow.log_metric(metric, metrics[metric])
    mlflow.log_params(parameters)

Code Segment 14. The Run Context

At the very top of the Run, we log each metric by providing its name and its value. We also log all hyperparameters in dictionary format. This rounds out the model-registration Component in the training Pipeline.
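
As a side note, here is a hedged sketch of how a function like register_model might be wrapped into a reusable Pipeline Component with KFP v1; the base image and package pins are illustrative and should match your own environment.

import kfp.components as comp

# Hypothetical component factory for the registration step; adjust the image
# and package versions to match your cluster.
register_model_op = comp.create_component_from_func(
    register_model,
    base_image='python:3.7-slim-buster',
    packages_to_install=['mlflow==1.20.2', 'torch', 'numpy', 'boto3'],
)

The resulting register_model_op can then be invoked in the pipeline definition, with the outputs of the training and validation Components passed in as its inputs.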

The Inference Pipeline

def model_inferencing(sequence_json: comp.InputPath(), preds_json: comp.OutputPath()):
    ...
    boto3.client('s3').download_file(
        '<bucket_name>', '<model_object>',
        '<model_path>'
    )
    model.load_state_dict(torch.load('<model_path>'))
    model.eval()


    # The inference loop.
    preds = test_loop(valid_dataloader, model)
    with open(preds_json, 'w') as f:
        json.dump(preds, f)

Code Segment 15. The Original Inference Component—Loading the Model

In the original inference Component, since the model was saved into S3, it is also retrieved from that location. It is downloaded and PyTorch APIs are used to load the state dictionary, as normal. The updated code replaces that download with a call to the model registry.

mlflow.tracking.set_tracking_uri('http://mlflow.kubeflow')
model = mlflow.pytorch.load_model("models:/sequence-anomaly-detection-model/1")
model.eval()

Code Segment 16. The Updated Inference Component—Loading the Model

For inference, all we have to provide is the tracking URI and the name of the registered model. If a specific version of the model is to be loaded, it can be referenced in the model URI, which has the format models:/<registered_model_name>/<version>. Alternatively, the stage of the model can be referenced, in which case the latest version of the model in that stage is loaded; for instance, models:/<registered_model_name>/Archived will load the latest archived version.

If neither the version nor the stage is provided, MLflow loads the latest available version of the model.
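
For example, here is a short sketch of loading by stage rather than by a pinned version (assuming a version of the model has been transitioned to the “Production” stage).

import mlflow

mlflow.set_tracking_uri('http://mlflow.kubeflow')

# Loads whichever version currently sits in the "Production" stage.
model = mlflow.pytorch.load_model('models:/sequence-anomaly-detection-model/Production')
model.eval()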

The MLflow Model Registry: UI

The MLflow UI opens on a list of experiments on the sidebar, with the first experiment opened by default.

Figure 1. An MLflow Experiment in the UI

The Experiment screen provides an optional Description field, as well as the list of all Runs registered to that Experiment. The list is presented in tabular format: each row contains the Run, how long it took, the name of the Run, the user who ran it, the model registered from it, and the metrics and hyperparameters logged during it.

Here is one of the benefits of logging metrics from different Runs: we can select multiple Runs and click “Compare,” which plots the various metrics against one another, like so.

Figure 2. Comparing the Metrics of Two Runs

MLflow also provides 3D graphs to compare multiple metrics on the same graph.

Another approach to viewing the model registry is from the perspective of the Registered Models, not the Experiments.

Figure 3. All Registered Models

This screen lists the Registered Models in tabular form, showing each Model, its latest Version, its latest Staging and Production Versions, its last-modified time, and any tags associated with it.

The Model Versions are how a model lineage can be traced.

Figure 4. A Model Version

Version 2 of the Model fetal-health-assessment-model is displayed in this screenshot. This was a different use-case, and thus the input columns and information are different. However, two important pieces of information about the model are displayed on this screen: the input and output schema, and the source Run.

The source Run contains the actual information about the logged metrics and hyperparameters. Data scientists and others can look through the source Runs of different Model Versions, seeing the differences in each and making their decisions accordingly.
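
The same comparison can be made programmatically. Here is a sketch, assuming the model name and tracking URI used earlier in this article, that lists a Registered Model’s Versions and pulls the metrics and hyperparameters from each source Run.

from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri='http://mlflow.kubeflow')

# One entry per Model Version, each pointing back at the Run that produced it.
for version in client.search_model_versions("name='sequence-anomaly-detection-model'"):
    run = client.get_run(version.run_id)
    print(version.version, version.current_stage, run.data.metrics, run.data.params)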

Conclusion

Model registries provide a centralized system for sharing, collaborating, and managing the lifecycle of an ML model. A registry does this by providing both central storage and APIs to utilize the models registered. Kubeflow has no model registry integrated natively. MLflow provides a robust, open source model registry, as well as other views of the models (through the perspective of Experiments and Runs) that aid data scientists in plotting model lineages.

We can now install MLflow and integrate it into the Kubeflow UI, so despite having to install a separate platform, everything is still in one location. We can also use the registry to register models—along with relevant metrics, hyperparameters, and code—during the training phase, and to load those models in the inference phase. We can compare different models using the metrics and hyperparameters logged.

Next in the series, we will discuss model and data observability, how to perform them in Kubeflow, and how to create dashboards that make it easy to visualize how well your model is performing and how consistent your data is.
