0.1.dev2192+g7c539c3.d20250403

flytekit.core.environment

Directory

Classes

Class Description
Environment

Methods

Method Description
forge()
inherit()

Variables

Property Type Description
P ParamSpec
T TypeVar

Methods

forge()

def forge(
    source: typing.Callable[typing.Concatenate[typing.Any, ~P], ~T],
) -> typing.Callable[[typing.Callable], typing.Callable[typing.Concatenate[typing.Any, ~P], ~T]]
Parameter Type
source typing.Callable[typing.Concatenate[typing.Any, ~P], ~T]

inherit()

def inherit(
    old: dict[str, typing.Any],
    new: dict[str, typing.Any],
) -> dict[str, typing.Any]
Parameter Type
old dict[str, typing.Any]
new dict[str, typing.Any]

flytekit.core.environment.Environment

class Environment(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
)

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

Methods

Method Description
dynamic() Please first see the comments for :py:func:`flytekit.
extend() This is the core decorator to use for any task type in flytekit.
show()
task() This is the core decorator to use for any task type in flytekit.
update() This is the core decorator to use for any task type in flytekit.

dynamic()

def dynamic(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

Please first see the comments for :py:func:flytekit.task and :py:func:flytekit.workflow. This dynamic concept is an amalgamation of both and enables the user to pursue some :std:ref:pretty incredible <cookbook:advanced_merge_sort> constructs.

In short, a task’s function is run at execution time only, and a workflow function is run at compilation time only (local execution notwithstanding). A dynamic workflow is modeled on the backend as a task, but at execution time, the function body is run to produce a workflow. It is almost as if the decorator changed from @task to @workflow except workflows cannot make use of their inputs like native Python values whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is run as a :std:ref:subworkflow <cookbook:subworkflows>. Simple usage

@dynamic
def my_dynamic_subwf(a: int) -> (typing.List[str], int):
    s = []
    for i in range(a):
        s.append(t1(a=i))
    return s, 5

Note in the code block that we call the Python range operator on the input. This is typically not allowed in a workflow but it is here. You can even express dependencies between tasks.

@dynamic
def my_dynamic_subwf(a: int, b: int) -> int:
    x = t1(a=a)
    return t2(b=b, x=x)

See the :std:ref:cookbook <cookbook:subworkflows> for a longer discussion.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

extend()

def extend(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

show()

def show()

task()

def task(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs

update()

def update(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent users code. Tasks have the following properties

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook :std:ref:task examples <cookbook:tasks> for additional information.

Parameter Type
_task_function Optional[Callable[P, FuncOut]]
task_config Optional[T]
cache Union[bool, Cache]
retries int
interruptible Optional[bool]
deprecated str
timeout Union[datetime.timedelta, int]
container_image Optional[Union[str, ImageSpec]]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
secret_requests Optional[List[Secret]]
execution_mode PythonFunctionTask.ExecutionBehavior
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]
task_resolver Optional[TaskResolverMixin]
docs Optional[Documentation]
disable_deck Optional[bool]
enable_deck Optional[bool]
deck_fields Optional[Tuple[DeckField, ...]]
pod_template Optional['PodTemplate']
pod_template_name Optional[str]
accelerator Optional[BaseAccelerator]
pickle_untyped bool
shared_memory Optional[Union[L[True], str]]
resources Optional[Resources]
kwargs **kwargs