Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,59 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

### Added

- **`WorkflowEngine.workflow_scope(workflow_id=None)`** — new async context
manager that groups all tasks registered inside under a shared
`asyncflow.workflow_id`. Internally sets `_workflow_id_ctx` (a module-level
`ContextVar`), which asyncio copies into every `create_task` / `gather`
branch automatically so concurrent workflow instances remain isolated. When
telemetry is active, also opens an OTel `"workflow"` span, making all task
spans inside structural children in the trace hierarchy. Auto-generates a
short UUID if `workflow_id` is `None`.

- **`_workflow_id_ctx` ContextVar** — per-instance `ContextVar` (default
`None`) that carries the active workflow ID across asyncio task boundaries
without explicit argument passing. Each `WorkflowEngine` instance owns its
own uniquely-named ContextVar (`asyncflow_workflow_id.<uid>`), preventing
context leakage when multiple engines are used within the same coroutine.
`@flow.block` execution sets it to the block's UID so tasks inside a block
inherit the workflow ID automatically.

- **`WorkflowEngine.start_telemetry()` new parameters** — `span_processors`,
`metric_readers`, `resource` — forwarded to `TelemetryManager.__init__()`,
mirroring `Session.start_telemetry()`. Enables passing pre-built OTel
exporters (OTLP, Prometheus, Jaeger) without any RHAPSODY code changes.

- **Span enricher for `asyncflow.workflow_id`** — registered automatically by
`start_telemetry()` via `register_span_enricher()`. Stamps
`asyncflow.workflow_id` onto every task OTel span when the task was created
inside a `workflow_scope()` or `@flow.block`. Enables per-workflow Gantt
views and span filtering in Jaeger / Grafana Tempo.

- **`_emit()` `workflow_id` kwarg** — when `workflow_id` is set, injects
`asyncflow.workflow_id` into the event's `attributes` dict. All task lifecycle
events emitted by `WorkflowEngine` (TaskCreated, asyncflow.TaskResolved,
TaskSubmitted) carry the active workflow ID.

- **Example `01-workflow_grouping.py`** — complete rewrite as an HPC Campaign
Manager simulation. Models 4 workflow types with resource tracking and
dependency chains:
- `simulate` (4 tasks, GPU, no deps) — molecular dynamics runs
- `analyze` (4 tasks, GPU, deps=simulate) — post-processing per simulation
- `train` (8 tasks, GPU, no deps) — distributed ML training
- `evaluate` (8 tasks, CPU, deps=train) — lightweight model evaluation
Uses `ResourcePool` (asyncio-queue-based GPU/CPU slot tracking) and emits
`campaign.ResourceAssigned` custom events to record per-instance resource
assignments in the JSONL checkpoint.

- **`plot_campaign.py`** — new plotting script producing a two-panel Campaign
Manager timeline figure:
- **Top panel**: Gantt chart with rows per workflow instance, bars coloured by
workflow type and labelled with the assigned resource (`gpu:N` / `cpu:N`).
Right-margin annotations show priority / cpu / gpu per type. Right column
renders a config table, scheduler description box, and dependency graph.
- **Bottom panel**: GPU (left axis) and CPU (right axis) utilisation as step
functions over elapsed time, with total-capacity dashed reference lines.

- **`capture_stdio` decorator parameter** — `@flow.executable_task(capture_stdio=True)` redirects
stdout/stderr from executable tasks directly to files instead of collecting them in memory.
The awaited future resolves to the stdout **file path** (`{work_dir}/{uid}/{task_uid}.stdout`)
Expand All @@ -25,6 +78,43 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Unit tests for `capture_stdio` field placement, default value, `_work_dir` authority,
and end-to-end file I/O with `ConcurrentExecutionBackend`.

### Fixed

- **Block spans parented correctly** — `execute_block()` now benefits from the
`span_scope()` session-span fallback added in RHAPSODY: block spans that run
in a context with no active OTel span (e.g. spawned from `run()` before
`start_telemetry()` was awaited) now correctly nest under the session root
span instead of floating as unrooted traces.

- **`execute_block()` dead branch removed** — the `run_in_executor` code path
(used when the wrapped function was sync) is removed. `WorkflowEngine`
enforces a strict async API; all block functions must be `async def`.

### Changed

- **`execute_block()` uses `nullcontext`** — the no-telemetry code path uses
stdlib `nullcontext` (Python >= 3.7) instead of the former custom
`_null_context()` helper, which is removed.

- **`execute_block()` sets `_workflow_id_ctx`** — the block's UID is set as the
active `_workflow_id_ctx` for the duration of block execution so every task
registered inside the block inherits it as `asyncflow.workflow_id` without
requiring an explicit `workflow_scope()` call.

### Docs

- **`docs/telemetry.md`**:
- Added "Forwarding to an external backend" section with corrected
`span_processors` / `metric_readers` code examples (replaces the broken
`set_tracer_provider()` pattern).
- Added "`workflow_scope()` context manager" section with usage examples and
auto-ID generation.
- Added "OTel span hierarchy" section showing the four-level
`session -> workflow -> block -> task` tree and explaining how
`asyncflow.workflow_id` propagates to every span attribute and JSONL event.
- Updated `start_telemetry()` signature to include `span_processors`,
`metric_readers`, and `resource` parameters.

