
fix(instrumented_exec): close spans when streams finish, not on plan #54

Open

antonmry wants to merge 7 commits into datafusion-contrib:main from antonmry:fix/drop

Conversation

@antonmry (Contributor) commented May 6, 2026

Which issue does this PR close?

Rationale for this change

When datafusion-tracing is used with an OpenTelemetry subscriber backed by
SimpleSpanProcessor, dropping an Arc<dyn ExecutionPlan> could block the
calling thread for several seconds.

Each InstrumentedExec node held a strong Arc to its recorders and a clone
of its tracing Span in OnceLock fields. Those fields lived until the plan
itself was dropped. With SimpleSpanProcessor, every span close calls
futures_executor::block_on(exporter.export(...)) synchronously, so:

plan_drop_time ≈ N_nodes × OTLP_round_trip_latency

A query plan with 6 nodes and a 50 ms/span exporter blocked drop() for
~328 ms. At real-world OTLP latencies (200 ms to Jaeger under load) and larger
plans (40 nodes), this matches the 8-second drop reported in the issue.
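
For context, here is a minimal sketch of the pre-fix layout this describes. All type and field names are assumptions made for illustration, not the crate's actual code:

  use std::sync::{Arc, OnceLock};

  // Stand-in for the per-node recorder holding metrics/preview state.
  struct NodeRecorder;

  struct InstrumentedExec {
      // Strong references held for the lifetime of the plan node: the span can
      // only close once the plan itself (and every clone of it) is dropped.
      node_recorder: OnceLock<Arc<NodeRecorder>>,
      span: OnceLock<tracing::Span>,
  }

  // With SimpleSpanProcessor, each span that closes during that drop runs
  // futures_executor::block_on(exporter.export(..)) synchronously, so a plan
  // with N instrumented nodes pays roughly N exporter round trips on drop.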

What changes are included in this PR?

InstrumentedExec now holds Mutex<Option<Weak<*Recorder>>> instead of
OnceLock<Arc<*Recorder>>, and no longer stores a Span clone at all.

  • get_or_create_node_recorder() creates the span (inside NodeRecorder) on
    the first execute() call and stores only a Weak — the Arc is owned by
    the execution stream.
  • MetricsRecorder and PreviewRecorder follow the same pattern.
  • All span clones live in the stream pipeline. When the last stream finishes,
    the recorders drop, span fields are recorded, and the span closes.
  • Dropping the plan drops only the Weak pointers — no span I/O, no blocking.

A new span() accessor was added to NodeRecorder so execute() can get the
span from the recorder instead of a separate OnceLock.
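
For illustration, a minimal sketch of the new pattern. The struct layout and field names are assumptions based on this description, not the actual crate code; the same idea applies to MetricsRecorder and PreviewRecorder:

  use std::sync::{Arc, Mutex, Weak};

  struct NodeRecorder {
      span: tracing::Span,
  }

  impl NodeRecorder {
      // Accessor so execute() can reach the span through the recorder.
      fn span(&self) -> &tracing::Span {
          &self.span
      }
  }

  struct InstrumentedExec {
      // The plan keeps only a Weak reference; the Arc is owned by the stream
      // returned from execute(), so the span closes when the last stream ends.
      node_recorder: Mutex<Option<Weak<NodeRecorder>>>,
  }

  impl InstrumentedExec {
      fn get_or_create_node_recorder(&self) -> Arc<NodeRecorder> {
          let mut slot = self.node_recorder.lock().unwrap();
          if let Some(recorder) = slot.as_ref().and_then(Weak::upgrade) {
              return recorder;
          }
          let recorder = Arc::new(NodeRecorder {
              span: tracing::info_span!("InstrumentedExec"),
          });
          *slot = Some(Arc::downgrade(&recorder));
          recorder
      }
  }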

Are these changes tested?

A self-contained reproducer is available at:
https://gist.github.com/antonmry/7f2355b83cd1cf2c37ea92bea7cd245b

It simulates a slow OTLP backend (50 ms/span) and measures drop() time for
both SimpleSpanProcessor and BatchSpanProcessor.
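
Measuring the blocking time amounts to timing the drop itself; a trivial, hypothetical helper along these lines (not taken from the gist):

  use std::time::{Duration, Instant};

  // Measure how long dropping a value blocks the calling thread.
  fn time_drop<T>(value: T) -> Duration {
      let start = Instant::now();
      drop(value);
      start.elapsed()
  }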

Before the fix (published datafusion-tracing = "53.0.0"):

=== Step 2: problem — SimpleSpanProcessor (synchronous export) ===
  drop() blocked for: 328ms  (exported 6 spans × 50ms)

=== Step 3: fix — BatchSpanProcessor (async export) ===
  drop() returned in:  456µs  ← instant, spans queued

After the fix (this PR):

=== Step 2: SimpleSpanProcessor ===
  drop() blocked for: 161µs  (exported 6 spans × 50ms)

=== Step 3: BatchSpanProcessor ===
  drop() returned in:  73µs  ← instant, spans queued

SimpleSpanProcessor drop time drops from 328 ms → 161 µs (~2000×).
Both processors are now equivalent from the caller's perspective: drop() is
instant in both cases because all span work completes during collect().

The existing unit-test suite passes without changes.

Are there any user-facing changes?

Spans now close when the last execution stream is consumed rather than when the
plan is dropped. This is a behavioral improvement: span end times more
accurately reflect when execution actually finished. There are no API changes.

…drop

- Replace `OnceLock<Arc<Recorder>>` with `Mutex<Option<Weak<Recorder>>>` so
  recorders (and their spans) are kept alive only by active streams.
- Have `NodeRecorder` own the span and expose it via `span()`.
- Add regression tests covering span lifetime vs. plan and stream drops.
@antonmry antonmry marked this pull request as draft May 6, 2026 15:14
@antonmry (Contributor, Author) commented May 6, 2026

@geoffreyclaude I was able to reproduce the issue, but I'm not so sure about the fix, in particular the required changes to the snapshots. I would appreciate your feedback before I continue with it.

@antonmry antonmry marked this pull request as ready for review May 6, 2026 15:52
@geoffreyclaude (Collaborator) commented

> @geoffreyclaude I was able to reproduce the issue, but I'm not so sure about the fix, in particular the required changes to the snapshots. I would appreciate your feedback before I continue with it.

@antonmry I think you're on the right track, but I believe there’s still one tricky edge case here.

The current state is still tied to the plan node itself, and seems to assume each partition is executed once. But ExecutionPlan::execute(partition, context) can legally be called multiple times on the same plan (unfortunately!), including for the same partition, or for only some partitions. In those cases, I don’t think completed_partitions: Vec<bool> can accurately represent what has really finished.

I'm not sure what the proper solution here is though!

- Replace per-partition completion vector with an active-stream counter so
  partial or repeated partition executions close the span correctly.
- Release the recorder slot when `inner.execute` fails with no active streams.
- Add tests for concurrent, partial, and repeated partition execution.
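
To illustrate why counting live streams works where a per-partition completion vector does not, here is a tiny hypothetical demo; each stream simply keeps the recorder alive, no matter how often execute() was called or for which partitions:

  use std::sync::Arc;

  struct NodeRecorder;              // stand-in: owns the span in the real code
  struct InstrumentedStream {
      _recorder: Arc<NodeRecorder>, // each stream holds one strong reference
  }

  fn demo() {
      let recorder = Arc::new(NodeRecorder);
      // execute(0, ctx) called twice for the same partition: two live streams.
      let s1 = InstrumentedStream { _recorder: recorder.clone() };
      let s2 = InstrumentedStream { _recorder: recorder.clone() };
      drop(recorder); // the plan itself keeps no strong reference
      drop(s1);       // one stream finished; the span must stay open
      drop(s2);       // last stream finished; only now does the recorder drop
  }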
@antonmry (Contributor, Author) commented May 7, 2026

@geoffreyclaude excellent point, thank you. I've committed d554065 addressing it. Would that work?

Numbers are still good:

Case         SimpleSpanProcessor drop(plan_clone)   BatchSpanProcessor drop(plan_clone)   Exported spans
Before fix   330.791375ms                           470.541µs                             6
After fix    395.667µs                              71.709µs                              6

- Add a process-wide async mutex so snapshot test cases run one at a time.
- Clear the shared log buffer at the start of each case to avoid cross-test bleed.
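
A sketch of what such serialization can look like; the helper below is hypothetical and only shows the locking, with the buffer-clearing call left as a comment:

  use std::sync::OnceLock;
  use tokio::sync::Mutex;

  // One process-wide lock shared by all snapshot test cases.
  static SNAPSHOT_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

  async fn run_snapshot_case<F, Fut>(case: F)
  where
      F: FnOnce() -> Fut,
      Fut: std::future::Future<Output = ()>,
  {
      // Hold the lock for the whole case so concurrently running tests cannot
      // interleave their spans in the shared log buffer.
      let _guard = SNAPSHOT_LOCK.get_or_init(|| Mutex::new(())).lock().await;
      // Clear the shared log buffer here before running the case.
      case().await;
  }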
@geoffreyclaude (Collaborator) left a comment


Thanks a lot for continuing to push this! The active-stream change fixes the weird cases of execute being called multiple times.

But I think there is still one minor issue left. Overlapping/concurrent executions will share the same recorders, and therefore the same span, whereas we probably want a separate span per execution call.

Maybe keying the recorders by TaskContext (to avoid mixing up two concurrent plan executions on different contexts) and by "has this partition been seen" (to avoid mixing up concurrent calls in the same context and on the same partition) would work?

To be clearer, for both these cases, we want two different spans:

Different contexts:

  let s1 = plan.execute(0, ctx_a.clone())?;
  let s2 = plan.execute(1, ctx_b.clone())?;

Duplicate executions:

  let s1 = plan.execute(0, ctx.clone())?;
  let s2 = plan.execute(0, ctx.clone())?;

Sorry for being so strict about this! Your initial fix of the long drops is great, but it uncovered a whole class of preexisting concurrency issues.

- Track recorder groups as a vec keyed by task context and reserved
  partitions so independent or duplicate executions get fresh spans.
- Reserve the stream before `inner.execute` and release on failure to
  avoid leaking active recorder groups.
- Add tests covering distinct task contexts, execute errors, and
  overlapping duplicate-partition streams.
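
A rough sketch of that direction, with every name assumed from the commit description above rather than taken from the PR's code:

  use std::collections::HashSet;
  use std::sync::{Arc, Mutex, Weak};

  use datafusion::execution::TaskContext;

  // Stand-in for the span-owning recorder (see the earlier sketch).
  struct NodeRecorder {
      span: tracing::Span,
  }

  // One group per logical execution, identified by its TaskContext and by the
  // partitions it has already handed out.
  struct RecorderGroup {
      ctx: Weak<TaskContext>,
      reserved_partitions: HashSet<usize>,
      recorder: Weak<NodeRecorder>,
  }

  struct InstrumentedExec {
      groups: Mutex<Vec<RecorderGroup>>,
  }

  impl InstrumentedExec {
      fn recorder_for(&self, partition: usize, ctx: &Arc<TaskContext>) -> Arc<NodeRecorder> {
          let mut groups = self.groups.lock().unwrap();
          // Reuse a live group only if it belongs to the same TaskContext and has
          // not yet seen this partition; a duplicate execute() on the same
          // partition therefore starts a fresh group with its own span.
          for group in groups.iter_mut() {
              let same_ctx = group
                  .ctx
                  .upgrade()
                  .map(|c| Arc::ptr_eq(&c, ctx))
                  .unwrap_or(false);
              if same_ctx && !group.reserved_partitions.contains(&partition) {
                  if let Some(recorder) = group.recorder.upgrade() {
                      group.reserved_partitions.insert(partition);
                      return recorder;
                  }
              }
          }
          let recorder = Arc::new(NodeRecorder {
              span: tracing::info_span!("InstrumentedExec"),
          });
          groups.push(RecorderGroup {
              ctx: Arc::downgrade(ctx),
              reserved_partitions: HashSet::from([partition]),
              recorder: Arc::downgrade(&recorder),
          });
          recorder
      }
  }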
@antonmry (Contributor, Author) commented May 7, 2026

Hey @geoffreyclaude, I appreciate the careful review. I’m still getting familiar with this part of the codebase, so I’m very happy to keep iterating until we have a solid fix. Please keep the comments coming.

I added 13a90c0 following your suggested direction. Let me know what you think

Sort RecursiveQueryExec close events in the recursive trace snapshots so
the assertion no longer depends on the non-deterministic order in which
child execution streams finish across runtimes.


Development

Successfully merging this pull request may close these issues.

Execution plan takes a long time to drop for some reason
