Skip to content
596 changes: 520 additions & 76 deletions datafusion-tracing/src/instrumented_exec.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion-tracing/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl NodeRecorder {
span,
}
}

pub(crate) fn span(&self) -> Span {
self.span.clone()
}
}

impl Drop for NodeRecorder {
Expand Down
11 changes: 11 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ static LOG_BUFFER: OnceLock<Arc<Mutex<Vec<u8>>>> = OnceLock::new();
// A global "once" to ensure the subscriber is initialized only once.
static SUBSCRIBER_INIT: Once = Once::new();

// Snapshot assertions read from a process-global tracing subscriber. Run these
// test cases one at a time so close-event ordering is not affected by other
// snapshot tests running concurrently.
static TEST_EXECUTION_LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();

/// A struct describing how a particular query test should be run.
#[derive(Debug, Default)]
struct QueryTestCase<'a> {
Expand Down Expand Up @@ -197,8 +202,14 @@ async fn test_topk_lineitem() -> Result<()> {
/// Executes the provided [`QueryTestCase`], setting up tracing and verifying
/// log output according to its parameters.
async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Result<()> {
let _test_guard = TEST_EXECUTION_LOCK
.get_or_init(Default::default)
.lock()
.await;

// Initialize tracing infrastructure and collect a log buffer.
let log_buffer = init_tracing();
log_buffer.lock().unwrap().clear();

// Initialize the DataFusion session with the requested options.
// plan_diff disabled for tests to avoid non-deterministic output
Expand Down
10 changes: 9 additions & 1 deletion tests/snapshots/06_object_store_all_options_trace.snap
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
---
source: tests/integration_tests.rs
assertion_line: 250
expression: json_lines
---
[
Expand Down Expand Up @@ -2210,6 +2209,15 @@ expression: json_lines
{
"name": "run_traced_query",
"query_name": "order_nations"
},
{
"datafusion.boundedness": "Bounded",
"datafusion.emission_type": "Final",
"datafusion.partitioning": "UnknownPartitioning(1)",
"env": "production",
"name": "InstrumentedExec",
"otel.name": "SortExec",
"region": "us-west"
}
],
"target": "integration_utils",
Expand Down
Loading
Loading