flytekit.core.worker_queue
Directory
Classes
Class | Description |
---|---|
Controller |
This controller object is responsible for kicking off and monitoring executions against a Flyte Admin endpoint. |
Update |
|
WorkItem |
This is a class to keep track of what the user requested. |
Variables
Property | Type | Description |
---|---|---|
EAGER_ROOT_ENV_NAME |
str |
|
EAGER_TAG_KEY |
str |
|
EAGER_TAG_ROOT_KEY |
str |
|
NODE_HTML_TEMPLATE |
str |
|
handling_signal |
int |
flytekit.core.worker_queue.Controller
This controller object is responsible for kicking off and monitoring executions against a Flyte Admin endpoint
using a FlyteRemote object. It is used only for running eager tasks. It exposes one async method, add
, which
should be called by the eager task to run a sub-flyte-entity (task, workflow, or a nested eager task).
The controller maintains a dictionary of entries, where each entry is a list of WorkItems. They are maintained in a list because the number of times and order that each task (or subwf, lp) is called affects the execution name which is consistently hashed.
After calling add
, a background thread is started to reconcile the state of this dictionary of WorkItem entries.
Executions that should be kicked off will be kicked off, and ones that are running will be checked. This runs
in a loop similar to a controller loop in a k8s operator.
class Controller(
remote: FlyteRemote,
ss: SerializationSettings,
tag: str,
root_tag: str,
exec_prefix: str,
)
Parameter | Type |
---|---|
remote |
FlyteRemote |
ss |
SerializationSettings |
tag |
str |
root_tag |
str |
exec_prefix |
str |
Methods
Method | Description |
---|---|
add() |
Add an entity along with the requested inputs to be submitted to Admin for running and return a future. |
for_sandbox() |
|
get_env() |
In order for downstream tasks to correctly set the root label, this needs to pass down that information. |
get_execution_name() |
Make a deterministic name. |
get_labels() |
These labels keep track of the current and root (in case of nested) eager execution, that is responsible for. |
get_signal_handler() |
TODO: At some point, this loop would be ideally managed by the loop manager, and the signal handler should. |
launch_execution() |
This function launches executions. |
reconcile_one() |
This is responsible for processing one work item. |
render_html() |
Render the callstack as a deck presentation to be shown after eager workflow execution. |
add()
def add(
entity: RunnableEntity,
input_kwargs: dict[str, typing.Any],
) -> typing.Any
Add an entity along with the requested inputs to be submitted to Admin for running and return a future
Parameter | Type |
---|---|
entity |
RunnableEntity |
input_kwargs |
dict[str, typing.Any] |
for_sandbox()
def for_sandbox(
exec_prefix: typing.Optional[str],
) -> Controller
Parameter | Type |
---|---|
exec_prefix |
typing.Optional[str] |
get_env()
def get_env()
In order for downstream tasks to correctly set the root label, this needs to pass down that information.
get_execution_name()
def get_execution_name(
entity: RunnableEntity,
idx: int,
input_kwargs: dict[str, typing.Any],
) -> str
Make a deterministic name
Parameter | Type |
---|---|
entity |
RunnableEntity |
idx |
int |
input_kwargs |
dict[str, typing.Any] |
get_labels()
def get_labels()
These labels keep track of the current and root (in case of nested) eager execution, that is responsible for kicking off this execution.
get_signal_handler()
def get_signal_handler()
TODO: At some point, this loop would be ideally managed by the loop manager, and the signal handler should gracefully initiate shutdown of all loops, calling .cancel() on all tasks, allowing each loop to clean up, starting with the deepest loop/thread first and working up. https://github.com/flyteorg/flyte/issues/6068
launch_execution()
def launch_execution(
wi: WorkItem,
idx: int,
) -> FlyteWorkflowExecution
This function launches executions.
Parameter | Type |
---|---|
wi |
WorkItem |
idx |
int |
reconcile_one()
def reconcile_one(
update: Update,
)
This is responsible for processing one work item. Will launch, update, set error on the update object Any errors are captured in the update object.
Parameter | Type |
---|---|
update |
Update |
render_html()
def render_html()
Render the callstack as a deck presentation to be shown after eager workflow execution.
flytekit.core.worker_queue.Update
class Update(
work_item: WorkItem,
idx: int,
status: typing.Optional[ItemStatus],
wf_exec: typing.Optional[FlyteWorkflowExecution],
error: typing.Optional[BaseException],
)
Parameter | Type |
---|---|
work_item |
WorkItem |
idx |
int |
status |
typing.Optional[ItemStatus] |
wf_exec |
typing.Optional[FlyteWorkflowExecution] |
error |
typing.Optional[BaseException] |
flytekit.core.worker_queue.WorkItem
This is a class to keep track of what the user requested. Since it captures the arguments that the user wants to run the entity with, an arbitrary map, can’t make this frozen.
class WorkItem(
entity: RunnableEntity,
input_kwargs: dict[str, typing.Any],
result: typing.Any,
error: typing.Optional[BaseException],
status: ItemStatus,
wf_exec: typing.Optional[FlyteWorkflowExecution],
python_interface: typing.Optional[Interface],
uuid: typing.Optional[uuid.UUID],
)
Parameter | Type |
---|---|
entity |
RunnableEntity |
input_kwargs |
dict[str, typing.Any] |
result |
typing.Any |
error |
typing.Optional[BaseException] |
status |
ItemStatus |
wf_exec |
typing.Optional[FlyteWorkflowExecution] |
python_interface |
typing.Optional[Interface] |
uuid |
typing.Optional[uuid.UUID] |
Properties
Property | Type | Description |
---|---|---|
is_in_terminal_state |