Skip to content
817 changes: 740 additions & 77 deletions datafusion-tracing/src/instrumented_exec.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion-tracing/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl NodeRecorder {
span,
}
}

/// Returns a clone of the span associated with this node recorder.
///
/// Callers get their own handle to the same underlying span; the
/// recorder keeps its copy so it can still close the span on `Drop`.
pub(crate) fn span(&self) -> Span {
    self.span.clone()
}
}

impl Drop for NodeRecorder {
Expand Down
100 changes: 99 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ static LOG_BUFFER: OnceLock<Arc<Mutex<Vec<u8>>>> = OnceLock::new();
// A global "once" to ensure the subscriber is initialized only once.
static SUBSCRIBER_INIT: Once = Once::new();

// Snapshot assertions read from a process-global tracing subscriber. Run these
// test cases one at a time so close-event ordering is not affected by other
// snapshot tests running concurrently.
static TEST_EXECUTION_LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();

/// A struct describing how a particular query test should be run.
#[derive(Debug, Default)]
struct QueryTestCase<'a> {
Expand Down Expand Up @@ -197,8 +202,14 @@ async fn test_topk_lineitem() -> Result<()> {
/// Executes the provided [`QueryTestCase`], setting up tracing and verifying
/// log output according to its parameters.
async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Result<()> {
let _test_guard = TEST_EXECUTION_LOCK
.get_or_init(Default::default)
.lock()
.await;

// Initialize tracing infrastructure and collect a log buffer.
let log_buffer = init_tracing();
log_buffer.lock().unwrap().clear();

// Initialize the DataFusion session with the requested options.
// plan_diff disabled for tests to avoid non-deterministic output
Expand Down Expand Up @@ -245,15 +256,102 @@ async fn execute_test_case(test_name: &str, test_case: &QueryTestCase<'_>) -> Re

// General assertion on the full trace.
if !test_case.ignore_full_trace {
let full_trace_lines = normalize_full_trace_lines(test_name, &json_lines);

// Redact `datafusion.preview` values as they are often non-deterministic.
preview_redacted_settings().bind(|| {
assert_json_snapshot!(format!("{test_name}_trace"), json_lines);
assert_json_snapshot!(format!("{test_name}_trace"), full_trace_lines);
});
}

Ok(())
}

fn normalize_full_trace_lines(test_name: &str, json_lines: &[Value]) -> Vec<Value> {
if !test_name.contains("recursive") {
return json_lines.to_vec();
}

// Recursive queries may close child execution streams in a different order
// across runtimes. Keep the trace contents intact, but make the snapshot
// independent of that scheduling detail.
let mut normalized_lines = Vec::new();
let mut recursive_exec_lines = Vec::new();
let mut recursive_exec_insert_at = None;

for json_line in json_lines {
if is_recursive_exec_close(json_line) {
recursive_exec_insert_at.get_or_insert(normalized_lines.len());
recursive_exec_lines.push(json_line.clone());
} else {
normalized_lines.push(json_line.clone());
}
}

recursive_exec_lines.sort_by_key(recursive_exec_sort_key);

if let Some(insert_at) = recursive_exec_insert_at {
normalized_lines.splice(insert_at..insert_at, recursive_exec_lines);
}

normalized_lines
}

/// Reports whether a JSON log line is the close event of an
/// `InstrumentedExec` whose `spans` list includes a `RecursiveQueryExec`.
fn is_recursive_exec_close(json_line: &Value) -> bool {
    if extract_json_field_value(json_line, "name").as_deref() != Some("InstrumentedExec") {
        return false;
    }

    let Some(spans) = json_line["spans"].as_array() else {
        return false;
    };

    // Qualifies only when some entry in the span stack is the recursive
    // query node itself.
    spans
        .iter()
        .filter_map(|span| span.get("otel.name"))
        .filter_map(Value::as_str)
        .any(|name| name == "RecursiveQueryExec")
}

/// Builds a deterministic sort key for a recursive-exec close event.
///
/// The key is the span path (each span in `"spans"` rendered as four
/// identifying fields joined with `:`, paths joined with `/`) followed by
/// the closing span's own identifying fields and output metrics, all joined
/// with `|`, so equal traces sort identically regardless of emission order.
fn recursive_exec_sort_key(json_line: &Value) -> String {
    // Fields that identify a single span inside the path component.
    let identity_fields = [
        "name",
        "otel.name",
        "datafusion.node",
        "datafusion.partitioning",
    ];

    let mut path_segments: Vec<String> = Vec::new();
    for span in json_line["spans"].as_array().into_iter().flatten() {
        let segment: Vec<String> = identity_fields
            .iter()
            .map(|field| value_to_sort_string(span.get(*field)))
            .collect();
        path_segments.push(segment.join(":"));
    }

    // Start with the span path, then append the closing span's own fields.
    let mut key_parts = vec![path_segments.join("/")];
    for field in [
        "name",
        "otel.name",
        "datafusion.node",
        "datafusion.partitioning",
        "datafusion.metrics.output_rows",
        "datafusion.metrics.output_batches",
        "datafusion.metrics.output_bytes",
    ] {
        key_parts.push(span_field_sort_value(json_line, field));
    }
    key_parts.join("|")
}

/// Looks up `field_name` inside the line's `"span"` object and renders it
/// as a string suitable for use in a sort key.
fn span_field_sort_value(json_line: &Value, field_name: &str) -> String {
    let field_value = json_line["span"].get(field_name);
    value_to_sort_string(field_value)
}

/// Converts an optional JSON value into a string usable in a sort key.
///
/// JSON strings are used verbatim (without surrounding quotes); any other
/// value falls back to its JSON text representation; a missing value
/// becomes the empty string.
fn value_to_sort_string(value: Option<&Value>) -> String {
    match value {
        Some(json_value) => match json_value.as_str() {
            Some(text) => text.to_owned(),
            None => json_value.to_string(),
        },
        None => String::new(),
    }
}

/// Extracts a field's value from the `"span"` object in a JSON line, returning it as an `Option<String>`.
fn extract_json_field_value(json_line: &Value, field_name: &str) -> Option<String> {
let span_fields = &json_line["span"];
Expand Down
10 changes: 9 additions & 1 deletion tests/snapshots/06_object_store_all_options_trace.snap
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
---
source: tests/integration_tests.rs
assertion_line: 250
expression: json_lines
---
[
Expand Down Expand Up @@ -2210,6 +2209,15 @@ expression: json_lines
{
"name": "run_traced_query",
"query_name": "order_nations"
},
{
"datafusion.boundedness": "Bounded",
"datafusion.emission_type": "Final",
"datafusion.partitioning": "UnknownPartitioning(1)",
"env": "production",
"name": "InstrumentedExec",
"otel.name": "SortExec",
"region": "us-west"
}
],
"target": "integration_utils",
Expand Down
Loading
Loading