## [0.3.1] - 2026-03-09

### Fixed
Expand Down
Binary file added docs/assets/workflow-telemetry-dashboard.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
97 changes: 95 additions & 2 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ backend = await ConcurrentExecutionBackend(ProcessPoolExecutor())
flow = await WorkflowEngine.create(backend)

telemetry = await flow.start_telemetry(
resource_poll_interval=5.0, # node CPU/memory/GPU every 5 s
checkpoint_path="./telemetry/", # write a JSONL file (optional)
resource_poll_interval=5.0, # node CPU/memory/GPU every 5 s
checkpoint_path="./telemetry/", # write a JSONL file (optional)
)
```

Expand All @@ -34,6 +34,99 @@ await flow.shutdown() # also stops telemetry
await telemetry.stop()
```

### Forwarding to an external backend

Pass pre-built OTel `SpanProcessor` and/or `MetricReader` instances to forward data to Jaeger, Grafana Tempo, Honeycomb, Prometheus, or any other OTel-compatible backend:

```python
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

telemetry = await flow.start_telemetry(
span_processors=[BatchSpanProcessor(OTLPSpanExporter())],
metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())],
)
```

Exporters read `OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_HEADERS`, and `OTEL_SERVICE_NAME` from the environment. See [RHAPSODY Integrations](https://radical-cybertools.github.io/rhapsody/telemetry/integrations/) for the full parameter reference.

---

## Workflow grouping with `workflow_scope()`

By default all tasks share the same `session_id`. Use `workflow_scope()` to tag every task submitted inside the scope with a `asyncflow.workflow_id` attribute — enabling per-workflow filtering in Jaeger, Tempo, and the JSONL checkpoint:

```python
async with flow.workflow_scope("etl-run-42"):
raw = ingest("source.csv")
val = validate(raw)
result = await report(val)
```

Every span and JSONL event emitted inside the scope carries `asyncflow.workflow_id = "etl-run-42"`. The scope maps directly onto a named OTel span (`name="workflow"`), so the flame graph in Tempo shows `session → workflow(etl-run-42) → task` hierarchy.

If no `workflow_id` is passed, one is auto-generated (`wf-<hex8>`).

```python
# Auto-ID
async with flow.workflow_scope() as wid:
print(wid) # e.g. "wf-3a1b9c2d"
```

`@flow.block`-decorated functions are automatically tagged with the block's UID as the `workflow_id` — no explicit `workflow_scope()` needed inside a block body.

### Visualising workflow telemetry

The example in `examples/telemetry/01-workflow_grouping.py`
runs six parallel ML training pipelines (load → preprocess → train → evaluate), each
wrapped in a `workflow_scope()`. Running the companion plotting script against the
JSONL checkpoint:

```bash
python examples/telemetry/01-workflow_grouping.py --out results/
python examples/telemetry/plot_workflow_gantt.py results/
```

produces a six-panel dashboard:

![AsyncFlow workflow telemetry dashboard](assets/workflow-telemetry-dashboard.png)

| Panel | What it shows |
|---|---|
| **Workflow Gantt** | One row per workflow instance; coloured bars = pipeline stages; light band = dependency wait |
| **Stage Execution Time** | Stacked bars in ms per stage per workflow — reveals which stage dominates each run |
| **Workflow Concurrency** | In-flight workflows over time — shows scheduler saturation |
| **Workflow Throughput** | Completion histogram — when pipelines finish relative to session start |
| **E2E Latency Distribution** | Histogram of total elapsed time per workflow with mean/median lines |
| **Stage × Time Heatmap** | 2-D heatmap (stages × time bins): task occupancy per cell — reveals bottleneck stages |

---

## OTel span hierarchy

When telemetry is enabled, AsyncFlow produces a four-level span tree:

```
Trace (one per session)
└── session span [SessionStarted … SessionEnded]
├── workflow span [workflow_scope("etl-run-42")]
│ ├── task span task_id=ingest-001
│ ├── task span task_id=validate-001
│ └── task span task_id=report-001
├── block span [execute_block("pipeline_block")]
│ ├── task span task_id=preprocess-001
│ ├── task span task_id=compute-001
│ └── task span task_id=store-001
└── task span (ungrouped — submitted outside any scope)
```

The `asyncflow.workflow_id` attribute is stamped on every task span and every JSONL lifecycle event inside a scope. This makes per-workflow Gantt views, performance comparison across workflow types, and span filtering all possible without any post-processing join.

---

## AsyncFlow-specific events
Expand Down
173 changes: 173 additions & 0 deletions examples/telemetry/01-workflow_grouping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""01-workflow_grouping.py — Parallel ML training campaign with workflow telemetry.

Demonstrates ``workflow_scope()``: six independent training pipelines run
concurrently, each with four sequential stages:

load_data → preprocess → train → evaluate

Every task inside a ``workflow_scope()`` automatically carries
``asyncflow.workflow_id`` in every telemetry event, enabling per-pipeline
Gantt charts, stage breakdowns, and cross-pipeline comparisons — with zero
changes to the task functions themselves.

Usage
-----
python 01-workflow_grouping.py
python 01-workflow_grouping.py --out results/
python plot_workflow_gantt.py results/
"""

import argparse
import asyncio
import logging
import time
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from rhapsody.backends import ConcurrentExecutionBackend
from rhapsody.telemetry import define_event
from rhapsody.telemetry.events import make_event

from radical.asyncflow import WorkflowEngine
from radical.asyncflow.logging import init_default_logger

logger = logging.getLogger(__name__)

# ── Custom event ──────────────────────────────────────────────────────────────

ExperimentResult = define_event(
"experiment.Result",
experiment_id=str,
dataset_size=int,
learning_rate=float,
loss=float,
accuracy=float,
elapsed_s=float,
)

# ── Experiment matrix: (dataset_size, learning_rate) ─────────────────────────

EXPERIMENTS = [
(100, 0.010),
(200, 0.005),
(150, 0.020),
(80, 0.050),
(250, 0.001),
(120, 0.010),
]

STAGES = ["load", "preprocess", "train", "evaluate"]


# ── Main ──────────────────────────────────────────────────────────────────────


async def main(out_dir: str = "telemetry-output") -> None:
Path(out_dir).mkdir(parents=True, exist_ok=True)

init_default_logger(logging.INFO)

backend = await ConcurrentExecutionBackend(ProcessPoolExecutor(max_workers=4))
flow = await WorkflowEngine.create(backend)
telemetry = await flow.start_telemetry(
resource_poll_interval=0.5,
checkpoint_path=out_dir,
)

# ── Task definitions ──────────────────────────────────────────────────────
# Task functions are plain async coroutines — no telemetry imports needed.

@flow.function_task
async def load_data(experiment_id: str, size: int) -> dict:
await asyncio.sleep(0.05 + size * 0.0008)
return {"experiment_id": experiment_id, "size": size, "data": list(range(size))}

@flow.function_task
async def preprocess(raw: dict) -> dict:
await asyncio.sleep(0.08 + raw["size"] * 0.0006)
return {**raw, "features": [x / raw["size"] for x in raw["data"]]}

@flow.function_task
async def train(processed: dict, lr: float) -> dict:
await asyncio.sleep(0.20 + processed["size"] * 0.0010)
loss = 0.5 / (1.0 + processed["size"] * lr * 10)
return {**processed, "loss": loss, "lr": lr}

@flow.function_task
async def evaluate(model: dict) -> dict:
await asyncio.sleep(0.04 + model["size"] * 0.0003)
accuracy = min(0.99, 0.75 + (1.0 - model["loss"]) * 0.24)
return {**model, "accuracy": accuracy}

# ── Per-experiment runner ─────────────────────────────────────────────────

async def run_experiment(i: int, size: int, lr: float) -> dict:
t0 = time.time()
exp_id = f"exp-{i}"

async with flow.workflow_scope(exp_id) as wid:
# asyncflow.workflow_id propagates to all four tasks automatically.
raw = load_data(exp_id, size)
prep = preprocess(raw)
model = train(prep, lr)
result = await evaluate(model)

elapsed = time.time() - t0

# Emit application-level result from the orchestration layer.
telemetry.emit(
make_event(
ExperimentResult,
session_id=telemetry.session_id,
backend="pipeline",
task_id=wid,
experiment_id=exp_id,
dataset_size=size,
learning_rate=lr,
loss=result["loss"],
accuracy=result["accuracy"],
elapsed_s=elapsed,
)
)
print(
f" {exp_id} size={size:3d} lr={lr:.3f} "
f"loss={result['loss']:.4f} acc={result['accuracy']:.4f} "
f"({elapsed * 1000:.0f} ms)"
)
return result

# ── Run all experiments concurrently ──────────────────────────────────────

print(f"Running {len(EXPERIMENTS)} experiments concurrently …\n")
t_start = time.time()
await asyncio.gather(
*(run_experiment(i, size, lr) for i, (size, lr) in enumerate(EXPERIMENTS))
)
total = time.time() - t_start

summary = telemetry.summary()
print(
f"\n {len(EXPERIMENTS)} experiments · "
f"{summary['tasks']['completed']} tasks completed in {total * 1000:.0f} ms"
)

await flow.shutdown()
await telemetry.stop()

jsonl = next(Path(out_dir).glob("*.jsonl"), None)
print(f"\nCheckpoint : {jsonl}")
print(f"Plot : python plot_workflow_gantt.py {out_dir}/")


if __name__ == "__main__":
ap = argparse.ArgumentParser(
description="Parallel ML training campaign with workflow_scope() telemetry."
)
ap.add_argument(
"--out",
default="telemetry-output",
metavar="DIR",
help="Output directory for JSONL checkpoint (default: telemetry-output/)",
)
args = ap.parse_args()
asyncio.run(main(out_dir=args.out))
Loading
Loading