We are running a #Kubeflow series where we are sharing our experiences and thoughts on building a Kubeflow-based ML pipeline architecture for production use. This is the first post in the series.
The first step in the machine learning process is to perform the experimentation, both on the data and on the model. The next step is usually writing code to actually use the model, whether it be for training or inference. The former is popularly performed on Jupyter Notebooks and similar IDEs. Kubeflow provides a way to use these Notebooks themselves to create Pipelines in such a way that each step in the Pipeline runs in its own Docker container. In this article, we will go through the default method of creating Pipelines: the JupyterLab server.
JupyterLab
JupyterLab is a web-based IDE for Jupyter Notebooks, code, and data. Kubeflow comes out of the box with support for a JupyterLab server. The creation process for a server is very simple. From the Kubeflow console, go to the “Notebooks” screen.
In the Kubeflow console, “Notebook server” is referred to as “Notebook.” Click on the “Create Notebook” button to open the wizard screen for the creation process.

After giving a custom name to the Notebook server, an image for it can be selected. This image determines the kind of server installed and ready for use. Three default images are available from the UI: JupyterLab, CodeServer, and R Studio. A custom image can also be selected, if one is available in an image repository. The documentation for creating a custom image can be found here.
That’s all that is required for basic set-up. Advanced options allow us to customize the Notebook server to the exact data science requirements, as well as tightly control the amount of resources that can be used by it.

