Skip to content

Commit 204d7e4

Browse files
authored
fix: improve tracing feature (#3688)
1 parent 6565de9 commit 204d7e4

12 files changed

Lines changed: 531 additions & 117 deletions

File tree

305 KB
Loading

docs/source/contributor-guide/tracing.md

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,76 @@ Example output:
4040
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109228835 },
4141
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109245928 },
4242
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109248843 },
43-
{ "name": "execute_plan", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109350935 },
44-
{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109367116 },
45-
{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109479156 },
43+
{ "name": "executePlan", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109350935 },
44+
{ "name": "getNextBatch[JVM] stage=2", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109367116 },
45+
{ "name": "getNextBatch[JVM] stage=2", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109479156 },
4646
```
4747

48-
Traces can be viewed with [Trace Viewer].
48+
Traces can be viewed with [Perfetto UI].
4949

50-
[Trace Viewer]: https://github.com/catapult-project/catapult/blob/main/tracing/README.md
50+
[Perfetto UI]: https://ui.perfetto.dev
5151

5252
Example trace visualization:
5353

5454
![tracing](../_static/images/tracing.png)
5555

56+
## Analyzing Memory Usage
57+
58+
The `analyze_trace` tool parses a trace log and compares jemalloc usage against the sum of per-thread
59+
Comet memory pool reservations. This is useful for detecting untracked native memory growth where jemalloc
60+
allocations exceed what the memory pools account for.
61+
62+
Build and run:
63+
64+
```shell
65+
cd native
66+
cargo run --bin analyze_trace -- /path/to/comet-event-trace.json
67+
```
68+
69+
The tool reads counter events from the trace log. Counters are logged per thread, but `jemalloc_allocated`
70+
is a process-wide value (the same global allocation reported from whichever thread logs it), while
71+
`thread_NNN_comet_memory_reserved` values are per-thread pool reservations that are summed to get the total
72+
tracked memory.
73+
74+
Sample output:
75+
76+
```
77+
=== Comet Trace Memory Analysis ===
78+
79+
Counter events parsed: 193104
80+
Threads with memory pools: 8
81+
Peak jemalloc allocated: 3068.2 MB
82+
Peak pool total: 2864.6 MB
83+
Peak excess (jemalloc - pool): 364.6 MB
84+
85+
WARNING: jemalloc exceeded pool reservation at 138 sampled points:
86+
87+
Time (us) jemalloc pool_total excess
88+
--------------------------------------------------------------
89+
179578 210.8 MB 0.1 MB 210.7 MB
90+
429663 420.5 MB 145.1 MB 275.5 MB
91+
1304969 2122.5 MB 1797.2 MB 325.2 MB
92+
21974838 407.0 MB 42.3 MB 364.6 MB
93+
33543599 5.5 MB 0.1 MB 5.3 MB
94+
95+
--- Final per-thread pool reservations ---
96+
97+
thread_60_comet_memory_reserved: 0.0 MB
98+
thread_95_comet_memory_reserved: 0.0 MB
99+
thread_96_comet_memory_reserved: 0.0 MB
100+
...
101+
102+
Total: 0.0 MB
103+
```
104+
105+
Some excess is expected (jemalloc metadata, fragmentation, non-pool allocations like Arrow IPC buffers).
106+
Large or growing excess may indicate memory that is not being tracked by the pool.
107+
56108
## Definition of Labels
57109

58-
| Label | Meaning |
59-
| --------------------- | -------------------------------------------------------------- |
60-
| jvm_heapUsed | JVM heap memory usage of live objects for the executor process |
61-
| jemalloc_allocated | Native memory usage for the executor process |
62-
| task_memory_comet_NNN | Off-heap memory allocated by Comet for query execution |
63-
| task_memory_spark_NNN | On-heap & Off-heap memory allocated by Spark |
64-
| comet_shuffle_NNN | Off-heap memory allocated by Comet for columnar shuffle |
110+
| Label | Meaning |
111+
| -------------------------------- | ------------------------------------------------------------------------------------------------------------------------ |
112+
| jvm_heap_used | JVM heap memory usage of live objects for the executor process |
113+
| jemalloc_allocated | Native memory usage for the executor process (requires `jemalloc` feature) |
114+
| thread_NNN_comet_memory_reserved | Memory reserved by Comet's DataFusion memory pool (summed across all contexts on the thread). NNN is the Rust thread ID. |
115+
| thread_NNN_comet_jvm_shuffle | Off-heap memory allocated by Comet for columnar shuffle. NNN is the Rust thread ID. |

native/common/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,7 @@ thiserror = { workspace = true }
3838
[lib]
3939
name = "datafusion_comet_common"
4040
path = "src/lib.rs"
41+
42+
[[bin]]
43+
name = "analyze_trace"
44+
path = "src/bin/analyze_trace.rs"
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Analyzes a Comet chrome trace event log (`comet-event-trace.json`) and
19+
//! compares jemalloc usage against the sum of per-thread Comet memory pool
20+
//! reservations. Reports any points where jemalloc exceeds the total pool size.
21+
//!
22+
//! Usage:
23+
//! cargo run --bin analyze_trace -- <path-to-comet-event-trace.json>
24+
25+
use serde::Deserialize;
26+
use std::collections::HashMap;
27+
use std::io::{BufRead, BufReader};
28+
use std::{env, fs::File};
29+
30+
/// A single Chrome trace event (only the fields we care about).
31+
#[derive(Deserialize)]
32+
struct TraceEvent {
33+
name: String,
34+
ph: String,
35+
#[allow(dead_code)]
36+
tid: u64,
37+
ts: u64,
38+
#[serde(default)]
39+
args: HashMap<String, serde_json::Value>,
40+
}
41+
42+
/// Snapshot of memory state at a given timestamp.
43+
struct MemorySnapshot {
44+
ts: u64,
45+
jemalloc: u64,
46+
pool_total: u64,
47+
}
48+
49+
/// Renders a byte count as mebibytes with one decimal place, e.g. `"1.5 MB"`.
fn format_bytes(bytes: u64) -> String {
    const BYTES_PER_MB: f64 = 1024.0 * 1024.0;
    let megabytes = bytes as f64 / BYTES_PER_MB;
    format!("{megabytes:.1} MB")
}
53+
54+
fn main() {
55+
let args: Vec<String> = env::args().collect();
56+
if args.len() != 2 {
57+
eprintln!("Usage: analyze_trace <path-to-comet-event-trace.json>");
58+
std::process::exit(1);
59+
}
60+
61+
let file = File::open(&args[1]).expect("Failed to open trace file");
62+
let reader = BufReader::new(file);
63+
64+
// Latest jemalloc value (global, not per-thread)
65+
let mut latest_jemalloc: u64 = 0;
66+
// Per-thread pool reservations: thread_NNN -> bytes
67+
let mut pool_by_thread: HashMap<String, u64> = HashMap::new();
68+
// Points where jemalloc exceeded pool total
69+
let mut violations: Vec<MemorySnapshot> = Vec::new();
70+
// Track peak values
71+
let mut peak_jemalloc: u64 = 0;
72+
let mut peak_pool_total: u64 = 0;
73+
let mut peak_excess: u64 = 0;
74+
let mut counter_events: u64 = 0;
75+
76+
// Each line is one JSON event, possibly with a trailing comma.
77+
// The file starts with "[ " on the first event line or as a prefix.
78+
for line in reader.lines() {
79+
let line = line.expect("Failed to read line");
80+
let trimmed = line.trim();
81+
82+
// Skip empty lines or bare array brackets
83+
if trimmed.is_empty() || trimmed == "[" || trimmed == "]" {
84+
continue;
85+
}
86+
87+
// Strip leading "[ " (first event) and trailing comma
88+
let json_str = trimmed
89+
.trim_start_matches("[ ")
90+
.trim_start_matches('[')
91+
.trim_end_matches(',');
92+
93+
if json_str.is_empty() {
94+
continue;
95+
}
96+
97+
// Only parse counter events (they contain "\"ph\": \"C\"")
98+
if !json_str.contains("\"ph\": \"C\"") {
99+
continue;
100+
}
101+
102+
let event: TraceEvent = match serde_json::from_str(json_str) {
103+
Ok(e) => e,
104+
Err(_) => continue,
105+
};
106+
107+
if event.ph != "C" {
108+
continue;
109+
}
110+
111+
counter_events += 1;
112+
113+
if event.name == "jemalloc_allocated" {
114+
if let Some(val) = event.args.get("jemalloc_allocated") {
115+
latest_jemalloc = val.as_u64().unwrap_or(0);
116+
if latest_jemalloc > peak_jemalloc {
117+
peak_jemalloc = latest_jemalloc;
118+
}
119+
}
120+
} else if event.name.contains("comet_memory_reserved") {
121+
// Name format: thread_NNN_comet_memory_reserved
122+
let thread_key = event.name.clone();
123+
if let Some(val) = event.args.get(&event.name) {
124+
let bytes = val.as_u64().unwrap_or(0);
125+
pool_by_thread.insert(thread_key, bytes);
126+
}
127+
} else {
128+
// Skip jvm_heap_used and other counters
129+
continue;
130+
}
131+
132+
// After each jemalloc or pool update, check the current state
133+
let pool_total: u64 = pool_by_thread.values().sum();
134+
if pool_total > peak_pool_total {
135+
peak_pool_total = pool_total;
136+
}
137+
138+
if latest_jemalloc > 0 && pool_total > 0 && latest_jemalloc > pool_total {
139+
let excess = latest_jemalloc - pool_total;
140+
if excess > peak_excess {
141+
peak_excess = excess;
142+
}
143+
// Record violation (sample - don't record every single one)
144+
if violations.is_empty()
145+
|| event.ts.saturating_sub(violations.last().unwrap().ts) > 1_000_000
146+
|| excess == peak_excess
147+
{
148+
violations.push(MemorySnapshot {
149+
ts: event.ts,
150+
jemalloc: latest_jemalloc,
151+
pool_total,
152+
});
153+
}
154+
}
155+
}
156+
157+
// Print summary
158+
println!("=== Comet Trace Memory Analysis ===\n");
159+
println!("Counter events parsed: {counter_events}");
160+
println!("Threads with memory pools: {}", pool_by_thread.len());
161+
println!("Peak jemalloc allocated: {}", format_bytes(peak_jemalloc));
162+
println!(
163+
"Peak pool total: {}",
164+
format_bytes(peak_pool_total)
165+
);
166+
println!(
167+
"Peak excess (jemalloc - pool): {}",
168+
format_bytes(peak_excess)
169+
);
170+
println!();
171+
172+
if violations.is_empty() {
173+
println!("OK: jemalloc never exceeded the total pool reservation.");
174+
} else {
175+
println!(
176+
"WARNING: jemalloc exceeded pool reservation at {} sampled points:\n",
177+
violations.len()
178+
);
179+
println!(
180+
"{:>14} {:>14} {:>14} {:>14}",
181+
"Time (us)", "jemalloc", "pool_total", "excess"
182+
);
183+
println!("{}", "-".repeat(62));
184+
for snap in &violations {
185+
let excess = snap.jemalloc - snap.pool_total;
186+
println!(
187+
"{:>14} {:>14} {:>14} {:>14}",
188+
snap.ts,
189+
format_bytes(snap.jemalloc),
190+
format_bytes(snap.pool_total),
191+
format_bytes(excess),
192+
);
193+
}
194+
}
195+
196+
// Show final per-thread pool state
197+
println!("\n--- Final per-thread pool reservations ---\n");
198+
let mut threads: Vec<_> = pool_by_thread.iter().collect();
199+
threads.sort_by_key(|(k, _)| (*k).clone());
200+
for (thread, bytes) in &threads {
201+
println!(" {thread}: {}", format_bytes(**bytes));
202+
}
203+
println!("\n Total: {}", format_bytes(pool_by_thread.values().sum()));
204+
}

native/common/src/tracing.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,9 @@ impl Recorder {
6464
}
6565

6666
pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) {
67-
let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize;
6867
let json = format!(
69-
"{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n",
70-
Self::get_thread_id(),
68+
"{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_bytes} }} }},\n",
69+
get_thread_id(),
7170
self.now.elapsed().as_micros()
7271
);
7372
let mut writer = self.writer.lock().unwrap();
@@ -80,23 +79,23 @@ impl Recorder {
8079
let json = format!(
8180
"{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n",
8281
name,
83-
Self::get_thread_id(),
82+
get_thread_id(),
8483
self.now.elapsed().as_micros()
8584
);
8685
let mut writer = self.writer.lock().unwrap();
8786
writer
8887
.write_all(json.as_bytes())
8988
.expect("Error writing tracing");
9089
}
90+
}
9191

92-
fn get_thread_id() -> u64 {
93-
let thread_id = std::thread::current().id();
94-
format!("{thread_id:?}")
95-
.trim_start_matches("ThreadId(")
96-
.trim_end_matches(")")
97-
.parse()
98-
.expect("Error parsing thread id")
99-
}
92+
/// Returns the numeric id of the calling thread.
///
/// The number is extracted from the `Debug` rendering of
/// [`std::thread::ThreadId`], which is expected to look like `ThreadId(N)`.
///
/// # Panics
///
/// Panics if the text left after removing the `ThreadId(` / `)` wrapper does
/// not parse as a `u64`.
pub fn get_thread_id() -> u64 {
    let debug_repr = format!("{:?}", std::thread::current().id());
    let digits = debug_repr
        .trim_start_matches("ThreadId(")
        .trim_end_matches(")");
    digits.parse().expect("Error parsing thread id")
}
101100

102101
pub fn trace_begin(name: &str) {

0 commit comments

Comments
 (0)