From 505da9ebd37ec44691d3efa440d73f01d8780543 Mon Sep 17 00:00:00 2001 From: AymenFJA Date: Wed, 3 Jun 2026 06:08:02 +0200 Subject: [PATCH 1/6] This commits focus on block cancellation: fix(engine): correct block/task cancellation and add member propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - B2: patched_cancel returned method object instead of calling it - B1: block futures no longer patched; cancellation uses done_callback on block_fut → asyncio.Task cancel via _block_asyncio_tasks - B3: handle_task_cancellation is now idempotent (silent on done future) - M3: _block_members tracks tasks registered inside each block; cancel propagates recursively to all members on block cancellation - M5: future.state attribute (PENDING/RUNNING/DONE/FAILED/CANCELLED), RUNNING set in task_callbacks when backend emits it - M6: explicit workflow_id= kwarg at call-time overrides ContextVar --- src/radical/asyncflow/workflow_manager.py | 86 +++++++-- tests/unit/test_block_execution.py | 199 ++++++++++++++++++++ tests/unit/test_cancellation.py | 74 ++++++++ tests/unit/test_future_state.py | 99 ++++++++++ tests/unit/test_workflow_scope_isolation.py | 86 ++++++++- 5 files changed, 526 insertions(+), 18 deletions(-) create mode 100644 tests/unit/test_block_execution.py create mode 100644 tests/unit/test_cancellation.py create mode 100644 tests/unit/test_future_state.py diff --git a/src/radical/asyncflow/workflow_manager.py b/src/radical/asyncflow/workflow_manager.py index 439e75f..1ea27c0 100644 --- a/src/radical/asyncflow/workflow_manager.py +++ b/src/radical/asyncflow/workflow_manager.py @@ -107,6 +107,11 @@ def __init__( self._dependency_count = {} self._component_change_event = asyncio.Event() + # Block task registry: uid -> asyncio.Task for running execute_block coroutines + self._block_asyncio_tasks: dict[str, asyncio.Task] = {} + # Block member registry: block_uid -> set of component UIDs registered within it + self._block_members: dict[str, set[str]] = {} + self.task_states_map = self.backend.get_task_states_map() # Define decorators @@ -606,6 +611,10 @@ def _handle_flow_component_registration( def wrapper(*args, **kwargs): # Create async future - we only support async comp_fut = asyncio.Future() + comp_fut.state = "PENDING" + + # Extract call-time workflow_id before storing kwargs or calling the function + explicit_workflow_id = kwargs.pop("workflow_id", None) comp_desc = { "args": args, @@ -615,6 +624,7 @@ def wrapper(*args, **kwargs): "task_backend_specific_kwargs": task_backend_specific_kwargs or {}, "target_backend": target_backend, "capture_stdio": capture_stdio, + "_explicit_workflow_id": explicit_workflow_id, } # Only handle async functions @@ -703,7 +713,8 @@ def _register_component( # make sure not to specify both func and executable at the same time comp_desc["name"] = comp_desc["function"].__name__ comp_desc["uid"] = self._assign_uid(prefix=comp_type) - comp_desc["workflow_id"] = self._workflow_id_ctx.get() + # call-time workflow_id takes precedence over the ContextVar + comp_desc["workflow_id"] = comp_desc.pop("_explicit_workflow_id", None) or self._workflow_id_ctx.get() if task_type == EXECUTABLE: comp_desc[FUNCTION] = None # Clear function since we're using executable @@ -760,8 +771,25 @@ def _register_component( self._update_dependency_tracking(comp_desc["uid"]) self._component_change_event.set() - # Setup cancel hook - comp_fut.cancel = self._setup_future_cancel_hook(comp_fut, comp_desc["uid"]) + # Track block membership: if this component is registered from within a block's + # execution context, record it so it gets cancelled when the block is cancelled. + # Read ContextVar directly — not comp_desc["workflow_id"] which may be overridden + # by an explicit call-time kwarg (a telemetry label, not a membership signal). + parent_block_uid = self._workflow_id_ctx.get() + if ( + parent_block_uid + and parent_block_uid in self.components + and self.components[parent_block_uid]["type"] == BLOCK + ): + self._block_members.setdefault(parent_block_uid, set()).add(comp_desc["uid"]) + comp_fut.add_done_callback( + lambda _, buid=parent_block_uid, tuid=comp_desc["uid"]: self._remove_member(buid, tuid) + ) + + # Patch cancel for tasks only; blocks are cancelled via a done_callback + # installed in _submit_blocks that cancels the underlying asyncio.Task directly. + if comp_type != BLOCK: + comp_fut.cancel = self._setup_future_cancel_hook(comp_fut, comp_desc["uid"]) self._emit( "TaskCreated", @@ -827,7 +855,7 @@ def patched_cancel(*args, **kwargs): else: # Task is pending -> cancel locally logger.info(f"Cancellation requested for {uid} (pending) locally") - return fut.original_cancel + return fut.original_cancel() return patched_cancel @@ -926,6 +954,7 @@ def _clear_internal_records(self): self._ready_queue.clear() self._dependents_map.clear() self._dependency_count.clear() + self._block_members.clear() reset_uid_counter() @@ -954,6 +983,14 @@ def _notify_dependents(self, comp_uid: str): if comp_uid in self._dependency_count: del self._dependency_count[comp_uid] + def _remove_member(self, block_uid: str, task_uid: str) -> None: + """Remove a completed task from its parent block's member set.""" + members = self._block_members.get(block_uid) + if members is not None: + members.discard(task_uid) + if not members: + del self._block_members[block_uid] + def _create_dependency_failure_exception(self, comp_desc: dict, failed_deps: list): """Create a DependencyFailureError exception that shows both the immediate failure and the root cause from failed dependencies. @@ -1355,15 +1392,28 @@ async def _submit_blocks(self, blocks: list): - Relies on `execute_block` to handle the actual function call and future """ for block in blocks: - args = block["args"] - kwargs = block["kwargs"] - func = block["function"] - block_fut = self.components[block["uid"]]["future"] - - # Execute the block function as a coroutine - asyncio.create_task( - self.execute_block(block_fut, func, *args, **kwargs), name=block["uid"] + block_uid = block["uid"] + block_fut = self.components[block_uid]["future"] + t = asyncio.create_task( + self.execute_block(block_fut, block["function"], *block["args"], **block["kwargs"]), + name=block_uid, ) + self._block_asyncio_tasks[block_uid] = t + # Remove from registry when the asyncio.Task finishes (any outcome) + t.add_done_callback(lambda _, uid=block_uid: self._block_asyncio_tasks.pop(uid, None)) + # Wire cancellation: if block_fut is cancelled externally after submission, + # propagate to the asyncio.Task. + def _on_block_fut_done(f, task=t, buid=block_uid): + if f.cancelled(): + task.cancel() + f.state = "CANCELLED" + members = self._block_members.pop(buid, None) + if members: + for member_uid in members: + comp = self.components.get(member_uid) + if comp and not comp["future"].done(): + comp["future"].cancel() + block_fut.add_done_callback(_on_block_fut_done) async def execute_block( self, block_fut: asyncio.Future, func: Callable, *args: Any, **kwargs: Any @@ -1423,6 +1473,7 @@ def handle_task_success(self, task: dict, task_fut: asyncio.Future) -> None: task_fut.set_result(task["return_value"]) else: task_fut.set_result(task["stdout"]) + task_fut.state = "DONE" else: logger.warning( f'Attempted to handle an already finished task "{task["uid"]}"' @@ -1482,18 +1533,18 @@ def handle_task_failure( exception = RuntimeError(str(original_error)) task_fut.set_exception(exception) + task_fut.state = "FAILED" def handle_task_cancellation(self, task: dict, task_fut: asyncio.Future): """Handle task cancellation.""" if task_fut.done(): - logger.warning( - f'Attempted to handle an already cancelled task "{task["uid"]}"' - ) - return + return # already resolved — idempotent, nothing to do # Restore original cancel method task_fut.cancel = task_fut.original_cancel - return task_fut.cancel() + result = task_fut.cancel() + task_fut.state = "CANCELLED" + return result @typeguard.typechecked def task_callbacks( @@ -1596,6 +1647,7 @@ def wait_and_set(): # implicit: when a coroutine that awaits the future # is scheduled and started by the event loop, that's # when the "work" is running. + task_fut.state = "RUNNING" if self._telemetry is not None: now = time.time() self._task_start_times[uid] = now diff --git a/tests/unit/test_block_execution.py b/tests/unit/test_block_execution.py new file mode 100644 index 0000000..9ae3a32 --- /dev/null +++ b/tests/unit/test_block_execution.py @@ -0,0 +1,199 @@ +"""Tests for block submission, task registry, and member cancellation propagation.""" + +import asyncio +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from radical.asyncflow import NoopExecutionBackend, WorkflowEngine +from radical.asyncflow.backends import LocalExecutionBackend + + +async def _make_engine(): + return await WorkflowEngine.create(backend=NoopExecutionBackend()) + + +async def _make_local_engine(): + backend = await LocalExecutionBackend(ThreadPoolExecutor(max_workers=8)) + return await WorkflowEngine.create(backend=backend) + + +# --------------------------------------------------------------------------- +# _block_asyncio_tasks registry +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_block_asyncio_tasks_registry_populated(): + """_block_asyncio_tasks must hold an entry for a block while it is running.""" + engine = await _make_engine() + block_started = asyncio.Event() + block_release = asyncio.Event() + + @engine.block + async def my_block(): + block_started.set() + await block_release.wait() + + my_block() + await asyncio.sleep(0.05) + await block_started.wait() + + block_uid = next(iter(engine.components)) + assert block_uid in engine._block_asyncio_tasks + + block_release.set() + await asyncio.sleep(0.05) + + assert block_uid not in engine._block_asyncio_tasks + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_block_asyncio_tasks_registry_cleared_on_completion(): + """Registry must be empty after a block completes normally.""" + engine = await _make_engine() + + @engine.block + async def quick_block(): + return 42 + + quick_block() + await asyncio.sleep(0.1) + + assert len(engine._block_asyncio_tasks) == 0 + + await engine.shutdown() + + +# --------------------------------------------------------------------------- +# _block_members — member registration and cleanup +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_block_members_cleared_on_normal_completion(): + """_block_members must be empty after a block and its tasks complete normally.""" + engine = await _make_engine() + + @engine.block + async def normal_block(): + @engine.function_task + async def inner(): + return "ok" + + await inner() + + block_fut = normal_block() + await asyncio.sleep(0.2) + + assert block_fut.done() and not block_fut.cancelled() + assert len(engine._block_members) == 0 + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_completed_tasks_removed_from_block_members(): + """Tasks that finish before block cancellation are cleaned up by their done_callback.""" + engine = await _make_engine() + block_gate = asyncio.Event() + + @engine.block + async def my_block(): + @engine.function_task + async def quick_task(): + return "done" + + await quick_task() # NoopBackend resolves immediately + await block_gate.wait() # keep block alive for inspection + + my_block() + await asyncio.sleep(0.15) + + block_uid = next(uid for uid, c in engine.components.items() if c["type"] == "block") + assert len(engine._block_members.get(block_uid, set())) == 0 + + block_gate.set() + await asyncio.sleep(0.05) + await engine.shutdown() + + +# --------------------------------------------------------------------------- +# Cancellation propagation to block members +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_block_cancellation_propagates_to_member_tasks(): + """All tasks registered inside a block before cancellation must be cancelled.""" + engine = await _make_local_engine() + + @engine.block + async def my_block(): + @engine.function_task + async def t1(): + await asyncio.sleep(1) + return "t1" + + @engine.function_task + async def t2(dep): + await asyncio.sleep(1) + return "t2" + + @engine.function_task + async def t3(dep1, dep2): + await asyncio.sleep(1) + return "t3" + + f1 = t1() + f2 = t2(f1) + f3 = t3(f1, f2) + await f3 + + block_fut = my_block() + await asyncio.sleep(0.4) + + block_fut.cancel() + await asyncio.sleep(0.3) + + assert block_fut.cancelled() + task_comps = [c for c in engine.components.values() if c["type"] == "task"] + assert len(task_comps) == 3 + assert all(c["future"].cancelled() for c in task_comps) + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_nested_block_cancellation_propagates(): + """Cancelling an outer block propagates recursively to inner blocks and their tasks.""" + engine = await _make_local_engine() + + @engine.block + async def outer_block(): + @engine.block + async def inner_block(): + @engine.function_task + async def deep_task(): + await asyncio.sleep(1) + return "never" + + f = deep_task() + await f + + ib = inner_block() + await ib + + outer_fut = outer_block() + await asyncio.sleep(0.5) + + outer_fut.cancel() + await asyncio.sleep(0.3) + + assert outer_fut.cancelled() + deep_comp = next(c for c in engine.components.values() if c["type"] == "task") + assert deep_comp["future"].cancelled() + + await engine.shutdown() diff --git a/tests/unit/test_cancellation.py b/tests/unit/test_cancellation.py new file mode 100644 index 0000000..e1bf70a --- /dev/null +++ b/tests/unit/test_cancellation.py @@ -0,0 +1,74 @@ +"""Tests for task and block cancellation behaviour.""" + +import asyncio + +import pytest + +from radical.asyncflow import NoopExecutionBackend, WorkflowEngine + + +async def _make_engine(): + return await WorkflowEngine.create(backend=NoopExecutionBackend()) + + +@pytest.mark.asyncio +async def test_pending_cancel_actually_cancels(): + """cancel() on a pending future must mark it cancelled, not return the method object.""" + engine = await _make_engine() + + fut = asyncio.Future() + fut.state = "PENDING" + uid = "task.pending-test" + fut.cancel = engine._setup_future_cancel_hook(fut, uid) + + result = fut.cancel() + + assert result is True + assert fut.cancelled() + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_block_cancel_stops_execution(): + """Cancelling a block future must stop the underlying execute_block coroutine.""" + engine = await _make_engine() + execution_reached_end = False + block_started = asyncio.Event() + + @engine.block + async def long_block(): + nonlocal execution_reached_end + block_started.set() + await asyncio.sleep(2) + execution_reached_end = True + + block_fut = long_block() + await asyncio.sleep(0.05) + await block_started.wait() + + block_fut.cancel() + await asyncio.sleep(0.1) + + assert block_fut.cancelled() + assert not execution_reached_end + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_handle_task_cancellation_is_idempotent(): + """Calling handle_task_cancellation twice on the same future is a silent no-op.""" + engine = await _make_engine() + + fut = asyncio.Future() + fut.state = "PENDING" + fut.original_cancel = fut.cancel + task_desc = {"uid": "task.idempotent-test"} + + engine.handle_task_cancellation(task_desc, fut) + assert fut.cancelled() + + engine.handle_task_cancellation(task_desc, fut) # must not raise + + await engine.shutdown() diff --git a/tests/unit/test_future_state.py b/tests/unit/test_future_state.py new file mode 100644 index 0000000..82db872 --- /dev/null +++ b/tests/unit/test_future_state.py @@ -0,0 +1,99 @@ +"""Tests for the public future.state attribute lifecycle.""" + +import asyncio + +import pytest + +from radical.asyncflow import NoopExecutionBackend, WorkflowEngine + + +async def _make_engine(): + return await WorkflowEngine.create(backend=NoopExecutionBackend()) + + +@pytest.mark.asyncio +async def test_future_state_is_pending_on_creation(): + """A freshly returned future must have state == 'PENDING'.""" + engine = await _make_engine() + + @engine.function_task + async def some_task(): + return 1 + + fut = some_task() + assert fut.state == "PENDING" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_future_state_transitions_to_done(): + """A successfully completed task future must have state == 'DONE'.""" + engine = await _make_engine() + + @engine.function_task + async def simple_task(): + return "ok" + + fut = simple_task() + await asyncio.sleep(0.1) + + assert fut.state == "DONE" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_future_state_transitions_to_running(): + """task_callbacks(RUNNING) must set state == 'RUNNING' on the future.""" + engine = await _make_engine() + + @engine.function_task + async def some_task(): + return "ok" + + some_task() + await asyncio.sleep(0.05) # let async_wrapper register the component + + comp = next(iter(engine.components.values())) + task_fut = comp["future"] + + # Simulate the backend emitting a RUNNING transition + engine.task_callbacks(comp["description"], "RUNNING") + + assert task_fut.state == "RUNNING" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_future_state_transitions_to_failed(): + """handle_task_failure must set state == 'FAILED' on the future.""" + engine = await _make_engine() + + fut = asyncio.Future() + fut.state = "RUNNING" + task_desc = {"uid": "task.fail-test", "exception": RuntimeError("boom"), "stderr": "boom"} + + engine.handle_task_failure(task_desc, fut) + + assert fut.state == "FAILED" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_future_state_transitions_to_cancelled(): + """handle_task_cancellation must set state == 'CANCELLED' on the future.""" + engine = await _make_engine() + + fut = asyncio.Future() + fut.state = "RUNNING" + fut.original_cancel = fut.cancel + task_desc = {"uid": "task.cancel-test"} + + engine.handle_task_cancellation(task_desc, fut) + + assert fut.state == "CANCELLED" + + await engine.shutdown() diff --git a/tests/unit/test_workflow_scope_isolation.py b/tests/unit/test_workflow_scope_isolation.py index 802028f..ec5ad2c 100644 --- a/tests/unit/test_workflow_scope_isolation.py +++ b/tests/unit/test_workflow_scope_isolation.py @@ -1,7 +1,9 @@ -"""Unit tests for workflow_scope() ContextVar isolation (per-instance fix).""" +"""Unit tests for workflow_scope() ContextVar isolation and workflow_id propagation.""" import asyncio +import pytest + from radical.asyncflow import NoopExecutionBackend, WorkflowEngine @@ -85,3 +87,85 @@ async def outside(x): assert wids["outside"] is None await engine.shutdown() + + +# --------------------------------------------------------------------------- +# Explicit workflow_id kwarg (M6) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_explicit_workflow_id_is_recorded(): + """A task called with workflow_id='my-wf' must record that id in comp_desc.""" + engine = await WorkflowEngine.create(backend=NoopExecutionBackend()) + + @engine.function_task + async def tagged_task(x): + return x + + tagged_task(1, workflow_id="my-wf") + await asyncio.sleep(0.05) + + comp = next(iter(engine.components.values())) + assert comp["description"]["workflow_id"] == "my-wf" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_explicit_workflow_id_not_passed_to_function(): + """The workflow_id kwarg must be stripped before being forwarded to the function.""" + engine = await WorkflowEngine.create(backend=NoopExecutionBackend()) + received_kwargs = {} + + @engine.function_task + async def capturing_task(**kwargs): + received_kwargs.update(kwargs) + return "ok" + + capturing_task(workflow_id="wf-invisible") + await asyncio.sleep(0.05) + + assert "workflow_id" not in received_kwargs + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_explicit_workflow_id_overrides_context_var(): + """An explicit workflow_id kwarg must take precedence over the active workflow_scope.""" + engine = await WorkflowEngine.create(backend=NoopExecutionBackend()) + + async with engine.workflow_scope("ctx-wf"): + + @engine.function_task + async def ctx_task(x): + return x + + ctx_task(1, workflow_id="explicit-wf") + await asyncio.sleep(0.05) + + comp = next(iter(engine.components.values())) + assert comp["description"]["workflow_id"] == "explicit-wf" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_workflow_id_falls_back_to_context_var(): + """When no explicit workflow_id is passed, the active workflow_scope value is used.""" + engine = await WorkflowEngine.create(backend=NoopExecutionBackend()) + + async with engine.workflow_scope("scope-wf"): + + @engine.function_task + async def scoped_task(x): + return x + + scoped_task(1) + await asyncio.sleep(0.05) + + comp = next(iter(engine.components.values())) + assert comp["description"]["workflow_id"] == "scope-wf" + + await engine.shutdown() From c6228135625357113c49ef3450845a63e1a417e8 Mon Sep 17 00:00:00 2001 From: AymenFJA Date: Wed, 3 Jun 2026 17:50:09 +0200 Subject: [PATCH 2/6] address gemini comments --- src/radical/asyncflow/workflow_manager.py | 35 ++++++++++---- tests/unit/test_block_execution.py | 37 ++++++++++++-- tests/unit/test_cancellation.py | 25 +++++++++- tests/unit/test_future_state.py | 53 ++++++++++++++++++++- tests/unit/test_workflow_scope_isolation.py | 6 ++- 5 files changed, 140 insertions(+), 16 deletions(-) diff --git a/src/radical/asyncflow/workflow_manager.py b/src/radical/asyncflow/workflow_manager.py index 1ea27c0..cfd8476 100644 --- a/src/radical/asyncflow/workflow_manager.py +++ b/src/radical/asyncflow/workflow_manager.py @@ -714,7 +714,9 @@ def _register_component( comp_desc["name"] = comp_desc["function"].__name__ comp_desc["uid"] = self._assign_uid(prefix=comp_type) # call-time workflow_id takes precedence over the ContextVar - comp_desc["workflow_id"] = comp_desc.pop("_explicit_workflow_id", None) or self._workflow_id_ctx.get() + comp_desc["workflow_id"] = ( + comp_desc.pop("_explicit_workflow_id", None) or self._workflow_id_ctx.get() + ) if task_type == EXECUTABLE: comp_desc[FUNCTION] = None # Clear function since we're using executable @@ -781,9 +783,13 @@ def _register_component( and parent_block_uid in self.components and self.components[parent_block_uid]["type"] == BLOCK ): - self._block_members.setdefault(parent_block_uid, set()).add(comp_desc["uid"]) + self._block_members.setdefault(parent_block_uid, set()).add( + comp_desc["uid"] + ) comp_fut.add_done_callback( - lambda _, buid=parent_block_uid, tuid=comp_desc["uid"]: self._remove_member(buid, tuid) + lambda _, + b_uid=parent_block_uid, + tuid=comp_desc["uid"]: self._remove_member(b_uid, tuid) ) # Patch cancel for tasks only; blocks are cancelled via a done_callback @@ -855,7 +861,10 @@ def patched_cancel(*args, **kwargs): else: # Task is pending -> cancel locally logger.info(f"Cancellation requested for {uid} (pending) locally") - return fut.original_cancel() + result = fut.original_cancel(*args, **kwargs) + if result: + fut.state = "CANCELLED" + return result return patched_cancel @@ -955,6 +964,7 @@ def _clear_internal_records(self): self._dependents_map.clear() self._dependency_count.clear() self._block_members.clear() + self._block_asyncio_tasks.clear() reset_uid_counter() @@ -1395,24 +1405,31 @@ async def _submit_blocks(self, blocks: list): block_uid = block["uid"] block_fut = self.components[block_uid]["future"] t = asyncio.create_task( - self.execute_block(block_fut, block["function"], *block["args"], **block["kwargs"]), + self.execute_block( + block_fut, block["function"], *block["args"], **block["kwargs"] + ), name=block_uid, ) + block_fut.state = "RUNNING" self._block_asyncio_tasks[block_uid] = t # Remove from registry when the asyncio.Task finishes (any outcome) - t.add_done_callback(lambda _, uid=block_uid: self._block_asyncio_tasks.pop(uid, None)) + t.add_done_callback( + lambda _, uid=block_uid: self._block_asyncio_tasks.pop(uid, None) + ) + # Wire cancellation: if block_fut is cancelled externally after submission, # propagate to the asyncio.Task. - def _on_block_fut_done(f, task=t, buid=block_uid): + def _on_block_fut_done(f, task=t, b_uid=block_uid): + members = self._block_members.pop(b_uid, None) if f.cancelled(): task.cancel() f.state = "CANCELLED" - members = self._block_members.pop(buid, None) if members: for member_uid in members: comp = self.components.get(member_uid) if comp and not comp["future"].done(): comp["future"].cancel() + block_fut.add_done_callback(_on_block_fut_done) async def execute_block( @@ -1448,9 +1465,11 @@ async def execute_block( result = await func(*args, **kwargs) if not block_fut.done(): block_fut.set_result(result) + block_fut.state = "DONE" except Exception as e: if not block_fut.done(): block_fut.set_exception(e) + block_fut.state = "FAILED" finally: self._workflow_id_ctx.reset(token) diff --git a/tests/unit/test_block_execution.py b/tests/unit/test_block_execution.py index 9ae3a32..e433f1f 100644 --- a/tests/unit/test_block_execution.py +++ b/tests/unit/test_block_execution.py @@ -72,6 +72,31 @@ async def quick_block(): # --------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_block_members_cleared_on_exception(): + """_block_members must be empty after a block that raises, not just on normal + completion.""" + engine = await _make_engine() + + @engine.block + async def failing_block(): + @engine.function_task + async def inner(): + return "ok" + + await inner() + raise RuntimeError("block error") + + block_fut = failing_block() + await asyncio.sleep(0.2) + + assert block_fut.done() and not block_fut.cancelled() + assert isinstance(block_fut.exception(), RuntimeError) + assert len(engine._block_members) == 0 + + await engine.shutdown() + + @pytest.mark.asyncio async def test_block_members_cleared_on_normal_completion(): """_block_members must be empty after a block and its tasks complete normally.""" @@ -96,7 +121,8 @@ async def inner(): @pytest.mark.asyncio async def test_completed_tasks_removed_from_block_members(): - """Tasks that finish before block cancellation are cleaned up by their done_callback.""" + """Tasks that finish before block cancellation are cleaned up by their + done_callback.""" engine = await _make_engine() block_gate = asyncio.Event() @@ -106,13 +132,15 @@ async def my_block(): async def quick_task(): return "done" - await quick_task() # NoopBackend resolves immediately + await quick_task() # NoopBackend resolves immediately await block_gate.wait() # keep block alive for inspection my_block() await asyncio.sleep(0.15) - block_uid = next(uid for uid, c in engine.components.items() if c["type"] == "block") + block_uid = next( + uid for uid, c in engine.components.items() if c["type"] == "block" + ) assert len(engine._block_members.get(block_uid, set())) == 0 block_gate.set() @@ -168,7 +196,8 @@ async def t3(dep1, dep2): @pytest.mark.asyncio async def test_nested_block_cancellation_propagates(): - """Cancelling an outer block propagates recursively to inner blocks and their tasks.""" + """Cancelling an outer block propagates recursively to inner blocks and their + tasks.""" engine = await _make_local_engine() @engine.block diff --git a/tests/unit/test_cancellation.py b/tests/unit/test_cancellation.py index e1bf70a..3ad1027 100644 --- a/tests/unit/test_cancellation.py +++ b/tests/unit/test_cancellation.py @@ -13,7 +13,8 @@ async def _make_engine(): @pytest.mark.asyncio async def test_pending_cancel_actually_cancels(): - """cancel() on a pending future must mark it cancelled, not return the method object.""" + """Cancel() on a pending future must mark it cancelled, not return the method + object.""" engine = await _make_engine() fut = asyncio.Future() @@ -25,6 +26,28 @@ async def test_pending_cancel_actually_cancels(): assert result is True assert fut.cancelled() + assert fut.state == "CANCELLED" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_pending_cancel_forwards_msg(): + """Cancel(msg) must forward the message through to the underlying future.""" + engine = await _make_engine() + + fut = asyncio.Future() + fut.state = "PENDING" + fut.cancel = engine._setup_future_cancel_hook(fut, "task.msg-test") + + fut.cancel("cancel-reason") + + assert fut.cancelled() + assert fut.state == "CANCELLED" + + with pytest.raises(asyncio.CancelledError) as exc_info: + await fut + assert "cancel-reason" in str(exc_info.value) await engine.shutdown() diff --git a/tests/unit/test_future_state.py b/tests/unit/test_future_state.py index 82db872..ede4980 100644 --- a/tests/unit/test_future_state.py +++ b/tests/unit/test_future_state.py @@ -73,7 +73,11 @@ async def test_future_state_transitions_to_failed(): fut = asyncio.Future() fut.state = "RUNNING" - task_desc = {"uid": "task.fail-test", "exception": RuntimeError("boom"), "stderr": "boom"} + task_desc = { + "uid": "task.fail-test", + "exception": RuntimeError("boom"), + "stderr": "boom", + } engine.handle_task_failure(task_desc, fut) @@ -97,3 +101,50 @@ async def test_future_state_transitions_to_cancelled(): assert fut.state == "CANCELLED" await engine.shutdown() + + +# --------------------------------------------------------------------------- +# Block future state lifecycle +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_block_future_state_transitions_running_then_done(): + """Block future must be RUNNING after submit and DONE after normal completion.""" + engine = await _make_engine() + block_release = asyncio.Event() + + @engine.block + async def held_block(): + await block_release.wait() + + block_fut = held_block() + await asyncio.sleep(0.05) + + assert block_fut.state == "RUNNING" + + block_release.set() + await asyncio.sleep(0.05) + + assert block_fut.state == "DONE" + + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_block_future_state_transitions_to_failed(): + """A block that raises must have state == 'FAILED' after the exception + propagates.""" + engine = await _make_engine() + + @engine.block + async def failing_block(): + raise ValueError("block error") + + block_fut = failing_block() + await asyncio.sleep(0.1) + + assert block_fut.state == "FAILED" + assert isinstance(block_fut.exception(), ValueError) + + await engine.shutdown() diff --git a/tests/unit/test_workflow_scope_isolation.py b/tests/unit/test_workflow_scope_isolation.py index ec5ad2c..937097a 100644 --- a/tests/unit/test_workflow_scope_isolation.py +++ b/tests/unit/test_workflow_scope_isolation.py @@ -133,7 +133,8 @@ async def capturing_task(**kwargs): @pytest.mark.asyncio async def test_explicit_workflow_id_overrides_context_var(): - """An explicit workflow_id kwarg must take precedence over the active workflow_scope.""" + """An explicit workflow_id kwarg must take precedence over the active + workflow_scope.""" engine = await WorkflowEngine.create(backend=NoopExecutionBackend()) async with engine.workflow_scope("ctx-wf"): @@ -153,7 +154,8 @@ async def ctx_task(x): @pytest.mark.asyncio async def test_workflow_id_falls_back_to_context_var(): - """When no explicit workflow_id is passed, the active workflow_scope value is used.""" + """When no explicit workflow_id is passed, the active workflow_scope value is + used.""" engine = await WorkflowEngine.create(backend=NoopExecutionBackend()) async with engine.workflow_scope("scope-wf"): From 2c86646527dce69a7ce8921138ec16e618d2ac02 Mon Sep 17 00:00:00 2001 From: AymenFJA Date: Wed, 3 Jun 2026 18:17:25 +0200 Subject: [PATCH 3/6] =?UTF-8?q?Fix=20H1=20and=20H2:=20-=20H1:=20from=20O(n?= =?UTF-8?q?)=20to=20O(1):=20self.running=20changed=20from=20list=20to=20se?= =?UTF-8?q?t.=20append=20=E2=86=92=20add,=20remove=20=E2=86=92=20discard.?= =?UTF-8?q?=20The=20in=20check=20and=20list(self.running)=20iteration=20we?= =?UTF-8?q?re=20already=20compatible.=20-=20H2:=20=5Fclear=5Finternal=5Fre?= =?UTF-8?q?cords=20now=20also=20clears=20resolved,=20running,=20=5Ftask=5F?= =?UTF-8?q?submit=5Ftimes,=20and=20=5Ftask=5Fstart=5Ftimes.=20-=20Added=20?= =?UTF-8?q?new=20test=20for=20engine=20lifecycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/radical/asyncflow/workflow_manager.py | 10 ++- tests/unit/test_engine_lifecycle.py | 86 +++++++++++++++++++++++ 2 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 tests/unit/test_engine_lifecycle.py diff --git a/src/radical/asyncflow/workflow_manager.py b/src/radical/asyncflow/workflow_manager.py index cfd8476..387c5cb 100644 --- a/src/radical/asyncflow/workflow_manager.py +++ b/src/radical/asyncflow/workflow_manager.py @@ -93,7 +93,7 @@ def __init__( self._default_backend_name: str = backend[0].name # Initialize core attributes - self.running = [] + self.running: set[str] = set() self.components = {} self.resolved = set() self.dependencies = {} @@ -965,6 +965,10 @@ def _clear_internal_records(self): self._dependency_count.clear() self._block_members.clear() self._block_asyncio_tasks.clear() + self.resolved.clear() + self.running.clear() + self._task_submit_times.clear() + self._task_start_times.clear() reset_uid_counter() @@ -1245,7 +1249,7 @@ async def run(self): await self.submit(to_submit) for comp_desc in to_submit: comp_uid = comp_desc["uid"] - self.running.append(comp_uid) + self.running.add(comp_uid) self.resolved.add(comp_uid) # Check for completed components and update dependency tracking @@ -1253,7 +1257,7 @@ async def run(self): for comp_uid in list(self.running): if self.components[comp_uid]["future"].done(): completed_components.append(comp_uid) - self.running.remove(comp_uid) + self.running.discard(comp_uid) # Notify dependents of completed components for comp_uid in completed_components: diff --git a/tests/unit/test_engine_lifecycle.py b/tests/unit/test_engine_lifecycle.py new file mode 100644 index 0000000..1d395a7 --- /dev/null +++ b/tests/unit/test_engine_lifecycle.py @@ -0,0 +1,86 @@ +"""Tests for engine internal state correctness across component lifecycle.""" + +import asyncio + +import pytest + +from radical.asyncflow import NoopExecutionBackend, WorkflowEngine + + +async def _make_engine(): + return await WorkflowEngine.create(backend=NoopExecutionBackend()) + + +# --------------------------------------------------------------------------- +# H1 — self.running is a set +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_running_is_a_set(): + """engine.running must be a set, not a list, for O(1) membership and removal.""" + engine = await _make_engine() + assert isinstance(engine.running, set) + await engine.shutdown() + + +@pytest.mark.asyncio +async def test_running_membership_and_cleanup(): + """Tasks must appear in running while submitted and be removed on completion.""" + engine = await _make_engine() + + @engine.function_task + async def t(): + return 1 + + t() + # Let the run loop submit the task and the Noop backend resolve it + await asyncio.sleep(0.1) + + # After Noop resolves immediately, the completed task should have left running + assert len(engine.running) == 0 + + await engine.shutdown() + + +# --------------------------------------------------------------------------- +# H2 — _clear_internal_records clears resolved, running, timing dicts +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_clear_internal_records_resets_resolved_and_running(): + """resolved and running must be empty after shutdown (which calls _clear_internal_records).""" + engine = await _make_engine() + + @engine.function_task + async def work(): + return 42 + + work() + await asyncio.sleep(0.1) + + # Confirm something was processed (resolved is non-empty before shutdown) + assert len(engine.resolved) > 0 + + await engine.shutdown() + + assert len(engine.resolved) == 0 + assert len(engine.running) == 0 + + +@pytest.mark.asyncio +async def test_clear_internal_records_resets_timing_dicts(): + """_task_submit_times and _task_start_times must be empty after _clear_internal_records.""" + engine = await _make_engine() + + # Populate the timing dicts directly to simulate telemetry state + engine._task_submit_times["task.000001"] = 1.0 + engine._task_start_times["task.000001"] = 2.0 + + engine._clear_internal_records() + + assert len(engine._task_submit_times) == 0 + assert len(engine._task_start_times) == 0 + + await engine.shutdown() From d0c0e07a884ac71473990d5dac6c34912d0f8f20 Mon Sep 17 00:00:00 2001 From: AymenFJA Date: Wed, 3 Jun 2026 18:20:12 +0200 Subject: [PATCH 4/6] pre-commit --- tests/unit/test_engine_lifecycle.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_engine_lifecycle.py b/tests/unit/test_engine_lifecycle.py index 1d395a7..706320b 100644 --- a/tests/unit/test_engine_lifecycle.py +++ b/tests/unit/test_engine_lifecycle.py @@ -50,7 +50,8 @@ async def t(): @pytest.mark.asyncio async def test_clear_internal_records_resets_resolved_and_running(): - """resolved and running must be empty after shutdown (which calls _clear_internal_records).""" + """Resolved and running must be empty after shutdown (which calls + _clear_internal_records).""" engine = await _make_engine() @engine.function_task @@ -71,7 +72,8 @@ async def work(): @pytest.mark.asyncio async def test_clear_internal_records_resets_timing_dicts(): - """_task_submit_times and _task_start_times must be empty after _clear_internal_records.""" + """_task_submit_times and _task_start_times must be empty after + _clear_internal_records.""" engine = await _make_engine() # Populate the timing dicts directly to simulate telemetry state From 9e5d568ed875ed7f59a1650b2e39dfaac5685e36 Mon Sep 17 00:00:00 2001 From: AymenFJA Date: Tue, 9 Jun 2026 02:04:36 +0200 Subject: [PATCH 5/6] update docs and changelogs --- CHANGELOG.md | 43 +++++++++++++++++++++++++++ docs/async_workflows.md | 49 ++++++++++++++++++++++++++++++ docs/best_practice.md | 66 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53d03ad..1693bba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,49 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] +### Added + +- **`future.state` attribute** — every component future (task and block) now exposes a `.state` + string attribute tracking its full lifecycle: `PENDING` → `RUNNING` → `DONE` / `FAILED` / + `CANCELLED`. Block futures now correctly transition through all states; previously they remained + at `PENDING` for the duration of their execution. Readable at any point without awaiting. + +- **`workflow_id=` call-time kwarg** — any task or block call now accepts `workflow_id=""` as + a keyword argument to tag that specific component. Takes precedence over any active + `workflow_scope()`. The kwarg is stripped before the function body is invoked. + +### Fixed + +- **Block future state lifecycle** — block futures now transition to `RUNNING` immediately after + `asyncio.create_task`, to `DONE` on normal completion, and to `FAILED` when the block body + raises. Previously all three transitions were missing. + +- **`_block_members` cleanup on non-cancelled outcomes** — member sets are now removed from + `_block_members` for every terminal outcome (DONE, FAILED, CANCELLED). Previously the cleanup + only ran on cancellation, leaving stale entries after normal or failed block completion. + +- **`patched_cancel` forwards `msg` argument** — `fut.cancel(msg=...)` now correctly forwards + all positional and keyword arguments to the underlying `asyncio.Future.cancel()`. Previously + the `msg` was silently dropped, breaking callers that rely on the cancellation message being + propagated through `CancelledError`. + +- **Pending-task cancellation sets `future.state`** — when a pending task is cancelled locally + via `patched_cancel`, `future.state` is now set to `"CANCELLED"`. All other cancellation paths + already set this attribute; this was the only missing case. + +- **`_clear_internal_records` completeness** — `resolved`, `running`, `_task_submit_times`, and + `_task_start_times` are now cleared alongside the other internal structures. Previously these + accumulated stale entries across engine reuse after `shutdown()`. + +- **`_block_asyncio_tasks` cleared on shutdown** — `_clear_internal_records` now also clears the + asyncio.Task registry for blocks, eliminating stale references after engine reset. + +### Changed + +- **`self.running` changed from `list` to `set`** — membership check (`uid in self.running`) and + removal (`running.discard`) in the run loop and `patched_cancel` are now O(1) instead of O(n). + No public API change. + ## [0.4.0] - 2026-05-18 ### Added diff --git a/docs/async_workflows.md b/docs/async_workflows.md index 7715913..d458fa0 100644 --- a/docs/async_workflows.md +++ b/docs/async_workflows.md @@ -98,3 +98,52 @@ asyncio.run(main()) !!! important "When to Use Each" - Use synchronous when workflows must run in sequence or have dependencies - Use asynchronous when workflows are independent and you want better performance + +--- + +## Tagging Workflows with IDs + +AsyncFlow provides two complementary ways to attach a `workflow_id` label to tasks, useful for +grouping related work in logs, dashboards, and telemetry. + +### `workflow_scope()` — tag all tasks in a block of code + +`workflow_scope()` is an async context manager. Every task submitted inside the `async with` block +inherits the given `workflow_id`: + +```python +async with flow.workflow_scope("experiment-42") as wid: + t1 = preprocess() + t2 = train(t1) + await t2 +# all tasks submitted inside carry workflow_id="experiment-42" +``` + +If no ID is passed, a short UUID is generated automatically: + +```python +async with flow.workflow_scope() as wid: + print(wid) # e.g. "wf-3a7f1c2b" + my_task() +``` + +!!! note + Scoping is per-engine instance. Multiple engines running in the same process each have their + own independent scope — one engine's `workflow_scope()` never leaks into another engine's tasks. + +### `workflow_id=` kwarg — tag a single task at call time + +Pass `workflow_id=` directly when calling any task or block to tag only that component: + +```python +result = my_task(workflow_id="run-007") +``` + +- Call-time `workflow_id=` takes precedence over any active `workflow_scope()`. +- The kwarg is stripped before being forwarded to the function body — the function never sees it. + +### How workflow IDs are used + +Workflow IDs flow through to the telemetry system when enabled, making it possible to filter +OTel spans and RHAPSODY JSONL events by workflow. They are also available on the component +description as `comp["description"]["workflow_id"]` for custom introspection. diff --git a/docs/best_practice.md b/docs/best_practice.md index b806b1e..00a3390 100644 --- a/docs/best_practice.md +++ b/docs/best_practice.md @@ -37,6 +37,72 @@ By following these best practices, you can: ``` --- +## Inspect Task and Block State + +Every future returned by a task or block call exposes a `.state` attribute that tracks its lifecycle: + +| State | Meaning | +|---|---| +| `"PENDING"` | Registered; waiting for dependencies to resolve | +| `"RUNNING"` | Submitted to the backend and executing | +| `"DONE"` | Completed successfully | +| `"FAILED"` | Raised an exception | +| `"CANCELLED"` | Cancelled before or during execution | + +```python +fut = my_task() +print(fut.state) # "PENDING" — not yet submitted + +await asyncio.sleep(0) +print(fut.state) # "RUNNING" or "DONE" depending on backend speed + +await fut +print(fut.state) # "DONE", "FAILED", or "CANCELLED" +``` + +!!! tip + Prefer reading `.state` over combining `fut.done()` + `fut.exception()` — `.state` is explicit + about which terminal outcome occurred without a separate branch. + +--- + +## React to Task Completion Without Background Coroutines + +A common mistake is spawning one background coroutine per task to monitor its completion: + +```python +# ❌ Anti-pattern: one coroutine per task +async def watch(fut): + await fut + handle(fut.result()) + +for fut in futs: + asyncio.create_task(watch(fut)) # N coroutines for N tasks +``` + +Use `add_done_callback` instead — one callback function handles all completions, zero extra coroutines: + +```python +# ✅ Correct: zero coroutines, purely event-driven +def on_done(fut): + if fut.cancelled(): + print(f"cancelled — state={fut.state}") + elif fut.exception(): + print(f"failed: {fut.exception()} — state={fut.state}") + else: + print(f"done: {fut.result()} — state={fut.state}") + +for fut in [task1(), task2(), task3()]: + fut.add_done_callback(on_done) +``` + +!!! note + The callback fires synchronously from within the event loop when the future settles. + Keep it fast — offload heavy work to a queue or schedule it with `asyncio.create_task`. + `fut.state` is already set before the callback fires, so it is safe to read inside. + +--- + ## Use Dependencies Correctly Tasks can depend on the output of other tasks: From de74ec92db2e11f31de6b49ee55724144a83cfed Mon Sep 17 00:00:00 2001 From: AymenFJA Date: Thu, 11 Jun 2026 22:53:35 +0200 Subject: [PATCH 6/6] extend shutdown cleanup with block cancellation --- CHANGELOG.md | 2 ++ src/radical/asyncflow/workflow_manager.py | 8 ++++++-- tests/unit/test_cancellation.py | 19 +++++++++++++++++++ tests/unit/test_termination.py | 2 ++ 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1693bba..dab19ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Fixed +- **`shutdown()` no longer crashes when blocks are still running** — block futures are now cancelled directly instead of routing through `handle_task_cancellation`, which assumed `original_cancel` was set (only true for task futures). + - **Block future state lifecycle** — block futures now transition to `RUNNING` immediately after `asyncio.create_task`, to `DONE` on normal completion, and to `FAILED` when the block body raises. Previously all three transitions were missing. diff --git a/src/radical/asyncflow/workflow_manager.py b/src/radical/asyncflow/workflow_manager.py index 387c5cb..da29a21 100644 --- a/src/radical/asyncflow/workflow_manager.py +++ b/src/radical/asyncflow/workflow_manager.py @@ -1750,9 +1750,13 @@ async def shutdown(self, skip_execution_backend: bool = False) -> None: # cancel workflow futures (tasks and blocks) for comp in self.components.values(): future = comp["future"] - comp_desc = comp["description"] if not future.done(): - self.handle_task_cancellation(comp_desc, future) + if comp["type"] == BLOCK: + # Block futures are not patched with original_cancel — cancel directly. + future.cancel() + future.state = "CANCELLED" + else: + self.handle_task_cancellation(comp["description"], future) # Cancel internal components task if not self._run_task.done(): diff --git a/tests/unit/test_cancellation.py b/tests/unit/test_cancellation.py index 3ad1027..67dae46 100644 --- a/tests/unit/test_cancellation.py +++ b/tests/unit/test_cancellation.py @@ -79,6 +79,25 @@ async def long_block(): await engine.shutdown() +@pytest.mark.asyncio +async def test_shutdown_with_running_block_does_not_crash(): + """Shutdown() must not raise AttributeError when a block is still running.""" + engine = await _make_engine() + block_started = asyncio.Event() + + @engine.block + async def long_block(): + block_started.set() + await asyncio.sleep(10) + + block_fut = long_block() + await block_started.wait() + + await engine.shutdown() + + assert block_fut.cancelled() or block_fut.done() + + @pytest.mark.asyncio async def test_handle_task_cancellation_is_idempotent(): """Calling handle_task_cancellation twice on the same future is a silent no-op.""" diff --git a/tests/unit/test_termination.py b/tests/unit/test_termination.py index 7e7c71a..96e1c11 100644 --- a/tests/unit/test_termination.py +++ b/tests/unit/test_termination.py @@ -97,10 +97,12 @@ def mock_handle_cancellation(task_desc, task_fut): engine.components = { "task.000001": { + "type": "task", "future": mock_future1, "description": {"uid": "task.000001", "name": "test_task_1"}, }, "task.000002": { + "type": "task", "future": mock_future2, "description": {"uid": "task.000002", "name": "test_task_2"}, },