Decorating tasks
You can change how tasks behave by wrapping your task functions in decorators.
To make sure that the decorated function keeps all the type annotation and docstring information that Flyte needs, apply the built-in functools.wraps decorator to the wrapper function.
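To illustrate what functools.wraps preserves, here is a minimal plain-Python sketch that does not involve Flyte at all. The names bare, wrapped, and add_one are hypothetical and exist only for this illustration.

```python
import inspect
from functools import wraps


def bare(fn):
    # No @wraps: the returned wrapper hides the original function's metadata.
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def wrapped(fn):
    # With @wraps: __name__, __doc__, and __annotations__ are copied from fn,
    # and __wrapped__ is set so inspect.signature reports fn's signature.
    @wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def add_one(x: int) -> int:
    """Return x plus one."""
    return x + 1


without_wraps = bare(add_one)
with_wraps = wrapped(add_one)

print(without_wraps.__name__, inspect.signature(without_wraps))  # wrapper (*args, **kwargs)
print(with_wraps.__name__, inspect.signature(with_wraps))  # add_one (x: int) -> int
```

Without wraps, anything that inspects the function sees only a generic wrapper with no type annotations or docstring.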
To begin, create a file called decorating_tasks.py.
Add the imports:
import logging
import union
from functools import partial, wraps
Create a logger to monitor the execution’s progress.
logger = logging.getLogger(__file__)
Using a single decorator
We define a decorator that logs the input and output details for a decorated task.
def log_io(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.info(f"task {fn.__name__} called with args: {args}, kwargs: {kwargs}")
        out = fn(*args, **kwargs)
        logger.info(f"task {fn.__name__} output: {out}")
        return out

    return wrapper
We create a task named t1 that is decorated with log_io.
The order in which the decorators are applied is important: @union.task should always be the outermost decorator.
@union.task
@log_io
def t1(x: int) -> int:
    return x + 1
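One way to think about why the order matters, sketched in plain Python: decorators are applied bottom-up, so the outermost decorator receives whatever the inner ones return. In the sketch below, register is a hypothetical stand-in for a task decorator that captures the callable it is given. It is not Union's implementation, and log_calls is likewise an illustrative name.

```python
from functools import wraps

captured = {}  # callables seen by the stand-in "task" decorator
calls = []     # names recorded by the logging decorator


def register(fn):
    # Hypothetical stand-in for a task decorator: it stores the callable it receives.
    captured[fn.__name__] = fn
    return fn


def log_calls(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        calls.append(fn.__name__)
        return fn(*args, **kwargs)

    return wrapper


@register
@log_calls
def outer_registered(x: int) -> int:
    return x + 1


@log_calls
@register
def inner_registered(x: int) -> int:
    return x + 1


captured["outer_registered"](1)  # the captured callable includes the logging wrapper
captured["inner_registered"](1)  # the captured callable is the bare function
print(calls)  # ['outer_registered']
```

Only the function registered as the outermost step carries the logging behavior, which is the same reason the task decorator needs to come last (topmost) in the stack.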
Stacking multiple decorators
You can also stack multiple decorators on top of each other, as long as @union.task is the outermost decorator.
We define a decorator that checks that the output of the decorated function is greater than a floor value (0 by default, so the output must be a positive number) before it is returned.
If this check fails, the decorator raises a ValueError exception.
def validate_output(fn=None, *, floor=0):
    # Used with keyword arguments only, e.g. @validate_output(floor=10):
    # return a partial that waits for the function to decorate.
    if fn is None:
        return partial(validate_output, floor=floor)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of task {fn.__name__} must be greater than {floor}, found {out}")
        return out

    return wrapper
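The validate_output decorator uses functools.partial to accept optional parameters. When it is called with keyword arguments only, as in @validate_output(floor=10), fn is None, so it returns a partial of itself with floor already bound, and that partial is then applied to the function being decorated.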
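The following plain-Python sketch shows both ways of applying such a decorator, without any Flyte tasks involved. It repeats the decorator's logic so that it can run on its own; the functions double and add_ten are hypothetical.

```python
from functools import partial, wraps


def validate_output(fn=None, *, floor=0):
    # Parameterized use: no function yet, so return a partial with floor bound.
    if fn is None:
        return partial(validate_output, floor=floor)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of {fn.__name__} must be greater than {floor}, found {out}")
        return out

    return wrapper


@validate_output  # bare form: fn is passed directly, floor defaults to 0
def double(x: int) -> int:
    return 2 * x


@validate_output(floor=10)  # parameterized form: fn is None on the first call
def add_ten(x: int) -> int:
    return x + 10


print(double(3))   # 6
print(add_ten(1))  # 11

try:
    add_ten(0)  # returns 10, which is not greater than the floor of 10
except ValueError as exc:
    print(f"rejected: {exc}")
```

The keyword-only floor parameter (after the bare *) is what keeps the two forms unambiguous: a positional argument is always the function being decorated.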
We define a task that uses both the logging and validator decorators.
@union.task
@log_io
@validate_output(floor=10)
def t2(x: int) -> int:
    return x + 10
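To make the execution order of stacked decorators concrete, here is a small plain-Python sketch. The trace decorator factory and the events list are hypothetical helpers used only to record when each layer is entered and exited; the labels mirror the decorators stacked above.

```python
from functools import wraps

events = []


def trace(label):
    # Hypothetical decorator factory: records entry to and exit from each layer.
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            events.append(f"enter {label}")
            out = fn(*args, **kwargs)
            events.append(f"exit {label}")
            return out

        return wrapper

    return decorator


@trace("log_io")           # applied last, runs first
@trace("validate_output")  # applied first, runs closest to the function body
def add_ten(x: int) -> int:
    events.append("body")
    return x + 10


add_ten(1)
print(events)
# ['enter log_io', 'enter validate_output', 'body', 'exit validate_output', 'exit log_io']
```

Under this ordering, an exception raised by the inner validator propagates through the outer logging wrapper before the output is logged.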
Finally, we compose a workflow that calls t1 and t2.
@union.workflow
def decorating_task_wf(x: int) -> int:
    return t2(x=t1(x=x))
Run the example on Union.ai
To run the workflow, execute the following command:
union run --remote decorating_tasks.py decorating_task_wf --x 10