flytekit.core.context_manager
.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode :noindex: .. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.TASK_EXECUTION :noindex: .. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION :noindex: .. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.LOCAL_TASK_EXECUTION :noindex:
Directory
Classes
Class | Description |
---|---|
CompilationState |
Compilation state is used during the compilation of a workflow or task. |
ExecutionParameters |
This is a run-time user-centric context object that is accessible to every @task method. |
ExecutionState |
This is the context that is active when executing a task or a local workflow. |
FlyteContext |
This is an internal-facing context object, that most users will not have to deal with. |
FlyteContextManager |
FlyteContextManager manages the execution context within Flytekit. |
FlyteEntities |
This is a global Object that tracks various tasks and workflows that are declared within a VM during the. |
OutputMetadata |
|
OutputMetadataTracker |
This class is for the users to set arbitrary metadata on output literals. |
SecretsManager |
This provides a secrets resolution logic at runtime. |
SerializableToString |
This protocol is used by the Artifact create_from function. |
Variables
Property | Type | Description |
---|---|---|
flyte_context_Var |
ContextVar |
flytekit.core.context_manager.CompilationState
Compilation state is used during the compilation of a workflow or task. It stores the nodes that were created when walking through the workflow graph.
Attributes:
prefix (str): This is because we may one day want to be able to have subworkflows inside other workflows. If
users choose to not specify their node names, then we can end up with multiple “n0"s. This prefix allows
us to give those nested nodes a distinct name, as well as properly identify them in the workflow.
mode (int): refer to flytekit.extend.ExecutionState.Mode
task_resolver (Optional[TaskResolverMixin]): Please see flytekit.extend.TaskResolverMixin
nodes (Optional[List]): Stores currently compiled nodes so far.
class CompilationState(
prefix: str,
mode: int,
task_resolver: Optional[TaskResolverMixin],
nodes: List,
)
Parameter | Type |
---|---|
prefix |
str |
mode |
int |
task_resolver |
Optional[TaskResolverMixin] |
nodes |
List |
Methods
Method | Description |
---|---|
add_node() |
|
with_params() |
Create a new CompilationState where the mode and task resolver are defaulted to the current object, but they. |
add_node()
def add_node(
n: Node,
)
Parameter | Type |
---|---|
n |
Node |
with_params()
def with_params(
prefix: str,
mode: Optional[int],
resolver: Optional[TaskResolverMixin],
nodes: Optional[List],
) -> CompilationState
Create a new CompilationState where the mode and task resolver are defaulted to the current object, but they and all other args are taken if explicitly provided as an argument.
Usage: s.with_params(“p”, nodes=[])
Parameter | Type |
---|---|
prefix |
str |
mode |
Optional[int] |
resolver |
Optional[TaskResolverMixin] |
nodes |
Optional[List] |
flytekit.core.context_manager.ExecutionParameters
This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using
flytekit.current_context()
This object provides the following objections
- a statsd handler
- a logging handler
- the execution ID as an
flytekit.models.core.identifier.WorkflowExecutionIdentifier
object - a working directory for the user to write arbitrary files to
Please do not confuse this object with the flytekit.FlyteContext
object.
class ExecutionParameters(
execution_date,
tmp_dir,
stats,
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_metadata_prefix,
checkpoint,
decks,
task_id: typing.Optional[_identifier.Identifier],
enable_deck: bool,
kwargs,
)
Parameter | Type |
---|---|
execution_date |
|
tmp_dir |
|
stats |
|
execution_id |
typing.Optional[_identifier.WorkflowExecutionIdentifier] |
logging |
|
raw_output_prefix |
|
output_metadata_prefix |
|
checkpoint |
|
decks |
|
task_id |
typing.Optional[_identifier.Identifier] |
enable_deck |
bool |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
builder() |
|
get() |
Returns task specific context if present else raise an error. |
has_attr() |
|
new_builder() |
|
with_enable_deck() |
|
with_task_sandbox() |
builder()
def builder()
get()
def get(
key: str,
) -> typing.Any
Returns task specific context if present else raise an error. The returned context will match the key
Parameter | Type |
---|---|
key |
str |
has_attr()
def has_attr(
attr_name: str,
) -> bool
Parameter | Type |
---|---|
attr_name |
str |
new_builder()
def new_builder(
current: Optional[ExecutionParameters],
) -> Builder
Parameter | Type |
---|---|
current |
Optional[ExecutionParameters] |
with_enable_deck()
def with_enable_deck(
enable_deck: bool,
) -> Builder
Parameter | Type |
---|---|
enable_deck |
bool |
with_task_sandbox()
def with_task_sandbox()
Properties
Property | Type | Description |
---|---|---|
checkpoint |
||
decks |
A list of decks of the tasks, and it will be rendered to a html at the end of the task execution. |
|
default_deck |
||
enable_deck |
Returns whether deck is enabled or not |
|
execution_date |
This is a datetime representing the time at which a workflow was started. This is consistent across all tasks executed in a workflow or sub-workflow.
|
|
execution_id |
This is the identifier of the workflow execution within the underlying engine. It will be consistent across all task executions in a workflow or sub-workflow execution.
|
|
logging |
A handle to a useful logging object. TODO: Usage examples |
|
output_metadata_prefix |
||
raw_output_prefix |
||
secrets |
||
stats |
A handle to a special statsd object that provides usefully tagged stats. TODO: Usage examples and better comments |
|
task_id |
At production run-time, this will be generated by reading environment variables that are set by the backend. |
|
timeline_deck |
||
working_directory |
A handle to a special working directory for easily producing temporary files. TODO: Usage examples |
flytekit.core.context_manager.ExecutionState
This is the context that is active when executing a task or a local workflow. This carries the necessary state to execute. Some required things during execution deal with temporary directories, ExecutionParameters that are passed to the user etc.
Attributes: mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc). working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs are uploaded engine_dir (os.PathLike): branch_eval_mode Optional[BranchEvalMode]: Used to determine whether a branch node should execute. user_space_params Optional[ExecutionParameters]: Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id and a working directory.
class ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
)
Parameter | Type |
---|---|
working_dir |
Union[os.PathLike, str] |
mode |
Optional[ExecutionState.Mode] |
engine_dir |
Optional[Union[os.PathLike, str]] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
Methods
Method | Description |
---|---|
branch_complete() |
Indicates that we are within a conditional / ifelse block and the active branch is not done. |
is_local_execution() |
|
take_branch() |
Indicates that we are within an if-else block and the current branch has evaluated to true. |
with_params() |
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values. |
branch_complete()
def branch_complete()
Indicates that we are within a conditional / ifelse block and the active branch is not done. Default to SKIPPED
is_local_execution()
def is_local_execution()
take_branch()
def take_branch()
Indicates that we are within an if-else block and the current branch has evaluated to true. Useful only in local execution mode
with_params()
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
) -> ExecutionState
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
Parameter | Type |
---|---|
working_dir |
Optional[os.PathLike] |
mode |
Optional[Mode] |
engine_dir |
Optional[os.PathLike] |
branch_eval_mode |
Optional[BranchEvalMode] |
user_space_params |
Optional[ExecutionParameters] |
flytekit.core.context_manager.FlyteContext
This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.
Even though this object as a current_context
function on it, it should not be called directly. Please use the
flytekit.FlyteContextManager
object instead.
Please do not confuse this object with the flytekit.ExecutionParameters
object.
class FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
)
Parameter | Type |
---|---|
file_access |
FileAccessProvider |
level |
int |
flyte_client |
Optional['friendly_client.SynchronousFlyteClient'] |
compilation_state |
Optional[CompilationState] |
execution_state |
Optional[ExecutionState] |
serialization_settings |
Optional[SerializationSettings] |
in_a_condition |
bool |
origin_stackframe |
Optional[traceback.FrameSummary] |
output_metadata_tracker |
Optional[OutputMetadataTracker] |
worker_queue |
Optional[Controller] |
Methods
Method | Description |
---|---|
current_context() |
This method exists only to maintain backwards compatibility. |
enter_conditional_section() |
|
get_deck() |
Returns the deck that was created as part of the last execution. |
get_origin_stackframe_repr() |
|
new_builder() |
|
new_compilation_state() |
Creates and returns a default compilation state. |
new_execution_state() |
Creates and returns a new default execution state. |
set_stackframe() |
|
with_client() |
|
with_compilation_state() |
|
with_execution_state() |
|
with_file_access() |
|
with_new_compilation_state() |
|
with_output_metadata_tracker() |
|
with_serialization_settings() |
|
with_worker_queue() |
current_context()
def current_context()
This method exists only to maintain backwards compatibility. Please use
FlyteContextManager.current_context()
instead.
Users of flytekit should be wary not to confuse the object returned from this function
with :py:func:flytekit.current_context
enter_conditional_section()
def enter_conditional_section()
get_deck()
def get_deck()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.
with flytekit.new_context() as ctx:
my_task(...)
ctx.get_deck()
OR if you wish to explicitly display
from IPython import display
display(ctx.get_deck())
get_origin_stackframe_repr()
def get_origin_stackframe_repr()
new_builder()
def new_builder()
new_compilation_state()
def new_compilation_state(
prefix: str,
) -> CompilationState
Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation, otherwise the code should always uses - with_compilation_state
Parameter | Type |
---|---|
prefix |
str |
new_execution_state()
def new_execution_state(
working_dir: Optional[os.PathLike],
) -> ExecutionState
Creates and returns a new default execution state. This should be used at the entrypoint of execution, in all other cases it is preferable to use with_execution_state
Parameter | Type |
---|---|
working_dir |
Optional[os.PathLike] |
set_stackframe()
def set_stackframe(
s: traceback.FrameSummary,
)
Parameter | Type |
---|---|
s |
traceback.FrameSummary |
with_client()
def with_client(
c: SynchronousFlyteClient,
) -> Builder
Parameter | Type |
---|---|
c |
SynchronousFlyteClient |
with_compilation_state()
def with_compilation_state(
c: CompilationState,
) -> Builder
Parameter | Type |
---|---|
c |
CompilationState |
with_execution_state()
def with_execution_state(
es: ExecutionState,
) -> Builder
Parameter | Type |
---|---|
es |
ExecutionState |
with_file_access()
def with_file_access(
fa: FileAccessProvider,
) -> Builder
Parameter | Type |
---|---|
fa |
FileAccessProvider |
with_new_compilation_state()
def with_new_compilation_state()
with_output_metadata_tracker()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
) -> Builder
Parameter | Type |
---|---|
t |
OutputMetadataTracker |
with_serialization_settings()
def with_serialization_settings(
ss: SerializationSettings,
) -> Builder
Parameter | Type |
---|---|
ss |
SerializationSettings |
with_worker_queue()
def with_worker_queue(
wq: Controller,
) -> Builder
Parameter | Type |
---|---|
wq |
Controller |
Properties
Property | Type | Description |
---|---|---|
user_space_params |
flytekit.core.context_manager.FlyteContextManager
FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation
or Execution. It is not thread-safe and can only be run as a single threaded application currently.
Context’s within Flytekit is useful to manage compilation state and execution state. Refer to CompilationState
and ExecutionState
for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is
FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
pass
# If required - not recommended you can use
FlyteContextManager.push_context()
# but correspondingly a pop_context should be called
FlyteContextManager.pop_context()
Methods
Method | Description |
---|---|
add_signal_handler() |
|
current_context() |
|
get_origin_stackframe() |
|
initialize() |
Re-initializes the context and erases the entire context. |
pop_context() |
|
push_context() |
|
size() |
|
with_context() |
add_signal_handler()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
)
Parameter | Type |
---|---|
handler |
typing.Callable[[int, FrameType], typing.Any] |
current_context()
def current_context()
get_origin_stackframe()
def get_origin_stackframe(
limit,
) -> traceback.FrameSummary
Parameter | Type |
---|---|
limit |
initialize()
def initialize()
Re-initializes the context and erases the entire context
pop_context()
def pop_context()
push_context()
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
) -> FlyteContext
Parameter | Type |
---|---|
ctx |
FlyteContext |
f |
Optional[traceback.FrameSummary] |
size()
def size()
with_context()
def with_context(
b: FlyteContext.Builder,
) -> Generator[FlyteContext, None, None]
Parameter | Type |
---|---|
b |
FlyteContext.Builder |
flytekit.core.context_manager.FlyteEntities
This is a global Object that tracks various tasks and workflows that are declared within a VM during the registration process
flytekit.core.context_manager.OutputMetadata
class OutputMetadata(
artifact: 'Artifact',
dynamic_partitions: Optional[typing.Dict[str, str]],
time_partition: Optional[datetime],
additional_items: Optional[typing.List[SerializableToString]],
)
Parameter | Type |
---|---|
artifact |
'Artifact' |
dynamic_partitions |
Optional[typing.Dict[str, str]] |
time_partition |
Optional[datetime] |
additional_items |
Optional[typing.List[SerializableToString]] |
flytekit.core.context_manager.OutputMetadataTracker
This class is for the users to set arbitrary metadata on output literals.
Attributes: output_metadata Optional[TaskOutputMetadata]: is a sparse dictionary of metadata that the user wants to attach to each output of a task. The key is the output value (object) and the value is an OutputMetadata object.
class OutputMetadataTracker(
output_metadata: typing.Dict[typing.Any, OutputMetadata],
)
Parameter | Type |
---|---|
output_metadata |
typing.Dict[typing.Any, OutputMetadata] |
Methods
Method | Description |
---|---|
add() |
|
get() |
|
with_params() |
Produces a copy of the current object and set new things. |
add()
def add(
obj: typing.Any,
metadata: OutputMetadata,
)
Parameter | Type |
---|---|
obj |
typing.Any |
metadata |
OutputMetadata |
get()
def get(
obj: typing.Any,
) -> Optional[OutputMetadata]
Parameter | Type |
---|---|
obj |
typing.Any |
with_params()
def with_params(
output_metadata: Optional[TaskOutputMetadata],
) -> OutputMetadataTracker
Produces a copy of the current object and set new things
Parameter | Type |
---|---|
output_metadata |
Optional[TaskOutputMetadata] |
flytekit.core.context_manager.SecretsManager
This provides a secrets resolution logic at runtime. The resolution order is
- Try env var first. The env var should have the configuration.SECRETS_ENV_PREFIX. The env var will be all upper cased
- If not then try the file where the name matches lower case
configuration.SECRETS_DEFAULT_DIR/<group>/configuration.SECRETS_FILE_PREFIX<key>
All configuration values can always be overridden by injecting an environment variable
class SecretsManager(
secrets_cfg: typing.Optional[SecretsConfig],
)
Parameter | Type |
---|---|
secrets_cfg |
typing.Optional[SecretsConfig] |
Methods
Method | Description |
---|---|
get() |
Retrieves a secret using the resolution order -> Env followed by file. |
get_secrets_env_var() |
Returns a string that matches the ENV Variable to look for the secrets. |
get_secrets_file() |
Returns a path that matches the file to look for the secrets. |
get()
def get(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
encode_mode: str,
) -> str
Retrieves a secret using the resolution order -> Env followed by file. If not found raises a ValueError param encode_mode, defines the mode to open files, it can either be “r” to read file, or “rb” to read binary file
Parameter | Type |
---|---|
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
encode_mode |
str |
get_secrets_env_var()
def get_secrets_env_var(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
) -> str
Returns a string that matches the ENV Variable to look for the secrets
Parameter | Type |
---|---|
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
get_secrets_file()
def get_secrets_file(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
) -> str
Returns a path that matches the file to look for the secrets
Parameter | Type |
---|---|
group |
Optional[str] |
key |
Optional[str] |
group_version |
Optional[str] |
flytekit.core.context_manager.SerializableToString
This protocol is used by the Artifact create_from function. Basically these objects are serialized when running, and then added to a literal’s metadata.
class SerializableToString(
args,
kwargs,
)
Parameter | Type |
---|---|
args |
*args |
kwargs |
**kwargs |
Methods
Method | Description |
---|---|
serialize_to_string() |
serialize_to_string()
def serialize_to_string(
ctx: FlyteContext,
variable_name: str,
) -> typing.Tuple[str, str]
Parameter | Type |
---|---|
ctx |
FlyteContext |
variable_name |
str |