diff --git a/datafusion-tracing/src/instrumented_exec.rs b/datafusion-tracing/src/instrumented_exec.rs index a0b2f97..a33e545 100644 --- a/datafusion-tracing/src/instrumented_exec.rs +++ b/datafusion-tracing/src/instrumented_exec.rs @@ -25,11 +25,11 @@ use crate::{ utils::is_internal_optimizer_check, }; use datafusion::{ - arrow::datatypes::SchemaRef, + arrow::{array::RecordBatch, datatypes::SchemaRef}, common::Statistics, config::ConfigOptions, error::Result, - execution::{SendableRecordBatchStream, TaskContext}, + execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}, physical_expr::{Distribution, OrderingRequirements, PhysicalSortExpr}, physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, @@ -46,11 +46,18 @@ use datafusion::{ }, }; use delegate::delegate; +use futures::Stream; +use pin_project::{pin_project, pinned_drop}; use std::{ any::Any, - collections::HashMap, + collections::{HashMap, HashSet}, fmt::{self, Debug}, - sync::{Arc, OnceLock}, + pin::Pin, + sync::{ + Arc, Mutex, Weak, + atomic::{AtomicUsize, Ordering}, + }, + task::{Context, Poll}, }; use tracing::{Span, field}; use tracing_futures::Instrument; @@ -63,22 +70,19 @@ pub struct InstrumentedExec { /// The inner execution plan to delegate execution to. inner: Arc, - /// Tracing span lazily initialized during execution, shared safely across concurrent partition executions. - span: OnceLock, - record_metrics: bool, - /// Metrics recorder lazily initialized during execution, shared safely across concurrent partition executions. - metrics_recorder: OnceLock>, - - /// Node recorder lazily initialized during execution, shared safely across concurrent partition executions. - node_recorder: OnceLock>, - preview_limit: usize, preview_fn: Option>, - /// Preview recorder lazily initialized during execution, shared safely across concurrent partition executions. - preview_recorder: OnceLock>, + /// Shared recorder groups for active executions of this plan node. + /// + /// Groups are kept alive only until their streams have finished, so dropping + /// an already-consumed plan does not close spans synchronously. Concurrent + /// executions that belong to the same task context and touch distinct + /// partitions share a group; independent or duplicate executions get a fresh + /// group. + recorders: Arc>>>, /// Function to create and initialize tracing spans. span_create_fn: Arc, @@ -101,13 +105,10 @@ impl InstrumentedExec { ) -> InstrumentedExec { Self { inner, - span: OnceLock::new(), record_metrics: options.record_metrics, - metrics_recorder: OnceLock::new(), - node_recorder: OnceLock::new(), preview_limit: options.preview_limit, preview_fn: options.preview_fn.clone(), - preview_recorder: OnceLock::new(), + recorders: Arc::new(Mutex::new(Vec::new())), span_create_fn, } } @@ -129,57 +130,85 @@ impl InstrumentedExec { )) } - /// Retrieves the tracing span, initializing it if necessary. - fn get_span(&self) -> Span { - self.span - .get_or_init(|| self.create_populated_span()) - .clone() + /// Returns and reserves the recorder group for this execution stream. + /// + /// The stream is reserved before calling `inner.execute` so concurrent + /// callers cannot observe an idle recorder group and clear or reuse it + /// incorrectly. + fn reserve_recorders( + &self, + context: Arc, + partition: usize, + ) -> Arc { + let mut groups = self.recorders.lock().unwrap(); + for recorders in groups.iter() { + if recorders.is_same_context(&context) + && recorders.try_reserve_partition(partition) + { + return recorders.clone(); + } + } + + let span = self.create_populated_span(); + let preview_recorder = (self.preview_limit > 0).then(|| { + let partition_count = self.inner.output_partitioning().partition_count(); + PreviewRecorder::builder(span.clone(), partition_count) + .limit(self.preview_limit) + .preview_fn(self.preview_fn.clone()) + .build() + }); + let recorders = Arc::new(ExecutionRecorders::new( + Arc::downgrade(&self.recorders), + context, + partition, + NodeRecorder::new(self.inner.clone(), span.clone()), + self.record_metrics + .then(|| MetricsRecorder::new(self.inner.clone(), span.clone())), + preview_recorder, + )); + groups.push(recorders.clone()); + recorders + } + + /// Wraps the given stream so recorder state for this partition is released + /// after the stream is dropped. + fn execution_recording_stream( + &self, + inner_stream: SendableRecordBatchStream, + recorders: Arc, + ) -> SendableRecordBatchStream { + Box::pin(ExecutionRecordingStream::new(inner_stream, recorders)) + } + + /// Wraps the given stream with a completion recorder so fields that are only + /// fully qualified after execution (such as `datafusion.node`) are recorded + /// once all partitions have finished executing. + fn node_recording_stream( + &self, + inner_stream: SendableRecordBatchStream, + recorder: Arc, + ) -> SendableRecordBatchStream { + Box::pin(NodeRecordingStream::new(inner_stream, recorder)) } - /// Wraps the given stream with metrics recording if metrics are available. - /// The input span is shared across all partitions, - /// and metrics will be aggregated across all partitions before being reported. + /// Wraps the given stream with metrics recording. + /// Metrics are aggregated across all partitions before being reported. fn metrics_recording_stream( &self, inner_stream: SendableRecordBatchStream, - span: &Span, + recorder: Arc, ) -> SendableRecordBatchStream { - if !self.record_metrics { - return inner_stream; - } - let recorder = self - .metrics_recorder - .get_or_init(|| { - Arc::new(MetricsRecorder::new(self.inner.clone(), span.clone())) - }) - .clone(); Box::pin(MetricsRecordingStream::new(inner_stream, recorder)) } /// Wraps the given stream with batch preview recording. - /// The input span is shared across all partitions, - /// and the preview limit will be applied globally on all partitions before the preview is reported. + /// The preview limit is applied globally across all partitions before the preview is reported. fn preview_recording_stream( &self, inner_stream: SendableRecordBatchStream, - span: &Span, + recorder: Arc, partition: usize, ) -> SendableRecordBatchStream { - if self.preview_limit == 0 { - return inner_stream; - } - let recorder = self - .preview_recorder - .get_or_init(|| { - let partition_count = self.inner.output_partitioning().partition_count(); - Arc::new( - PreviewRecorder::builder(span.clone(), partition_count) - .limit(self.preview_limit) - .preview_fn(self.preview_fn.clone()) - .build(), - ) - }) - .clone(); Box::pin(PreviewRecordingStream::new( inner_stream, recorder, @@ -187,21 +216,6 @@ impl InstrumentedExec { )) } - /// Wraps the given stream with a completion recorder so fields that are only - /// fully qualified after execution (such as `datafusion.node`) are recorded - /// once all partitions have finished executing. - fn node_recording_stream( - &self, - inner_stream: SendableRecordBatchStream, - span: &Span, - ) -> SendableRecordBatchStream { - let recorder = self - .node_recorder - .get_or_init(|| Arc::new(NodeRecorder::new(self.inner.clone(), span.clone()))) - .clone(); - Box::pin(NodeRecordingStream::new(inner_stream, recorder)) - } - /// Creates a tracing span populated with metadata about the execution plan. fn create_populated_span(&self) -> Span { let span = self.span_create_fn.as_ref()(); @@ -392,28 +406,156 @@ impl ExecutionPlan for InstrumentedExec { partition: usize, context: Arc, ) -> Result { - let span = self.get_span(); + let recorders = self.reserve_recorders(context.clone(), partition); + let span = recorders.span(); - let inner_stream = span.in_scope(|| self.inner.execute(partition, context))?; + let inner_stream = match span.in_scope(|| self.inner.execute(partition, context)) + { + Ok(stream) => stream, + Err(error) => { + recorders.cancel_stream(partition); + return Err(error); + } + }; // Wrap the stream with node recording so `datafusion.node` is recorded only after // completion, once it is fully qualified. - let node_stream = self.node_recording_stream(inner_stream, &span); + let node_stream = + self.node_recording_stream(inner_stream, recorders.node_recorder.clone()); // Wrap the stream with metrics recording capability (only if inner metrics are available). - let metrics_stream = self.metrics_recording_stream(node_stream, &span); + let metrics_stream = if let Some(metrics_recorder) = &recorders.metrics_recorder { + self.metrics_recording_stream(node_stream, metrics_recorder.clone()) + } else { + node_stream + }; // Wrap the stream with batch preview recording (only if preview limit is set). - let preview_stream = - self.preview_recording_stream(metrics_stream, &span, partition); + let preview_stream = if let Some(preview_recorder) = &recorders.preview_recorder { + self.preview_recording_stream( + metrics_stream, + preview_recorder.clone(), + partition, + ) + } else { + metrics_stream + }; + let recording_stream = self.execution_recording_stream(preview_stream, recorders); Ok(Box::pin(RecordBatchStreamAdapter::new( self.inner.schema(), - preview_stream.instrument(span), + recording_stream.instrument(span), ))) } } +struct ExecutionRecorders { + slot: Weak>>>, + context: Arc, + seen_partitions: Mutex>, + active_streams: AtomicUsize, + node_recorder: Arc, + metrics_recorder: Option>, + preview_recorder: Option>, +} + +impl ExecutionRecorders { + fn new( + slot: Weak>>>, + context: Arc, + partition: usize, + node_recorder: NodeRecorder, + metrics_recorder: Option, + preview_recorder: Option, + ) -> Self { + Self { + slot, + context, + seen_partitions: Mutex::new(HashSet::from([partition])), + active_streams: AtomicUsize::new(1), + node_recorder: Arc::new(node_recorder), + metrics_recorder: metrics_recorder.map(Arc::new), + preview_recorder: preview_recorder.map(Arc::new), + } + } + + fn span(&self) -> Span { + self.node_recorder.span() + } + + fn is_same_context(&self, context: &Arc) -> bool { + Arc::ptr_eq(&self.context, context) + } + + fn try_reserve_partition(&self, partition: usize) -> bool { + let mut seen_partitions = self.seen_partitions.lock().unwrap(); + if seen_partitions.contains(&partition) { + return false; + } + + seen_partitions.insert(partition); + self.active_streams.fetch_add(1, Ordering::AcqRel); + true + } + + fn finish_stream(self: &Arc) { + self.release_stream(None); + } + + fn cancel_stream(self: &Arc, partition: usize) { + self.release_stream(Some(partition)); + } + + fn release_stream(self: &Arc, canceled_partition: Option) { + if let Some(slot) = self.slot.upgrade() { + let mut groups = slot.lock().unwrap(); + + if let Some(partition) = canceled_partition { + self.seen_partitions.lock().unwrap().remove(&partition); + } + + if self.active_streams.fetch_sub(1, Ordering::AcqRel) == 1 { + groups.retain(|recorders| !Arc::ptr_eq(recorders, self)); + } + } + } +} + +#[pin_project(PinnedDrop)] +struct ExecutionRecordingStream { + #[pin] + inner: SendableRecordBatchStream, + recorders: Arc, +} + +impl ExecutionRecordingStream { + fn new(inner: SendableRecordBatchStream, recorders: Arc) -> Self { + Self { inner, recorders } + } +} + +impl Stream for ExecutionRecordingStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) + } +} + +#[pinned_drop] +impl PinnedDrop for ExecutionRecordingStream { + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + this.recorders.finish_stream(); + } +} + +impl RecordBatchStream for ExecutionRecordingStream { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + impl DisplayAs for InstrumentedExec { delegate! { to self.inner { @@ -421,3 +563,524 @@ impl DisplayAs for InstrumentedExec { } } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::common::DataFusionError; + use datafusion::execution::SessionStateBuilder; + use datafusion::prelude::SessionContext; + use futures::StreamExt; + use std::sync::atomic::AtomicBool; + use tracing::field::{Field, Visit}; + use tracing::{Id, Subscriber}; + use tracing_subscriber::Layer; + use tracing_subscriber::layer::Context; + use tracing_subscriber::prelude::*; + use tracing_subscriber::registry::LookupSpan; + + // ----------------------------------------------------------------------- + // Minimal span-event capture layer + // ----------------------------------------------------------------------- + + struct CapturedName(String); + + #[derive(Clone, Default)] + struct SpanCapture(Arc>>); + + #[derive(Clone)] + struct SpanEvent { + kind: &'static str, + name: String, + } + + struct NameVisitor(Option); + impl Visit for NameVisitor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "otel.name" { + self.0 = Some(value.to_owned()); + } + } + fn record_debug(&mut self, _: &Field, _: &dyn Debug) {} + } + + impl SpanCapture { + fn opened(&self, name: &str) -> usize { + self.0 + .lock() + .unwrap() + .iter() + .filter(|e| e.kind == "open" && e.name == name) + .count() + } + + fn closed(&self, name: &str) -> usize { + self.0 + .lock() + .unwrap() + .iter() + .filter(|e| e.kind == "close" && e.name == name) + .count() + } + } + + impl LookupSpan<'s>> Layer for SpanCapture { + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &Id, + ctx: Context<'_, S>, + ) { + let mut v = NameVisitor(None); + attrs.record(&mut v); + let name = v.0.unwrap_or_else(|| attrs.metadata().name().to_owned()); + if let Some(span) = ctx.span(id) { + span.extensions_mut().insert(CapturedName(name.clone())); + } + self.0 + .lock() + .unwrap() + .push(SpanEvent { kind: "open", name }); + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + let name = ctx + .span(&id) + .and_then(|s| s.extensions().get::().map(|n| n.0.clone())) + .unwrap_or_default(); + self.0.lock().unwrap().push(SpanEvent { + kind: "close", + name, + }); + } + } + + // ----------------------------------------------------------------------- + // Context helper + // ----------------------------------------------------------------------- + + async fn make_ctx() -> SessionContext { + let rule = crate::instrument_with_info_spans!(options: InstrumentationOptions::default()); + let state = SessionStateBuilder::new() + .with_default_features() + .with_physical_optimizer_rule(rule) + .build(); + SessionContext::new_with_state(state) + } + + // ----------------------------------------------------------------------- + // Tests + // ----------------------------------------------------------------------- + + /// Regression for issue #27: spans must close when the last execution stream + /// is consumed, not when the plan is dropped. + /// + /// Before the fix, `InstrumentedExec` held `OnceLock>` and a + /// `Span` clone, keeping spans alive until `drop(plan)`. With + /// `SimpleSpanProcessor` that caused `drop()` to block for + /// `N_nodes × OTLP_latency` seconds. + #[tokio::test] + async fn span_closes_when_stream_finishes_not_when_plan_drops() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let ctx = make_ctx().await; + let plan = ctx + .sql("SELECT 1") + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let plan_clone = plan.clone(); // keep plan alive after streams are consumed + + let task_ctx = ctx.task_ctx(); + for part in 0..plan.properties().partitioning.partition_count() { + let mut stream = plan.execute(part, task_ctx.clone()).unwrap(); + while let Some(b) = stream.next().await { + b.unwrap(); + } + } + drop(plan); + + // Spans are already closed — they were closed when the streams finished. + let closed_after_collect = capture.closed("InstrumentedExec"); + assert!( + closed_after_collect > 0, + "InstrumentedExec spans must close when streams finish" + ); + + // Dropping the extra plan clone must not trigger any new span closures. + drop(plan_clone); + assert_eq!( + capture.closed("InstrumentedExec"), + closed_after_collect, + "dropping the plan must not close additional spans (regression: issue #27)" + ); + } + + /// Spans must remain open while execution streams are alive, even after the + /// plan itself is dropped. Span lifetime tracks stream lifetime. + #[tokio::test] + async fn span_stays_open_while_stream_alive() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let ctx = make_ctx().await; + let plan = ctx + .sql("SELECT 1") + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + + let task_ctx = ctx.task_ctx(); + // Collect all streams before dropping the plan, so the streams (and their + // Arc) are alive while the plan Weak is dropped. + let streams: Vec<_> = (0..plan.properties().partitioning.partition_count()) + .map(|p| plan.execute(p, task_ctx.clone()).unwrap()) + .collect(); + + // Drop the plan — only the Weak is released, not the span. + drop(plan); + assert_eq!( + capture.closed("InstrumentedExec"), + 0, + "spans must not close when the plan drops while streams are still alive" + ); + + // Consuming and dropping the streams releases the Arc. + for mut stream in streams { + while let Some(b) = stream.next().await { + b.unwrap(); + } + } + assert!( + capture.closed("InstrumentedExec") > 0, + "InstrumentedExec spans must close once all streams are consumed" + ); + } + + fn two_partition_inner() -> Arc { + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::physical_plan::test::TestMemoryExec; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch_1 = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1]))], + ) + .unwrap(); + let batch_2 = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![2]))], + ) + .unwrap(); + TestMemoryExec::try_new_exec(&[vec![batch_1], vec![batch_2]], schema, None) + .unwrap() + } + + fn two_partition_plan() -> InstrumentedExec { + InstrumentedExec::new( + two_partition_inner(), + Arc::new(|| tracing::info_span!("InstrumentedExec")), + &InstrumentationOptions::default(), + ) + } + + struct FailFirstExecute { + inner: Arc, + fail_next: AtomicBool, + } + + impl FailFirstExecute { + fn new(inner: Arc) -> Self { + Self { + inner, + fail_next: AtomicBool::new(true), + } + } + } + + impl Debug for FailFirstExecute { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FailFirstExecute") + .field("inner", &self.inner) + .finish() + } + } + + impl DisplayAs for FailFirstExecute { + fn fmt_as( + &self, + format: DisplayFormatType, + f: &mut fmt::Formatter<'_>, + ) -> fmt::Result { + self.inner.fmt_as(format, f) + } + } + + impl ExecutionPlan for FailFirstExecute { + fn name(&self) -> &str { + self.inner.name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + self.inner.children() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + let inner = self.inner.clone().with_new_children(children)?; + Ok(Arc::new(FailFirstExecute::new(inner))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if self.fail_next.swap(false, Ordering::AcqRel) { + return Err(DataFusionError::Internal( + "intentional execute failure".into(), + )); + } + self.inner.execute(partition, context) + } + } + + /// Concurrent partition streams from one execution share a recorder group, + /// preserving aggregation while those streams are active. + #[tokio::test] + async fn concurrent_partitions_share_one_span() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let plan = two_partition_plan(); + let task_ctx = Arc::new(TaskContext::default()); + let streams: Vec<_> = (0..2) + .map(|part| plan.execute(part, task_ctx.clone()).unwrap()) + .collect(); + + for mut stream in streams { + while let Some(batch) = stream.next().await { + batch.unwrap(); + } + } + + assert_eq!( + capture.opened("InstrumentedExec"), + 1, + "concurrent partition streams should share one span" + ); + assert_eq!( + capture.closed("InstrumentedExec"), + 1, + "shared span should close when the active stream group finishes" + ); + } + + /// Concurrent executions on different task contexts represent independent + /// executions and must not share a span. + #[tokio::test] + async fn different_task_contexts_get_fresh_spans() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let plan = two_partition_plan(); + let task_ctx_1 = Arc::new(TaskContext::default()); + let task_ctx_2 = Arc::new(TaskContext::default()); + let streams = vec![ + plan.execute(0, task_ctx_1).unwrap(), + plan.execute(1, task_ctx_2).unwrap(), + ]; + + for mut stream in streams { + while let Some(batch) = stream.next().await { + batch.unwrap(); + } + } + + assert_eq!( + capture.opened("InstrumentedExec"), + 2, + "different task contexts should create independent spans" + ); + assert_eq!( + capture.closed("InstrumentedExec"), + 2, + "each independent context execution should close its own span" + ); + } + + /// `ExecutionPlan::execute` may legally be called for only a subset of + /// partitions. Recorder lifetime must not wait for partitions that were + /// never executed. + #[tokio::test] + async fn partial_partition_execution_closes_span() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let plan = two_partition_plan(); + let task_ctx = Arc::new(TaskContext::default()); + let mut stream = plan.execute(0, task_ctx).unwrap(); + while let Some(batch) = stream.next().await { + batch.unwrap(); + } + drop(stream); + + assert_eq!( + capture.opened("InstrumentedExec"), + 1, + "same plan node should create one span across sequential partitions" + ); + assert_eq!( + capture.closed("InstrumentedExec"), + 1, + "span should close after the only executed partition stream finishes" + ); + } + + /// Recorder acquisition reserves a stream before calling the inner plan. + /// If the inner plan fails to execute, the reservation must be released so + /// the failed group is not left active forever. + #[tokio::test] + async fn execute_error_releases_reserved_recorder_group() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let plan = InstrumentedExec::new( + Arc::new(FailFirstExecute::new(two_partition_inner())), + Arc::new(|| tracing::info_span!("InstrumentedExec")), + &InstrumentationOptions::default(), + ); + let task_ctx = Arc::new(TaskContext::default()); + + assert!(plan.execute(0, task_ctx.clone()).is_err()); + + let mut stream = plan.execute(0, task_ctx).unwrap(); + while let Some(batch) = stream.next().await { + batch.unwrap(); + } + drop(stream); + + assert_eq!( + capture.opened("InstrumentedExec"), + 2, + "failed execute should close its span and retry should create a fresh one" + ); + assert_eq!( + capture.closed("InstrumentedExec"), + 2, + "failed execute must not leave an active recorder group behind" + ); + } + + /// Repeated execution of the same partition is legal. Each independent + /// stream group should close before the next one starts instead of reusing + /// stale completion state. + #[tokio::test] + async fn repeated_same_partition_execution_gets_fresh_span() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let plan = two_partition_plan(); + let task_ctx = Arc::new(TaskContext::default()); + for _ in 0..2 { + let mut stream = plan.execute(0, task_ctx.clone()).unwrap(); + while let Some(batch) = stream.next().await { + batch.unwrap(); + } + } + + assert_eq!( + capture.opened("InstrumentedExec"), + 2, + "separate stream groups should create separate spans" + ); + assert_eq!( + capture.closed("InstrumentedExec"), + 2, + "each repeated execution should close its own span" + ); + } + + /// Overlapping execution of the same partition is legal. Each active + /// duplicate stream should get a separate span so their recorder state does + /// not mix. + #[tokio::test] + async fn overlapping_same_partition_execution_gets_fresh_spans() { + let capture = SpanCapture::default(); + let _guard = tracing::subscriber::set_default( + tracing_subscriber::registry() + .with(tracing::level_filters::LevelFilter::INFO) + .with(capture.clone()), + ); + + let plan = two_partition_plan(); + let task_ctx = Arc::new(TaskContext::default()); + let mut stream_1 = plan.execute(0, task_ctx.clone()).unwrap(); + let mut stream_2 = plan.execute(0, task_ctx).unwrap(); + + while let Some(batch) = stream_1.next().await { + batch.unwrap(); + } + while let Some(batch) = stream_2.next().await { + batch.unwrap(); + } + drop(stream_1); + drop(stream_2); + + assert_eq!( + capture.opened("InstrumentedExec"), + 2, + "overlapping duplicate partition streams should create separate spans" + ); + assert_eq!( + capture.closed("InstrumentedExec"), + 2, + "each overlapping duplicate stream should close its own span" + ); + } +} diff --git a/datafusion-tracing/src/node.rs b/datafusion-tracing/src/node.rs index f454f43..b8b81c7 100644 --- a/datafusion-tracing/src/node.rs +++ b/datafusion-tracing/src/node.rs @@ -43,6 +43,10 @@ impl NodeRecorder { span, } } + + pub(crate) fn span(&self) -> Span { + self.span.clone() + } } impl Drop for NodeRecorder { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index ac13108..550d4a6 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -39,6 +39,11 @@ static LOG_BUFFER: OnceLock>>> = OnceLock::new(); // A global "once" to ensure the subscriber is initialized only once. static SUBSCRIBER_INIT: Once = Once::new(); +// Snapshot assertions read from a process-global tracing subscriber. Run these +// test cases one at a time so close-event ordering is not affected by other +// snapshot tests running concurrently. +static TEST_EXECUTION_LOCK: OnceLock> = OnceLock::new(); + /// A struct describing how a particular query test should be run. #[derive(Debug, Default)] struct QueryTestCase<'a> { @@ -197,8 +202,14 @@ async fn test_topk_lineitem() -> Result<()> { /// Executes the provided [`QueryTestCase`], setting up tracing and verifying /// log output according to its parameters. async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Result<()> { + let _test_guard = TEST_EXECUTION_LOCK + .get_or_init(Default::default) + .lock() + .await; + // Initialize tracing infrastructure and collect a log buffer. let log_buffer = init_tracing(); + log_buffer.lock().unwrap().clear(); // Initialize the DataFusion session with the requested options. // plan_diff disabled for tests to avoid non-deterministic output @@ -245,15 +256,102 @@ async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Re // General assertion on the full trace. if !test_case.ignore_full_trace { + let full_trace_lines = normalize_full_trace_lines(test_name, &json_lines); + // Redact `datafusion.preview` values as they are often non-deterministic. preview_redacted_settings().bind(|| { - assert_json_snapshot!(format!("{test_name}_trace"), json_lines); + assert_json_snapshot!(format!("{test_name}_trace"), full_trace_lines); }); } Ok(()) } +fn normalize_full_trace_lines(test_name: &str, json_lines: &[Value]) -> Vec { + if !test_name.contains("recursive") { + return json_lines.to_vec(); + } + + // Recursive queries may close child execution streams in a different order + // across runtimes. Keep the trace contents intact, but make the snapshot + // independent of that scheduling detail. + let mut normalized_lines = Vec::new(); + let mut recursive_exec_lines = Vec::new(); + let mut recursive_exec_insert_at = None; + + for json_line in json_lines { + if is_recursive_exec_close(json_line) { + recursive_exec_insert_at.get_or_insert(normalized_lines.len()); + recursive_exec_lines.push(json_line.clone()); + } else { + normalized_lines.push(json_line.clone()); + } + } + + recursive_exec_lines.sort_by_key(recursive_exec_sort_key); + + if let Some(insert_at) = recursive_exec_insert_at { + normalized_lines.splice(insert_at..insert_at, recursive_exec_lines); + } + + normalized_lines +} + +fn is_recursive_exec_close(json_line: &Value) -> bool { + extract_json_field_value(json_line, "name").as_deref() == Some("InstrumentedExec") + && json_line["spans"].as_array().is_some_and(|spans| { + spans.iter().any(|span| { + span.get("otel.name").and_then(Value::as_str) + == Some("RecursiveQueryExec") + }) + }) +} + +fn recursive_exec_sort_key(json_line: &Value) -> String { + let span_path = json_line["spans"] + .as_array() + .into_iter() + .flatten() + .map(|span| { + [ + value_to_sort_string(span.get("name")), + value_to_sort_string(span.get("otel.name")), + value_to_sort_string(span.get("datafusion.node")), + value_to_sort_string(span.get("datafusion.partitioning")), + ] + .join(":") + }) + .collect::>() + .join("/"); + + [ + span_path, + span_field_sort_value(json_line, "name"), + span_field_sort_value(json_line, "otel.name"), + span_field_sort_value(json_line, "datafusion.node"), + span_field_sort_value(json_line, "datafusion.partitioning"), + span_field_sort_value(json_line, "datafusion.metrics.output_rows"), + span_field_sort_value(json_line, "datafusion.metrics.output_batches"), + span_field_sort_value(json_line, "datafusion.metrics.output_bytes"), + ] + .join("|") +} + +fn span_field_sort_value(json_line: &Value, field_name: &str) -> String { + value_to_sort_string(json_line["span"].get(field_name)) +} + +fn value_to_sort_string(value: Option<&Value>) -> String { + value + .map(|value| { + value + .as_str() + .map(ToString::to_string) + .unwrap_or_else(|| value.to_string()) + }) + .unwrap_or_default() +} + /// Extracts a field's value from the `"span"` object in a JSON line, returning it as an `Option`. fn extract_json_field_value(json_line: &Value, field_name: &str) -> Option { let span_fields = &json_line["span"]; diff --git a/tests/snapshots/06_object_store_all_options_trace.snap b/tests/snapshots/06_object_store_all_options_trace.snap index 85710d6..27b92c4 100644 --- a/tests/snapshots/06_object_store_all_options_trace.snap +++ b/tests/snapshots/06_object_store_all_options_trace.snap @@ -1,6 +1,5 @@ --- source: tests/integration_tests.rs -assertion_line: 250 expression: json_lines --- [ @@ -2210,6 +2209,15 @@ expression: json_lines { "name": "run_traced_query", "query_name": "order_nations" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortExec", + "region": "us-west" } ], "target": "integration_utils", diff --git a/tests/snapshots/08_recursive_trace.snap b/tests/snapshots/08_recursive_trace.snap index 274dc47..769798f 100644 --- a/tests/snapshots/08_recursive_trace.snap +++ b/tests/snapshots/08_recursive_trace.snap @@ -1,6 +1,6 @@ --- source: tests/integration_tests.rs -expression: json_lines +expression: full_trace_lines --- [ { @@ -2118,6 +2118,42 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "ProjectionExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "FilterExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RepartitionExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2130,11 +2166,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "WorkTableExec: name=numbers", + "datafusion.partitioning": "UnknownPartitioning(1)", "env": "production", "name": "InstrumentedExec", - "otel.name": "RepartitionExec", + "otel.name": "WorkTableExec", "region": "us-west" }, "spans": [ @@ -2154,6 +2190,42 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "ProjectionExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "FilterExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RepartitionExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2166,11 +2238,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "FilterExec: n@0 < 3", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "WorkTableExec: name=numbers", + "datafusion.partitioning": "UnknownPartitioning(1)", "env": "production", "name": "InstrumentedExec", - "otel.name": "FilterExec", + "otel.name": "WorkTableExec", "region": "us-west" }, "spans": [ @@ -2190,6 +2262,42 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "ProjectionExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "FilterExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RepartitionExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2202,11 +2310,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", + "datafusion.node": "FilterExec: n@0 < 3", "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "ProjectionExec", + "otel.name": "FilterExec", "region": "us-west" }, "spans": [ @@ -2226,6 +2334,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2238,11 +2355,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "CoalescePartitionsExec", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.node": "FilterExec: n@0 < 3", + "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "CoalescePartitionsExec", + "otel.name": "FilterExec", "region": "us-west" }, "spans": [ @@ -2262,6 +2379,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2274,11 +2400,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "WorkTableExec: name=numbers", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.node": "FilterExec: n@0 < 3", + "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "WorkTableExec", + "otel.name": "FilterExec", "region": "us-west" }, "spans": [ @@ -2298,6 +2424,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2310,11 +2445,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "RepartitionExec", + "otel.name": "ProjectionExec", "region": "us-west" }, "spans": [ @@ -2334,6 +2469,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2346,11 +2490,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "FilterExec: n@0 < 3", + "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "FilterExec", + "otel.name": "ProjectionExec", "region": "us-west" }, "spans": [ @@ -2370,6 +2514,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2406,6 +2559,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2418,11 +2580,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "CoalescePartitionsExec", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "CoalescePartitionsExec", + "otel.name": "RepartitionExec", "region": "us-west" }, "spans": [ @@ -2442,6 +2604,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2454,11 +2625,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "WorkTableExec: name=numbers", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + "datafusion.partitioning": "RoundRobinBatch(8)", "env": "production", "name": "InstrumentedExec", - "otel.name": "WorkTableExec", + "otel.name": "RepartitionExec", "region": "us-west" }, "spans": [ @@ -2478,6 +2649,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2514,6 +2694,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2526,11 +2715,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "FilterExec: n@0 < 3", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "CoalescePartitionsExec", + "datafusion.partitioning": "UnknownPartitioning(1)", "env": "production", "name": "InstrumentedExec", - "otel.name": "FilterExec", + "otel.name": "CoalescePartitionsExec", "region": "us-west" }, "spans": [ @@ -2562,11 +2751,11 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "CoalescePartitionsExec", + "datafusion.partitioning": "UnknownPartitioning(1)", "env": "production", "name": "InstrumentedExec", - "otel.name": "ProjectionExec", + "otel.name": "CoalescePartitionsExec", "region": "us-west" }, "spans": [ @@ -2649,6 +2838,15 @@ expression: json_lines { "name": "run_traced_query", "query_name": "recursive" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RecursiveQueryExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2676,6 +2874,15 @@ expression: json_lines { "name": "run_traced_query", "query_name": "recursive" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RecursiveQueryExec", + "region": "us-west" } ], "target": "integration_utils", diff --git a/tests/snapshots/09_recursive_all_options_00_PlaceholderRowExec.snap b/tests/snapshots/09_recursive_all_options_00_PlaceholderRowExec.snap new file mode 100644 index 0000000..f450ea6 --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_00_PlaceholderRowExec.snap @@ -0,0 +1,7 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- +++ +|| +++ diff --git a/tests/snapshots/09_recursive_all_options_01_ProjectionExec.snap b/tests/snapshots/09_recursive_all_options_01_ProjectionExec.snap new file mode 100644 index 0000000..82b032d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_01_ProjectionExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 1 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_02_WorkTableExec.snap b/tests/snapshots/09_recursive_all_options_02_WorkTableExec.snap new file mode 100644 index 0000000..82b032d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_02_WorkTableExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 1 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_03_RepartitionExec.snap b/tests/snapshots/09_recursive_all_options_03_RepartitionExec.snap new file mode 100644 index 0000000..82b032d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_03_RepartitionExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 1 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_04_FilterExec.snap b/tests/snapshots/09_recursive_all_options_04_FilterExec.snap new file mode 100644 index 0000000..82b032d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_04_FilterExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 1 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_05_ProjectionExec.snap b/tests/snapshots/09_recursive_all_options_05_ProjectionExec.snap new file mode 100644 index 0000000..c3cfc8b --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_05_ProjectionExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++----------------------+ +| numbers.n + Int64(1) | +|======================| +| 2 | ++----------------------+ diff --git a/tests/snapshots/09_recursive_all_options_06_CoalescePartitionsExec.snap b/tests/snapshots/09_recursive_all_options_06_CoalescePartitionsExec.snap new file mode 100644 index 0000000..c3cfc8b --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_06_CoalescePartitionsExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++----------------------+ +| numbers.n + Int64(1) | +|======================| +| 2 | ++----------------------+ diff --git a/tests/snapshots/09_recursive_all_options_07_WorkTableExec.snap b/tests/snapshots/09_recursive_all_options_07_WorkTableExec.snap new file mode 100644 index 0000000..c3cfc8b --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_07_WorkTableExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++----------------------+ +| numbers.n + Int64(1) | +|======================| +| 2 | ++----------------------+ diff --git a/tests/snapshots/09_recursive_all_options_08_RepartitionExec.snap b/tests/snapshots/09_recursive_all_options_08_RepartitionExec.snap new file mode 100644 index 0000000..6b6948e --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_08_RepartitionExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 2 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_09_FilterExec.snap b/tests/snapshots/09_recursive_all_options_09_FilterExec.snap new file mode 100644 index 0000000..6b6948e --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_09_FilterExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 2 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_10_ProjectionExec.snap b/tests/snapshots/09_recursive_all_options_10_ProjectionExec.snap new file mode 100644 index 0000000..7f8e94d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_10_ProjectionExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++----------------------+ +| numbers.n + Int64(1) | +|======================| +| 3 | ++----------------------+ diff --git a/tests/snapshots/09_recursive_all_options_11_CoalescePartitionsExec.snap b/tests/snapshots/09_recursive_all_options_11_CoalescePartitionsExec.snap new file mode 100644 index 0000000..7f8e94d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_11_CoalescePartitionsExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++----------------------+ +| numbers.n + Int64(1) | +|======================| +| 3 | ++----------------------+ diff --git a/tests/snapshots/09_recursive_all_options_12_WorkTableExec.snap b/tests/snapshots/09_recursive_all_options_12_WorkTableExec.snap new file mode 100644 index 0000000..7f8e94d --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_12_WorkTableExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++----------------------+ +| numbers.n + Int64(1) | +|======================| +| 3 | ++----------------------+ diff --git a/tests/snapshots/09_recursive_all_options_13_RepartitionExec.snap b/tests/snapshots/09_recursive_all_options_13_RepartitionExec.snap new file mode 100644 index 0000000..5bf8538 --- /dev/null +++ b/tests/snapshots/09_recursive_all_options_13_RepartitionExec.snap @@ -0,0 +1,9 @@ +--- +source: tests/integration_tests.rs +expression: preview +--- ++---+ +| n | +|===| +| 3 | ++---+ diff --git a/tests/snapshots/09_recursive_all_options_trace.snap b/tests/snapshots/09_recursive_all_options_trace.snap index 7de4202..b19a7bf 100644 --- a/tests/snapshots/09_recursive_all_options_trace.snap +++ b/tests/snapshots/09_recursive_all_options_trace.snap @@ -1,7 +1,6 @@ --- source: tests/integration_tests.rs -assertion_line: 250 -expression: json_lines +expression: full_trace_lines --- [ { @@ -2120,6 +2119,42 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "ProjectionExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "FilterExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RepartitionExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2132,24 +2167,12 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.metrics.elapsed_compute": "0.00ms", - "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.metrics.fetch_time": "0.00ms", - "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "64.0 KB", - "datafusion.metrics.output_rows": "1", - "datafusion.metrics.repartition_time": "0.00ms", - "datafusion.metrics.send_time": "0.00ms", - "datafusion.metrics.spill_count": "0", - "datafusion.metrics.spilled_bytes": "0.0 B", - "datafusion.metrics.spilled_rows": "0", - "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "WorkTableExec: name=numbers", + "datafusion.partitioning": "UnknownPartitioning(1)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "RepartitionExec", + "otel.name": "WorkTableExec", "region": "us-west" }, "spans": [ @@ -2169,6 +2192,42 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "ProjectionExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "FilterExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RepartitionExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2181,18 +2240,12 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.metrics.elapsed_compute": "0.00ms", - "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "64.0 KB", - "datafusion.metrics.output_rows": "1", - "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "FilterExec: n@0 < 3", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "WorkTableExec: name=numbers", + "datafusion.partitioning": "UnknownPartitioning(1)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "FilterExec", + "otel.name": "WorkTableExec", "region": "us-west" }, "spans": [ @@ -2212,6 +2265,42 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "ProjectionExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "FilterExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "RoundRobinBatch(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RepartitionExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2226,16 +2315,15 @@ expression: json_lines "datafusion.emission_type": "Incremental", "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "8.0 B", - "datafusion.metrics.output_rows": "1", + "datafusion.metrics.output_batches": "0", + "datafusion.metrics.output_bytes": "0.0 B", + "datafusion.metrics.output_rows": "0", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", + "datafusion.node": "FilterExec: n@0 < 3", "datafusion.partitioning": "RoundRobinBatch(8)", - "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "ProjectionExec", + "otel.name": "FilterExec", "region": "us-west" }, "spans": [ @@ -2255,6 +2343,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2270,15 +2367,15 @@ expression: json_lines "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "8.0 B", + "datafusion.metrics.output_bytes": "64.0 KB", "datafusion.metrics.output_rows": "1", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "CoalescePartitionsExec", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.node": "FilterExec: n@0 < 3", + "datafusion.partitioning": "RoundRobinBatch(8)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "CoalescePartitionsExec", + "otel.name": "FilterExec", "region": "us-west" }, "spans": [ @@ -2298,6 +2395,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2310,12 +2416,18 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "WorkTableExec: name=numbers", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.metrics.elapsed_compute": "0.00ms", + "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", + "datafusion.metrics.output_batches": "1", + "datafusion.metrics.output_bytes": "64.0 KB", + "datafusion.metrics.output_rows": "1", + "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", + "datafusion.node": "FilterExec: n@0 < 3", + "datafusion.partitioning": "RoundRobinBatch(8)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "WorkTableExec", + "otel.name": "FilterExec", "region": "us-west" }, "spans": [ @@ -2335,6 +2447,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2349,22 +2470,15 @@ expression: json_lines "datafusion.emission_type": "Incremental", "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.metrics.fetch_time": "0.00ms", - "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "64.0 KB", - "datafusion.metrics.output_rows": "1", - "datafusion.metrics.repartition_time": "0.00ms", - "datafusion.metrics.send_time": "0.00ms", - "datafusion.metrics.spill_count": "0", - "datafusion.metrics.spilled_bytes": "0.0 B", - "datafusion.metrics.spilled_rows": "0", + "datafusion.metrics.output_batches": "0", + "datafusion.metrics.output_bytes": "0.0 B", + "datafusion.metrics.output_rows": "0", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", "datafusion.partitioning": "RoundRobinBatch(8)", - "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "RepartitionExec", + "otel.name": "ProjectionExec", "region": "us-west" }, "spans": [ @@ -2384,6 +2498,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2399,15 +2522,15 @@ expression: json_lines "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "64.0 KB", + "datafusion.metrics.output_bytes": "8.0 B", "datafusion.metrics.output_rows": "1", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "FilterExec: n@0 < 3", + "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", "datafusion.partitioning": "RoundRobinBatch(8)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "FilterExec", + "otel.name": "ProjectionExec", "region": "us-west" }, "spans": [ @@ -2427,6 +2550,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2470,6 +2602,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2484,16 +2625,22 @@ expression: json_lines "datafusion.emission_type": "Incremental", "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", + "datafusion.metrics.fetch_time": "0.00ms", "datafusion.metrics.output_batches": "1", - "datafusion.metrics.output_bytes": "8.0 B", + "datafusion.metrics.output_bytes": "64.0 KB", "datafusion.metrics.output_rows": "1", + "datafusion.metrics.repartition_time": "0.00ms", + "datafusion.metrics.send_time": "0.00ms", + "datafusion.metrics.spill_count": "0", + "datafusion.metrics.spilled_bytes": "0.0 B", + "datafusion.metrics.spilled_rows": "0", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "CoalescePartitionsExec", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + "datafusion.partitioning": "RoundRobinBatch(8)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "CoalescePartitionsExec", + "otel.name": "RepartitionExec", "region": "us-west" }, "spans": [ @@ -2513,6 +2660,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2525,12 +2681,24 @@ expression: json_lines "span": { "datafusion.boundedness": "Bounded", "datafusion.emission_type": "Incremental", - "datafusion.node": "WorkTableExec: name=numbers", - "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.metrics.elapsed_compute": "0.00ms", + "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", + "datafusion.metrics.fetch_time": "0.00ms", + "datafusion.metrics.output_batches": "1", + "datafusion.metrics.output_bytes": "64.0 KB", + "datafusion.metrics.output_rows": "1", + "datafusion.metrics.repartition_time": "0.00ms", + "datafusion.metrics.send_time": "0.00ms", + "datafusion.metrics.spill_count": "0", + "datafusion.metrics.spilled_bytes": "0.0 B", + "datafusion.metrics.spilled_rows": "0", + "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", + "datafusion.node": "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + "datafusion.partitioning": "RoundRobinBatch(8)", "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "WorkTableExec", + "otel.name": "RepartitionExec", "region": "us-west" }, "spans": [ @@ -2550,6 +2718,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2599,6 +2776,15 @@ expression: json_lines "name": "InstrumentedExec", "otel.name": "RecursiveQueryExec", "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "CoalescePartitionsExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2617,11 +2803,11 @@ expression: json_lines "datafusion.metrics.output_bytes": "0.0 B", "datafusion.metrics.output_rows": "0", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "FilterExec: n@0 < 3", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "CoalescePartitionsExec", + "datafusion.partitioning": "UnknownPartitioning(1)", "env": "production", "name": "InstrumentedExec", - "otel.name": "FilterExec", + "otel.name": "CoalescePartitionsExec", "region": "us-west" }, "spans": [ @@ -2655,15 +2841,16 @@ expression: json_lines "datafusion.emission_type": "Incremental", "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.metrics.output_batches": "0", - "datafusion.metrics.output_bytes": "0.0 B", - "datafusion.metrics.output_rows": "0", + "datafusion.metrics.output_batches": "1", + "datafusion.metrics.output_bytes": "8.0 B", + "datafusion.metrics.output_rows": "1", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.node": "ProjectionExec: expr=[n@0 + 1 as numbers.n + Int64(1)]", - "datafusion.partitioning": "RoundRobinBatch(8)", + "datafusion.node": "CoalescePartitionsExec", + "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", - "otel.name": "ProjectionExec", + "otel.name": "CoalescePartitionsExec", "region": "us-west" }, "spans": [ @@ -2697,12 +2884,13 @@ expression: json_lines "datafusion.emission_type": "Incremental", "datafusion.metrics.elapsed_compute": "0.00ms", "datafusion.metrics.end_timestamp": "1970-01-01 00:00:00 UTC", - "datafusion.metrics.output_batches": "0", - "datafusion.metrics.output_bytes": "0.0 B", - "datafusion.metrics.output_rows": "0", + "datafusion.metrics.output_batches": "1", + "datafusion.metrics.output_bytes": "8.0 B", + "datafusion.metrics.output_rows": "1", "datafusion.metrics.start_timestamp": "1970-01-01 00:00:00 UTC", "datafusion.node": "CoalescePartitionsExec", "datafusion.partitioning": "UnknownPartitioning(1)", + "datafusion.preview": "", "env": "production", "name": "InstrumentedExec", "otel.name": "CoalescePartitionsExec", @@ -2753,6 +2941,15 @@ expression: json_lines { "name": "run_traced_query", "query_name": "recursive" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RecursiveQueryExec", + "region": "us-west" } ], "target": "integration_utils", @@ -2787,6 +2984,15 @@ expression: json_lines { "name": "run_traced_query", "query_name": "recursive" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Incremental", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "RecursiveQueryExec", + "region": "us-west" } ], "target": "integration_utils", diff --git a/tests/snapshots/10_topk_lineitem_trace.snap b/tests/snapshots/10_topk_lineitem_trace.snap index 5d23d7c..699d1d8 100644 --- a/tests/snapshots/10_topk_lineitem_trace.snap +++ b/tests/snapshots/10_topk_lineitem_trace.snap @@ -2782,6 +2782,24 @@ expression: json_lines { "name": "run_traced_query", "query_name": "topk_lineitem" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortPreservingMergeExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortExec(TopK)", + "region": "us-west" } ], "target": "integration_utils", @@ -2809,6 +2827,24 @@ expression: json_lines { "name": "run_traced_query", "query_name": "topk_lineitem" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortPreservingMergeExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortExec(TopK)", + "region": "us-west" } ], "target": "integration_utils", @@ -2836,6 +2872,24 @@ expression: json_lines { "name": "run_traced_query", "query_name": "topk_lineitem" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortPreservingMergeExec", + "region": "us-west" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(8)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortExec(TopK)", + "region": "us-west" } ], "target": "integration_utils", @@ -2863,6 +2917,15 @@ expression: json_lines { "name": "run_traced_query", "query_name": "topk_lineitem" + }, + { + "datafusion.boundedness": "Bounded", + "datafusion.emission_type": "Final", + "datafusion.partitioning": "UnknownPartitioning(1)", + "env": "production", + "name": "InstrumentedExec", + "otel.name": "SortPreservingMergeExec", + "region": "us-west" } ], "target": "integration_utils",