Pod Example
To run this example, first install the pod plugin:
pip install flytekitplugins-pod
The ImageSpec below pushes to the ghcr.io/flyteorg registry; replace it with a container registry that you can push to.
This plugin is no longer needed and exists only for backwards compatibility.
No new versions will be published after v1.13.x. Please use the pod_template
and pod_template_name arguments to @task instead.
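For reference, here is a minimal sketch of the recommended replacement, expressing the same node-selector customization used below through the pod_template argument (the task and function names are illustrative):
from flytekit import PodTemplate, task
from kubernetes.client.models import V1PodSpec

@task(
    pod_template=PodTemplate(
        primary_container_name="primary",
        pod_spec=V1PodSpec(
            containers=[],  # Flyte injects the primary container
            node_selector={"node_group": "memory"},
        ),
    ),
)
def pod_template_task() -> str:
    return "Hello from pod_template!"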
Pod configuration for a Flyte task allows you to run multiple containers within a single task.
They provide access to a fully customizable Kubernetes pod spec,
which can be used to modify the runtime of the task execution.
The primary container is the main driver for Flyte task execution and is responsible for producing inputs and outputs.
Pod tasks accept arguments that are commonly used with container tasks, such as resource specifications.
However, these arguments only apply to the primary container.
To customize the other containers that are brought up during task execution, you can define a complete pod spec using the Kubernetes Python client library's V1PodSpec.
First, we import the necessary libraries for use in the following examples.
import time
from pathlib import Path
from typing import List
from flytekit import ImageSpec, Resources, TaskMetadata, dynamic, map_task, task, workflow
from flytekitplugins.pod import Pod
from kubernetes.client.models import (
    V1Container,
    V1EmptyDirVolumeSource,
    V1PodSpec,
    V1ResourceRequirements,
    V1Volume,
    V1VolumeMount,
)
image_spec = ImageSpec(registry="ghcr.io/flyteorg", packages=["flytekitplugins-pod"])
Add additional properties to the task container
In this example, we define a simple pod specification.
The containers field is set to an empty list, signaling Flyte to insert a placeholder primary container. The node_selector field specifies the nodes on which the task pod should run.
@task(
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[],
            node_selector={"node_group": "memory"},
        ),
    ),
    requests=Resources(
        mem="1G",
    ),
    container_image=image_spec,
)
def pod_task() -> str:
    return "Hello from pod task!"


@workflow
def pod_workflow() -> str:
    return pod_task()
To configure default settings for all pods that Flyte creates, including pod, container, PyTorch, Spark, Ray, and Dask tasks, configure a default Kubernetes pod template.
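Relatedly, an individual task can reference a Kubernetes PodTemplate resource by name. A minimal sketch, assuming a PodTemplate named flyte-template already exists in the namespace where the task runs:
@task(pod_template_name="flyte-template")  # hypothetical PodTemplate resource name
def templated_task() -> str:
    return "Hello from a named pod template!"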
Multiple containers
We define a simple pod spec with a shared volume that is mounted in both the primary and secondary containers. The secondary container writes a file that the primary container waits for and reads before completing. First, we define the shared data path:
_SHARED_DATA_PATH = "/data/message.txt"
We define a pod spec with two containers.
While pod tasks generally allow you to customize Kubernetes container attributes, you can still use Flyte directives to specify resources and the image. Unless you set the container_image task attribute, the default image built for Flyte tasks is used.
@task(
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="primary",
                    resources=V1ResourceRequirements(
                        requests={"cpu": "1", "memory": "100Mi"},
                        limits={"cpu": "1", "memory": "100Mi"},
                    ),
                    volume_mounts=[
                        V1VolumeMount(
                            name="shared-data",
                            mount_path="/data",
                        )
                    ],
                ),
                V1Container(
                    name="secondary",
                    image="alpine",
                    command=["/bin/sh"],
                    args=[
                        "-c",
                        "echo hi pod world > {}".format(_SHARED_DATA_PATH),
                    ],
                    resources=V1ResourceRequirements(
                        requests={"cpu": "1", "memory": "100Mi"},
                        limits={"cpu": "1", "memory": "100Mi"},
                    ),
                    volume_mounts=[
                        V1VolumeMount(
                            name="shared-data",
                            mount_path="/data",
                        )
                    ],
                ),
            ],
            volumes=[
                V1Volume(
                    name="shared-data",
                    empty_dir=V1EmptyDirVolumeSource(medium="Memory"),
                )
            ],
        ),
    ),
    requests=Resources(
        mem="1G",
    ),
    container_image=image_spec,
)
def multiple_containers_pod_task() -> str:
    # The code defined in this task will get injected into the primary container.
    while not Path(_SHARED_DATA_PATH).is_file():
        time.sleep(5)
    with open(_SHARED_DATA_PATH, "r") as shared_message_file:
        return shared_message_file.read()


@workflow
def multiple_containers_pod_workflow() -> str:
    txt = multiple_containers_pod_task()
    return txt
Pod configuration in a map task
To use a pod task as part of a map task, pass the pod task definition to map_task. This runs the pod task across a collection of inputs.
@task(
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="primary",
                    resources=V1ResourceRequirements(
                        requests={"cpu": ".5", "memory": "500Mi"},
                        limits={"cpu": ".5", "memory": "500Mi"},
                    ),
                )
            ],
            init_containers=[
                V1Container(
                    image="alpine",
                    name="init",
                    command=["/bin/sh"],
                    args=["-c", 'echo "I\'m a customizable init container"'],
                    resources=V1ResourceRequirements(
                        limits={"cpu": ".5", "memory": "500Mi"},
                    ),
                )
            ],
        ),
    ),
    container_image=image_spec,
)
def map_pod_task(int_val: int) -> str:
    return str(int_val)


@task(container_image=image_spec)
def coalesce(list_of_strings: List[str]) -> str:
    coalesced = ", ".join(list_of_strings)
    return coalesced


@workflow
def map_pod_workflow(list_of_ints: List[int]) -> str:
    mapped_out = map_task(map_pod_task, metadata=TaskMetadata(retries=1))(int_val=list_of_ints)
    coalesced = coalesce(list_of_strings=mapped_out)
    return coalesced
Pod configuration in a dynamic workflow
To use a pod task in a dynamic workflow, pass the pod config to the @dynamic-decorated workflow.
@task(container_image=image_spec)
def stringify(val: int) -> str:
    return f"{val} served courtesy of a dynamic pod task!"


@dynamic(
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="primary",
                    resources=V1ResourceRequirements(
                        requests={"cpu": ".5", "memory": "450Mi"},
                        limits={"cpu": ".5", "memory": "500Mi"},
                    ),
                )
            ],
        ),
    ),
    container_image=image_spec,
)
def dynamic_pod_task(val: int) -> str:
    return stringify(val=val)


@workflow
def dynamic_pod_workflow(val: int = 6) -> str:
    txt = dynamic_pod_task(val=val)
    return txt
You can execute workflows locally as follows:
if __name__ == "__main__":
    print(f"Running {__file__}...")
    print(f"Calling pod_workflow()... {pod_workflow()}")
    print(f"Calling multiple_containers_pod_workflow()... {multiple_containers_pod_workflow()}")
    # map_pod_workflow requires an input list; pass a small sample when running locally.
    print(f"Calling map_pod_workflow()... {map_pod_workflow(list_of_ints=[1, 2, 3])}")
    print(f"Calling dynamic_pod_workflow()... {dynamic_pod_workflow()}")