diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 522ccbc94c..a9bc18b582 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
   private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
     "Guide (https://datafusion.apache.org/comet/contributor-guide/tracing.html)"
 
+  private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
+    "Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"
+
   /** List of all configs that is used for generating documentation */
   val allConfs = new ListBuffer[ConfigEntry[_]]
 
@@ -549,6 +552,13 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
+    conf(s"$COMET_PREFIX.debug.memory")
+      .category(CATEGORY_TESTING)
+      .doc(s"When enabled, log all native memory pool interactions. $DEBUGGING_GUIDE.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
   val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"
 
diff --git a/dev/benchmarks/comet-tpcds.sh b/dev/benchmarks/comet-tpcds.sh
index b55b27188c..dc9048fe6f 100755
--- a/dev/benchmarks/comet-tpcds.sh
+++ b/dev/benchmarks/comet-tpcds.sh
@@ -22,7 +22,7 @@
 $SPARK_HOME/sbin/stop-master.sh
 $SPARK_HOME/sbin/stop-worker.sh
 $SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
+RUST_BACKTRACE=1 $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
 
 $SPARK_HOME/bin/spark-submit \
     --master $SPARK_MASTER \
diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh
index a748a02319..0cbf2f1e20 100755
--- a/dev/benchmarks/comet-tpch.sh
+++ b/dev/benchmarks/comet-tpch.sh
@@ -22,7 +22,7 @@
 $SPARK_HOME/sbin/stop-master.sh
 $SPARK_HOME/sbin/stop-worker.sh
 $SPARK_HOME/sbin/start-master.sh
-$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
+RUST_BACKTRACE=1 $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
 
 $SPARK_HOME/bin/spark-submit \
     --master $SPARK_MASTER \
diff --git a/dev/scripts/mem_debug_to_csv.py b/dev/scripts/mem_debug_to_csv.py
new file mode 100644
index 0000000000..ad2c7af052
--- /dev/null
+++ b/dev/scripts/mem_debug_to_csv.py
@@ -0,0 +1,73 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+# Parses native memory pool debug logging (spark.comet.debug.memory=true) from
+# a Spark executor log and emits CSV (columns: name,size,label) with the
+# running allocation per memory consumer for a single task.
+
+import argparse
+import re
+import sys
+
+def main(file, task_filter):
+    # keep track of running total allocation per consumer
+    alloc = {}
+
+    # open file
+    with open(file) as f:
+        # iterate over lines in file
+        print("name,size,label")
+        for line in f:
+            # print(line, file=sys.stderr)
+
+            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
+            # parse consumer name
+            re_match = re.search(r'\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
+            if re_match:
+                try:
+                    task = int(re_match.group(1))
+                    if task != task_filter:
+                        continue
+
+                    consumer = re_match.group(2)
+                    method = re_match.group(3)
+                    size = int(re_match.group(4))
+
+                    if alloc.get(consumer) is None:
+                        alloc[consumer] = size
+                    else:
+                        if method == "grow" or method == "try_grow":
+                            if "Err" in line:
+                                # do not update allocation if try_grow failed;
+                                # annotate this entry so it can be shown in the chart
+                                print(f"{consumer},{alloc[consumer]},ERR")
+                            else:
+                                alloc[consumer] = alloc[consumer] + size
+                        elif method == "shrink":
+                            alloc[consumer] = alloc[consumer] - size
+
+                    # emit the running total for this consumer
+                    print(f"{consumer},{alloc[consumer]}")
+
+                except Exception as e:
+                    print("error parsing", line, e, file=sys.stderr)
+
+
+if __name__ == "__main__":
+    ap = argparse.ArgumentParser(description="Generate CSV from memory debug output")
+    ap.add_argument("--task", required=True, help="Task ID.")
+    ap.add_argument("--file", required=True, help="Spark log containing memory debug output")
+    args = ap.parse_args()
+    main(args.file, int(args.task))
diff --git a/dev/scripts/plot_memory_usage.py b/dev/scripts/plot_memory_usage.py
new file mode 100644
index 0000000000..1ba82baeff
--- /dev/null
+++ b/dev/scripts/plot_memory_usage.py
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+# Reads the CSV produced by mem_debug_to_csv.py and renders a stacked area
+# chart of memory usage per consumer, marking failed try_grow calls in red.
+
+import pandas as pd
+import matplotlib.pyplot as plt
+import sys
+
+def plot_memory_usage(csv_file):
+    # Read the CSV file
+    df = pd.read_csv(csv_file)
+
+    # Create time index based on row order (each row is a sequential time point)
+    df['time'] = range(len(df))
+
+    # Pivot the data to have consumers as columns
+    pivot_df = df.pivot(index='time', columns='name', values='size')
+    pivot_df = pivot_df.ffill().fillna(0)
+
+    # Create stacked area chart
+    plt.figure(figsize=(8, 4))
+    plt.stackplot(pivot_df.index,
+                  [pivot_df[col] for col in pivot_df.columns],
+                  labels=pivot_df.columns,
+                  alpha=0.8)
+
+    # Add annotations for ERR labels
+    if 'label' in df.columns:
+        err_points = df[df['label'].str.contains('ERR', na=False)]
+        for _, row in err_points.iterrows():
+            plt.axvline(x=row['time'], color='red', linestyle='--', alpha=0.7, linewidth=1.5)
+            plt.text(row['time'], plt.ylim()[1] * 0.95, 'ERR',
+                     ha='center', va='top', color='red', fontweight='bold')
+
+    plt.xlabel('Time')
+    plt.ylabel('Memory Usage')
+    plt.title('Memory Usage Over Time by Consumer')
+    plt.legend(loc='upper left', bbox_to_anchor=(1.05, 1), borderaxespad=0, fontsize='small')
+    plt.grid(True, alpha=0.3)
+    plt.tight_layout()
+
+    # Save the plot
+    output_file = csv_file.replace('.csv', '_chart.png')
+    plt.savefig(output_file, dpi=300, bbox_inches='tight')
+    print(f"Chart saved to: {output_file}")
+    plt.show()
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python plot_memory_usage.py <csv_file>")
+        sys.exit(1)
+
+    plot_memory_usage(sys.argv[1])
diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md
index db5bdfc593..a0c13afa32 100644
--- a/docs/source/contributor-guide/debugging.md
+++ b/docs/source/contributor-guide/debugging.md
@@ -127,7 +127,7 @@ To build Comet with this feature enabled:
 make release COMET_FEATURES=backtrace
 ```
 
-Start Comet with `RUST_BACKTRACE=1`
+Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for `spark-submit` if running in local mode.
 
 ```console
 RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
@@ -188,3 +188,43 @@ This produces output like the following:
 Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory
 specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the
 [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file). For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
+
+### Debugging Memory Reservations
+
+Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
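+
+For example, to enable it in a local `spark-shell` session (adapting the backtrace example from earlier in this guide; adjust the jar path and version to match your build):
+
+```console
+RUST_BACKTRACE=1 $SPARK_HOME/spark-shell \
+  --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar \
+  --conf spark.plugins=org.apache.spark.CometPlugin \
+  --conf spark.comet.enabled=true \
+  --conf spark.comet.exec.enabled=true \
+  --conf spark.comet.debug.memory=true
+```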
+
+Example log output:
+
+```
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
+[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
+```
+
+When backtraces are enabled (see the earlier section), they are included for failed allocations.
+
+The Python scripts in `dev/scripts` can be used to chart memory pool usage over time for a particular Spark task.
+
+First, extract the memory events for the task of interest from the executor log and write them to CSV:
+
+```shell
+python3 dev/scripts/mem_debug_to_csv.py --file /path/to/executor/log --task 1234 > /tmp/mem.csv
+```
+
+Next, generate a chart from the CSV file:
+
+```shell
+python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv
+```
\ No newline at end of file
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 146e0feb8e..f9061a48cc 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -81,9 +81,10 @@ use crate::execution::spark_plan::SparkPlan;
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
 
+use crate::execution::memory_pools::logging_pool::LoggingPool;
 use crate::execution::spark_config::{
-    SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
-    COMET_TRACING_ENABLED,
+    SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
+    COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
 };
 use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
@@ -192,6 +193,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
     let max_temp_directory_size =
         spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+    let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
 
     with_trace("createPlan", tracing_enabled, || {
         // Init JVM classes
@@ -228,6 +230,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         let memory_pool =
             create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
 
+        let memory_pool = if logging_memory_pool {
+            Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool))
+        } else {
+            memory_pool
+        };
+
         // Get local directories for storing spill files
         let num_local_dirs = env.get_array_length(&local_dirs)?;
         let mut local_dirs_vec = vec![];
diff --git a/native/core/src/execution/memory_pools/logging_pool.rs b/native/core/src/execution/memory_pools/logging_pool.rs
new file mode 100644
index 0000000000..8cd2ca6e55
--- /dev/null
+++ b/native/core/src/execution/memory_pools/logging_pool.rs
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::execution::memory_pool::{
+    MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
+};
+use log::info;
+use std::sync::Arc;
+
+/// A [MemoryPool] decorator that logs every grow/try_grow/shrink interaction
+/// with the inner pool, tagged with the Spark task attempt id.
+#[derive(Debug)]
+pub(crate) struct LoggingPool {
+    task_attempt_id: u64,
+    pool: Arc<dyn MemoryPool>,
+}
+
+impl LoggingPool {
+    pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
+        Self {
+            task_attempt_id,
+            pool,
+        }
+    }
+}
+
+impl MemoryPool for LoggingPool {
+    fn register(&self, consumer: &MemoryConsumer) {
+        self.pool.register(consumer)
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        self.pool.unregister(consumer)
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        info!(
+            "[Task {}] MemoryPool[{}].grow({})",
+            self.task_attempt_id,
+            reservation.consumer().name(),
+            additional
+        );
+        self.pool.grow(reservation, additional);
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        info!(
+            "[Task {}] MemoryPool[{}].shrink({})",
+            self.task_attempt_id,
+            reservation.consumer().name(),
+            shrink
+        );
+        self.pool.shrink(reservation, shrink);
+    }
+
+    fn try_grow(
+        &self,
+        reservation: &MemoryReservation,
+        additional: usize,
+    ) -> datafusion::common::Result<()> {
+        match self.pool.try_grow(reservation, additional) {
+            Ok(_) => {
+                info!(
+                    "[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
+                    self.task_attempt_id,
+                    reservation.consumer().name(),
+                    additional
+                );
+                Ok(())
+            }
+            Err(e) => {
+                info!(
+                    "[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
+                    self.task_attempt_id,
+                    reservation.consumer().name(),
+                    additional
+                );
+                Err(e)
+            }
+        }
+    }
+
+    fn reserved(&self) -> usize {
+        self.pool.reserved()
+    }
+
+    fn memory_limit(&self) -> MemoryLimit {
+        self.pool.memory_limit()
+    }
+}
diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index fc6a81a5e5..d8b3473353 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -17,6 +17,7 @@
 
 mod config;
 mod fair_pool;
+pub mod logging_pool;
 mod task_shared;
 mod unified_pool;
 
diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs
index 60ebb2ff8b..b257a5ba68 100644
--- a/native/core/src/execution/spark_config.rs
+++ b/native/core/src/execution/spark_config.rs
@@ -21,6 +21,7 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
 pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
 pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
 pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
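+// Matches CometConf.COMET_DEBUG_MEMORY_ENABLED ("spark.comet.debug.memory") on the
+// JVM side; when true, createPlan wraps the native memory pool in LoggingPool.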
+pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
 
 pub(crate) trait SparkConfig {
     fn get_bool(&self, name: &str) -> bool;