Enable and Improve blocks and their cancellation#89
Conversation
fix(engine): correct block/task cancellation and add member propagation - B2: patched_cancel returned method object instead of calling it - B1: block futures no longer patched; cancellation uses done_callback on block_fut → asyncio.Task cancel via _block_asyncio_tasks - B3: handle_task_cancellation is now idempotent (silent on done future) - M3: _block_members tracks tasks registered inside each block; cancel propagates recursively to all members on block cancellation - M5: future.state attribute (PENDING/RUNNING/DONE/FAILED/CANCELLED), RUNNING set in task_callbacks when backend emits it - M6: explicit workflow_id= kwarg at call-time overrides ContextVar
There was a problem hiding this comment.
Code Review
This pull request introduces block task and member registries to track block execution and propagate cancellation to member tasks, supports explicit workflow_id overrides, and tracks future states. The reviewer identified several critical issues: setting arbitrary attributes directly on asyncio.Future will raise an AttributeError in standard CPython, local cancellation fails to forward arguments and update the future's state, block futures lack state transitions and leak memory on normal completion, and the new _block_asyncio_tasks registry is not cleared during internal record resets.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| # Create async future - we only support async | ||
| comp_fut = asyncio.Future() | ||
| comp_fut.state = "PENDING" |
There was a problem hiding this comment.
In standard CPython, asyncio.Future is implemented in C and does not have a __dict__ (or uses __slots__ in its pure Python fallback). Attempting to set arbitrary attributes like state, id, or cancel directly on an asyncio.Future instance will raise an AttributeError on standard runtimes.
To make this robust and fully compatible with standard Python runtimes, we should subclass asyncio.Future (e.g., class FlowFuture(asyncio.Future): pass) and instantiate that instead. Subclasses automatically get a __dict__ in Python, allowing arbitrary attribute assignment.
| # Create async future - we only support async | |
| comp_fut = asyncio.Future() | |
| comp_fut.state = "PENDING" | |
| # Create async future - we only support async | |
| class FlowFuture(asyncio.Future): | |
| pass | |
| comp_fut = FlowFuture() | |
| comp_fut.state = 'PENDING' |
| else: | ||
| # Task is pending -> cancel locally | ||
| logger.info(f"Cancellation requested for {uid} (pending) locally") | ||
| return fut.original_cancel | ||
| return fut.original_cancel() |
There was a problem hiding this comment.
There are two issues here:
fut.original_cancelis called without any arguments. In modern Pythonasyncio,cancel()can accept an optionalmsgargument (e.g.,fut.cancel("reason")). We should forward*argsand**kwargstooriginal_cancelto avoid dropping these arguments.- When a pending task is cancelled locally, its
stateattribute is never updated to"CANCELLED". We should setfut.state = "CANCELLED"if the cancellation is successful.
| else: | |
| # Task is pending -> cancel locally | |
| logger.info(f"Cancellation requested for {uid} (pending) locally") | |
| return fut.original_cancel | |
| return fut.original_cancel() | |
| else: | |
| # Task is pending -> cancel locally | |
| logger.info(f'Cancellation requested for {uid} (pending) locally') | |
| cancelled = fut.original_cancel(*args, **kwargs) | |
| if cancelled: | |
| fut.state = 'CANCELLED' | |
| return cancelled |
| self._block_asyncio_tasks[block_uid] = t | ||
| # Remove from registry when the asyncio.Task finishes (any outcome) | ||
| t.add_done_callback(lambda _, uid=block_uid: self._block_asyncio_tasks.pop(uid, None)) | ||
| # Wire cancellation: if block_fut is cancelled externally after submission, | ||
| # propagate to the asyncio.Task. | ||
| def _on_block_fut_done(f, task=t, buid=block_uid): | ||
| if f.cancelled(): | ||
| task.cancel() | ||
| f.state = "CANCELLED" | ||
| members = self._block_members.pop(buid, None) | ||
| if members: | ||
| for member_uid in members: | ||
| comp = self.components.get(member_uid) | ||
| if comp and not comp["future"].done(): | ||
| comp["future"].cancel() | ||
| block_fut.add_done_callback(_on_block_fut_done) |
There was a problem hiding this comment.
There are two issues with block state tracking and cleanup:
- The block future state is never updated to
"RUNNING","DONE", or"FAILED"during its execution lifecycle (it remains"PENDING"forever unless cancelled). We should update the state when the block starts running and when it completes or fails. - If a block completes normally or fails, its entry in
self._block_membersis never cleaned up, which leads to a memory leak. We should pop the block fromself._block_memberswhen the block future is done.
self._block_asyncio_tasks[block_uid] = t
block_fut.state = 'RUNNING'
# Remove from registry when the asyncio.Task finishes (any outcome)
t.add_done_callback(lambda _, uid=block_uid: self._block_asyncio_tasks.pop(uid, None))
# Wire cancellation and state transitions: if block_fut is done,
# propagate cancellation and update state.
def _on_block_fut_done(f, task=t, buid=block_uid):
if f.cancelled():
task.cancel()
f.state = 'CANCELLED'
members = self._block_members.pop(buid, None)
if members:
for member_uid in members:
comp = self.components.get(member_uid)
if comp and not comp['future'].done():
comp['future'].cancel()
elif f.exception() is not None:
f.state = 'FAILED'
self._block_members.pop(buid, None)
else:
f.state = 'DONE'
self._block_members.pop(buid, None)
block_fut.add_done_callback(_on_block_fut_done)| self._ready_queue.clear() | ||
| self._dependents_map.clear() | ||
| self._dependency_count.clear() | ||
| self._block_members.clear() |
There was a problem hiding this comment.
The _block_asyncio_tasks registry is not cleared in _clear_internal_records. To prevent reference leaks and ensure a clean state when resetting the engine, we should clear it here.
| self._ready_queue.clear() | |
| self._dependents_map.clear() | |
| self._dependency_count.clear() | |
| self._block_members.clear() | |
| self._ready_queue.clear() | |
| self._dependents_map.clear() | |
| self._dependency_count.clear() | |
| self._block_members.clear() | |
| self._block_asyncio_tasks.clear() |
- H1: from O(n) to O(1): self.running changed from list to set. append → add, remove → discard. The in check and list(self.running) iteration were already compatible. - H2: _clear_internal_records now also clears resolved, running, _task_submit_times, and _task_start_times. - Added new test for engine lifecycle
This PR: correct block/task cancellation and add member propagation
future.stateattribute (PENDING/RUNNING/DONE/FAILED/CANCELLED), RUNNING set in task_callbacks when backend emits it