The advanced customizations include:
- the amount of compute power, in terms of CPUs, and amount of RAM;
- the amount of storage in the base volume and mounting additional volumes;
- optional configurations for PodDefault resources in the profile namespace;
- the number of GPU devices, if required;
- affinity and tolerations;
- and enabling shared memory—some libraries, like PyTorch, use shared memory for multiprocessing.
While Kubeflow currently does not have an implementation to activate shared memory, they get around it by mounting an empty directory volume at /dev/shm
.
Once the “Launch” button is clicked, Kubeflow will reroute to the Notebooks page once more, with a new entry for the newly created Notebook server. Once the Notebook server is ready, clicking “Connect” will open the web interface exposed by the server.
Creating a Kubeflow Pipeline through JupyterLab
ML orchestration in Kubeflow is done through Pipelines. A Pipeline is a multi-step workflow in which each step is run in its own container—with a special way for data and objects to flow between them. This allows containers to be reused over different Pipelines with extra code written.
For this article, we explore a text use-case. Each element of text in the dataset is a log statement, gathered by us for the purposes of this use-case. The problem statement is to find out which lines are anomalous based on the order in which the log lines appear.
The way to solve this problem is complex and involves large models and long periods of training. For the purposes of displaying the capabilities of Kubeflow, we will use a simple artificial neural network (ANN) in our Pipeline. This model will not be quite as accurate as the real solution, but it will display what Kubeflow can do sufficiently well.
To create a Kubeflow Pipeline, we need to define a set of steps, known as “Components,” through the SDK. Each Component will be run in its own container, and Kubeflow handles the transfer of data from one Component to the next.
Each Component in a Pipeline executes independently. The Components do not run in the same process and cannot directly share in-memory data. All the data pieces that passed between the Components must be serialized (to strings or files) so that the data can travel over the distributed network. They can then be deserialized in the next Component for use.Components can be defined in two ways: packaging code as a Docker image; or using the Kubeflow Pipelines SDK to convert functions into Components. In this article, we use the latter methodology, but both methods can be found on the Kubeflow documentation.
In this article, we assume that there is some code, including downloading data, preprocessing it, using a model to train on it, saving the model, etc., that has already been written into functions. We will discuss how to convert each into a Component.
Modifying Regular Python Function Code
def unzip_data(bucket_zipfile_path: str):
# Download a ZIP file from S3.
path_bucket = '<bucket_name>'
path_to_move_file = ''
os.makedirs('./data', exist_ok=True)
os.makedirs('./unzipped_data', exist_ok=True)
boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).download_file(Filename='./data/zipfile.zip')
for zip in os.listdir('./data'):
with zipfile.ZipFile(os.path.join('./data', zip), 'r') as file:
file.extractall('./unzipped_data')
# Extract all files out of the ZIP file and write them back to S3.
s3_client = boto3.client('s3')
for file in os.listdir('./unzipped_data'):
output_path = path_to_move_file + file
s3_client.upload_file(
os.path.join('./unzipped_data', file),
path_bucket, output_path
)
return output_path
Code Segment 1. A Regular Python Function to Download and Unzip Data
The above image is of a simple Python function that downloads a particular file and unzips all of it into the folder ./unzipped_data
. In this case, we also write the unzipped data back to S3, but that may not be required based on your data storage principles. In this function, we return output_path
, which is defined and modified inside a for loop. This only works because there is only one file in the data we are using. This can be modified however required.
Let’s modify this function to be used with Kubeflow.
def unzip_data(bucket_zipfile_path: str, output_str: comp.OutputPath(str)):
# Imports required for the Pipeline Component.
from io import BytesIO
import zipfile
import boto3
import os
# Download a ZIP file from S3.
path_bucket = '<bucket_name>'
path_to_move_file = ''
os.makedirs('./data', exist_ok=True)
os.makedirs('./unzipped_data', exist_ok=True)
boto3.resource('s3').Object(path_bucket, bucket_zipfile_path).download_file(Filename='./data/zipfile.zip')
for zip in os.listdir('./data'):
with zipfile.ZipFile(os.path.join('./data', zip), 'r') as file:
file.extractall('./unzipped_data')
# Extract all files out of the ZIP file and write them back to S3.
s3_client = boto3.client('s3')
for file in os.listdir('./unzipped_data'):
output_path = path_to_move_file + file
s3_client.upload_file(
os.path.join('./unzipped_data', file),
path_bucket, output_path
)
# Write the path of the required file into an artifact.
with open(output_str, 'w') as writer:
writer.write(output_path)
Code Segment 2. Python Function Modified to be a Kubeflow Component
Let’s start at the function definition. Notice that each argument is now annotated. Our bucket_zipfile_path
is still a string, but notice that there is a new argument, output_str
, that is annotated as a kfp.components.OutputPath(str)
—note that, in this code segment, kfp.components
was imported using the alias comp
: import kfp.components as comp
. We will discuss this argument at the end of the function.
Another difference is that we now import all Python packages required for the code in the function at the top of the function itself. This is because this code is going to be containerized on its own, and thus this is the only way to include import
statements.
Kubeflow Artifacts
The last change is the modification of the return
statement. Since each Component is containerized on its own, there is no use returning any objects out of a function. Instead, Kubeflow uses the concept of Artifacts.
When Kubeflow Pipelines runs a Component, a container image is started in a Kubernetes Pod and the component’s inputs are passed in as command-line arguments. When the component has finished, its outputs are returned as files. These input arguments and output files are known as artifacts.
Thus, the next step in converting our Python functions into Kubeflow Pipelines–compatible code is modifying the way we send outputs from the function.
In the modified code, we open a file and write into it the string of the value we wanted to send to the next Component. Since we want this value to be transferred to the next Component, we need to inform Kubeflow Pipelines of this fact—this is why we use the kfp
annotation. This informs Kubeflow that this data needs to be created as an Artifact, i.e., data that is transmitted between Components of a Pipeline. The Kubeflow documentation for Python function–based Components contains all the annotations that can be used.
Once all functions have been converted in this manner, we can then convert each function into Components using the kfp
SDK.
Creating Components
base_img = "<repo>/<image>:latest" # The base container image to be used by pods running the Components.
# Create components from the functions above.
unzip_data_op = kfp.components.create_component_from_func(unzip_data, base_image=base_img)
read_data_op = kfp.components.create_component_from_func(read_data, base_image=base_img)
preprocess_data_op = kfp.components.create_component_from_func(preprocess_data, base_image=base_img)
model_training_op = kfp.components.create_component_from_func(
model_training, base_image=base_img, packages_to_install=['boto3']
)
model_evaluating_op = kfp.components.create_component_from_func(
model_evaluating, base_image=base_img, packages_to_install=['scikit-learn', 'mlflow', 'wandb']
)
register_model_op = kfp.components.create_component_from_func(
register_model, base_image=base_img,
packages_to_install=['mlflow', 'boto3', 'kfp']
)
Code Segment 3. Converting Functions to Components
The Kubeflow Pipelines (KFP) SDK provides a simple kfp.components.create_component_from_func
function that converts functions into Components ready to be containerized. All we need to provide is the function name and the base image for the Docker image to take. This image can contain all the packages that need for that function to run, but in case it doesn’t, these packages to be installed using pip
can be provided in a list in the packages_to_install
argument.
Code Segment 3 contains the code required to convert a function into a Component when no GPU is available. When one is available, there is a slight modification.
base_img = "<repo>/<image>:latest" # The base container image to be used by pods running the Components.
# Create components from the functions above.
unzip_data_op = kfp.components.create_component_from_func(unzip_data, base_image=base_img)
read_data_op = kfp.components.create_component_from_func(read_data, base_image=base_img)
preprocess_data_op = kfp.components.create_component_from_func(preprocess_data, base_image=base_img)
model_training_op = kfp.components.create_component_from_func(
model_training, base_image='<repo>/<gpu-image>',
packages_to_install=['boto3']
)
model_evaluating_op = kfp.components.create_component_from_func(
model_evaluating, base_image=base_img, packages_to_install=['scikit-learn', 'mlflow', 'wandb']
)
register_model_op = kfp.components.create_component_from_func(
register_model, base_image='<repo>/<gpu-image>',
packages_to_install=['mlflow', 'boto3']
)
Code Segment 4. Converting Functions to Components when GPUs are Available
In Code Segment 4, we have two components that run on a GPU, and one that runs on a CPU. The image <repo>/<gpu-image>
consists of the GPU versions of packages like PyTorch along with the drivers required to run them on GPUs. Kubeflow allows data scientists to run Pipelines more efficiently in this way, having only the required steps running using the GPU.
The last step is to stitch the Components into a single Pipeline.
Defining the Pipeline
# Create the pipeline from the components created above.
@kfp.dsl.pipeline(
name='single-layer-ann-training-pipeline',
description='Trains a single-layer A.N.N. to find anomalies in string sequences'
)
def unzip_and_read_pipeline(
bucket_zipfile_path: str, bucket_name: str,
sep: str, decimal: str, encoding: str
):
first_task = unzip_data_op(bucket_zipfile_path)
second_task = read_data_op(bucket_name, first_task.outputs['output_str'], sep, decimal, encoding)
third_task = preprocess_data_op(second_task.outputs['output_csv'])
fourth_task = model_training_op(third_task.outputs['sequence_json'])
fifth_task = model_evaluating_op(third_task.outputs['sequence_json'], fourth_task.outputs['model_art'])
sixth_task = register_model_op(
third_task.outputs['sequence_json'], fourth_task.outputs['model_art'],
fifth_task.outputs['metrics_json'], fourth_task.outputs['parameters_json']
)
Code Segment 5. Defining a Pipeline in Terms of Components
Using the kfp.dsl.pipeline
decorator (and providing a name and optional description), we inform the SDK that this function defines a Pipeline. We can provide all the arguments required from the outside of the program as arguments of the function.
Each op is called along with all its input arguments, and its value stored into a variable. The outputs of an op can be accessed through the outputs
attribute of its variable, and hashed like a dictionary
.
In case GPUs are required and available, the op can also be strung along with the set_gpu_limit
function, which states the number of GPUs accessible to the container (default zero). If we do set that, we also add the node selector constraint, which will depend on the backend cloud configuration, as seen in Code Segment 6.
# Create the pipeline from the components created above.
@kfp.dsl.pipeline(
name='single-layer-ann-training-pipeline',
description='Trains a single-layer A.N.N. to find anomalies in string sequences'
)
def unzip_and_read_pipeline(
bucket_zipfile_path: str, bucket_name: str,
sep: str, decimal: str, encoding: str
):
first_task = unzip_data_op(bucket_zipfile_path)
second_task = read_data_op(bucket_name, first_task.outputs['output_str'], sep, decimal, encoding)
third_task = preprocess_data_op(second_task.outputs['output_csv'])
fourth_task = model_training_op(third_task.outputs['sequence_json']).set_gpu_limit(1)
fourth_task.add_node_selector_constraint('eks.amazonaws.com/nodegroup', 'gpu')
fifth_task = model_evaluating_op(third_task.outputs['sequence_json'], fourth_task.outputs['model_art'])
sixth_task = register_model_op(
third_task.outputs['sequence_json'], fourth_task.outputs['model_art'],
fifth_task.outputs['metrics_json'], fourth_task.outputs['parameters_json']
).set_gpu_limit(1)
sixth_task.add_node_selector_constraint('eks.amazonaws.com/nodegroup', 'gpu')
Code Segment 6. Defining a Pipeline in Terms of Components when GPUs are Available
The Kubeflow Pipelines CLI provides a command that converts a Python file into a Kubeflow YAML file. An easy way of creating a Python file out of all the code in our notebook, we can place all of what we have discussed above into a single cell and insert the following line at the top of that cell, and then run it.
%%writefile ./single_layer_ann_training_pipeline.py
# The above line just writes this cell into a Python file; this is used in the KFP DSL command.
Code Segment 7. A Single-Line Magic to Write a Cell into a Python File
This writes into a Python file named single_layer_ann_training_program.py
. We then convert this into a YAML file using the CLI command provided by the Kubeflow Pipelines CLI.
%%sh
dsl-compile --py single_layer_ann_training_pipeline.py --output single_layer_ann_training_pipeline.yaml
# Compilation of the pipeline code into a YAML.
Code Segment 8. Converting a Python File to a Kubeflow Pipelines YAML
Conclusion
Kubeflow provides easy ways to create Pipelines, each Component of which runs independently and can be reused. Each step in the process has been documented above.
However, it is heavily dependent on the Kubeflow SDK and process, which is fraught with use of YAML and code, as well as many modifications to the existing code a data scientist might write. Next in the series, we will have a look at Elyra, a tool that allows us to perform all of this through a UI, with no extra or modified code. We will also look at alternative IDEs—specifically, R Studio—and what changes in creating pipelines there.
After that will follow articles on running Pipelines and monitoring them, Auto ML, using a model registry, running pipelines through schedules or in an event-based manner, and monitoring our data and models over time.