Feature Workflow/DAG/Block Telemetrization #79
Conversation
…extVar propagation - Add _workflow_id_ctx module-level ContextVar — asyncio copies it into every create_task/gather branch automatically, so concurrent workflows remain isolated without any explicit passing - Add workflow_scope() async context manager — sets the ContextVar, opens an OTel workflow span via span_scope() (or nullcontext() when telemetry is off), and yields the workflow_id; task spans created inside become structural OTel children - Add execute_block() span scope — wraps each block execution in an OTel block span using its uid as asyncflow.workflow_id; removes dead run_in_executor branch (all block functions are async) - Register a span enricher on TelemetryManager at start_telemetry() — copies asyncflow.workflow_id from event attributes into the OTel task span at creation time, decoupling RHAPSODY from this domain attribute - Propagate workflow_id through all _emit() call sites (TaskCreated, TaskStarted, TaskCompleted, TaskFailed, TaskCanceled, TaskQueued, TaskSubmitted, asyncflow.TaskResolved) by stamping comp_desc[workflow_id] at registration time from the active ContextVar - Add workflow_id kwarg to _emit() — injects asyncflow.workflow_id into event attributes when present, leaving the existing attribute dict untouched otherwise - Replace contextmanager + custom _null_context() with stdlib nullcontext from contextlib
There was a problem hiding this comment.
Code Review
This pull request introduces workflow grouping through the new workflow_scope context manager and enhances telemetry capabilities by allowing the injection of custom OpenTelemetry span processors, metric readers, and resources. It also ensures that tasks executed within a @flow.block automatically inherit the block's UID as their workflow ID. A potential issue was identified regarding the use of a module-level ContextVar for tracking workflow IDs, which could lead to context leakage if multiple WorkflowEngine instances are used concurrently within the same coroutine hierarchy; moving this to an instance attribute was suggested for better isolation.
| _workflow_id_ctx: ContextVar[str | None] = ContextVar( | ||
| "asyncflow_workflow_id", default=None | ||
| ) |
There was a problem hiding this comment.
Defining _workflow_id_ctx as a module-level ContextVar can lead to context leakage between different WorkflowEngine instances if they are used within the same coroutine hierarchy. For example, if two engines are used concurrently and one enters a workflow_scope, it will affect the workflow ID captured by the other engine's tasks because they share the same ContextVar key.
To ensure proper isolation, consider making _workflow_id_ctx an instance attribute of WorkflowEngine (initialized in __init__). Since all methods that access it (workflow_scope, _register_component, execute_block) are instance methods or have access to self, this would provide better isolation for multi-engine environments.
There was a problem hiding this comment.
Typical/covered pattern:
await asyncio.gather(
engine1_workflow(), # own context copy — workflow_scope("wf-A") stays here
engine2_workflow(), # own context copy — completely isolated
)Problematic pattern still valid (not covered):
async def problematic():
async with engine1.workflow_scope("wf-A"):
engine2.my_task() # reads _workflow_id_ctx → gets "wf-A" — wrong|
Check out this pull request on See visual diffs & provide feedback on Jupyter Notebooks. Powered by ReviewNB |
This PR introduces best practice/standard approach to allow teleworking a worklfow/dag or set of workflows in asyncflow.
Added
workflow_scope()_workflow_id_ctxContextVarstart_telemetry()parameters_emit()workflow ID injectionFixed
run_in_executorbranch fromexecute_block()Changed
execute_block()now usesnullcontextexecute_block()now automatically sets_workflow_id_ctxto the block UIDDocs
telemetry.md:workflow_scope()referenceasyncflow.workflow_idpropagation