Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,51 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## [Unreleased]

### Added

- **`future.state` attribute** — every component future (task and block) now exposes a `.state`
string attribute tracking its full lifecycle: `PENDING` → `RUNNING` → `DONE` / `FAILED` /
`CANCELLED`. Block futures now correctly transition through all states; previously they remained
at `PENDING` for the duration of their execution. Readable at any point without awaiting.

- **`workflow_id=` call-time kwarg** — any task or block call now accepts `workflow_id="<id>"` as
a keyword argument to tag that specific component. Takes precedence over any active
`workflow_scope()`. The kwarg is stripped before the function body is invoked.

### Fixed

- **`shutdown()` no longer crashes when blocks are still running** — block futures are now cancelled directly instead of routing through `handle_task_cancellation`, which assumed `original_cancel` was set (only true for task futures).

- **Block future state lifecycle** — block futures now transition to `RUNNING` immediately after
`asyncio.create_task`, to `DONE` on normal completion, and to `FAILED` when the block body
raises. Previously all three transitions were missing.

- **`_block_members` cleanup on non-cancelled outcomes** — member sets are now removed from
`_block_members` for every terminal outcome (DONE, FAILED, CANCELLED). Previously the cleanup
only ran on cancellation, leaving stale entries after normal or failed block completion.

- **`patched_cancel` forwards `msg` argument** — `fut.cancel(msg=...)` now correctly forwards
all positional and keyword arguments to the underlying `asyncio.Future.cancel()`. Previously
the `msg` was silently dropped, breaking callers that rely on the cancellation message being
propagated through `CancelledError`.

- **Pending-task cancellation sets `future.state`** — when a pending task is cancelled locally
via `patched_cancel`, `future.state` is now set to `"CANCELLED"`. All other cancellation paths
already set this attribute; this was the only missing case.

- **`_clear_internal_records` completeness** — `resolved`, `running`, `_task_submit_times`, and
`_task_start_times` are now cleared alongside the other internal structures. Previously these
accumulated stale entries across engine reuse after `shutdown()`.

- **`_block_asyncio_tasks` cleared on shutdown** — `_clear_internal_records` now also clears the
asyncio.Task registry for blocks, eliminating stale references after engine reset.

### Changed

- **`self.running` changed from `list` to `set`** — membership check (`uid in self.running`) and
removal (`running.discard`) in the run loop and `patched_cancel` are now O(1) instead of O(n).
No public API change.

## [0.4.0] - 2026-05-18

### Added
Expand Down
49 changes: 49 additions & 0 deletions docs/async_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,52 @@ asyncio.run(main())
!!! important "When to Use Each"
- Use synchronous when workflows must run in sequence or have dependencies
- Use asynchronous when workflows are independent and you want better performance

---

## Tagging Workflows with IDs

AsyncFlow provides two complementary ways to attach a `workflow_id` label to tasks, useful for
grouping related work in logs, dashboards, and telemetry.

### `workflow_scope()` — tag all tasks in a block of code

`workflow_scope()` is an async context manager. Every task submitted inside the `async with` block
inherits the given `workflow_id`:

```python
async with flow.workflow_scope("experiment-42") as wid:
t1 = preprocess()
t2 = train(t1)
await t2
# all tasks submitted inside carry workflow_id="experiment-42"
```

If no ID is passed, a short UUID is generated automatically:

```python
async with flow.workflow_scope() as wid:
print(wid) # e.g. "wf-3a7f1c2b"
my_task()
```

!!! note
Scoping is per-engine instance. Multiple engines running in the same process each have their
own independent scope — one engine's `workflow_scope()` never leaks into another engine's tasks.

### `workflow_id=` kwarg — tag a single task at call time

Pass `workflow_id=` directly when calling any task or block to tag only that component:

```python
result = my_task(workflow_id="run-007")
```

- Call-time `workflow_id=` takes precedence over any active `workflow_scope()`.
- The kwarg is stripped before being forwarded to the function body — the function never sees it.

### How workflow IDs are used

Workflow IDs flow through to the telemetry system when enabled, making it possible to filter
OTel spans and RHAPSODY JSONL events by workflow. They are also available on the component
description as `comp["description"]["workflow_id"]` for custom introspection.
66 changes: 66 additions & 0 deletions docs/best_practice.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,72 @@ By following these best practices, you can:
```
---

## Inspect Task and Block State

Every future returned by a task or block call exposes a `.state` attribute that tracks its lifecycle:

| State | Meaning |
|---|---|
| `"PENDING"` | Registered; waiting for dependencies to resolve |
| `"RUNNING"` | Submitted to the backend and executing |
| `"DONE"` | Completed successfully |
| `"FAILED"` | Raised an exception |
| `"CANCELLED"` | Cancelled before or during execution |

```python
fut = my_task()
print(fut.state) # "PENDING" — not yet submitted

await asyncio.sleep(0)
print(fut.state) # "RUNNING" or "DONE" depending on backend speed

await fut
print(fut.state) # "DONE", "FAILED", or "CANCELLED"
```

!!! tip
Prefer reading `.state` over combining `fut.done()` + `fut.exception()` — `.state` is explicit
about which terminal outcome occurred without a separate branch.

---

## React to Task Completion Without Background Coroutines

A common mistake is spawning one background coroutine per task to monitor its completion:

```python
# ❌ Anti-pattern: one coroutine per task
async def watch(fut):
await fut
handle(fut.result())

for fut in futs:
asyncio.create_task(watch(fut)) # N coroutines for N tasks
```

Use `add_done_callback` instead — one callback function handles all completions, zero extra coroutines:

```python
# ✅ Correct: zero coroutines, purely event-driven
def on_done(fut):
if fut.cancelled():
print(f"cancelled — state={fut.state}")
elif fut.exception():
print(f"failed: {fut.exception()} — state={fut.state}")
else:
print(f"done: {fut.result()} — state={fut.state}")

for fut in [task1(), task2(), task3()]:
fut.add_done_callback(on_done)
```

!!! note
The callback fires synchronously from within the event loop when the future settles.
Keep it fast — offload heavy work to a queue or schedule it with `asyncio.create_task`.
`fut.state` is already set before the callback fires, so it is safe to read inside.

---

## Use Dependencies Correctly

Tasks can depend on the output of other tasks:
Expand Down
Loading
Loading