Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/common/statistics/src/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,21 @@ pub const DEFAULT_HISTOGRAM_BUCKETS: usize = 100;
/// A histogram made of a list of buckets, a flag describing how the buckets were
/// obtained, and optional spacing metadata.
pub struct Histogram {
/// `false` for histograms generated from NDV + min/max.
/// NOTE(review): presumably `true` means the buckets come from collected statistics —
/// confirm against the producers of accurate histograms.
pub accuracy: bool,
/// The buckets that make up this histogram.
pub buckets: Vec<HistogramBucket>,
/// Average bucket width: `(max - min) / num_buckets`.
///
/// Only set when `accuracy == false` (histogram generated from NDV + min/max);
/// `Histogram::new` always leaves it as `None`.
/// Used to detect range distortion caused by outlier values inflating min/max:
/// a very large `avg_spacing` means linear interpolation within buckets is unreliable.
///
/// `#[serde(default)]` makes the field fall back to `None` when it is missing from
/// the serialized input.
#[serde(default)]
pub avg_spacing: Option<f64>,
}

impl Histogram {
pub fn new(buckets: Vec<HistogramBucket>, accuracy: bool) -> Self {
Self { accuracy, buckets }
Self {
accuracy,
buckets,
avg_spacing: None,
}
}

/// Get number of buckets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
)?;

for hash in hashes.iter_mut() {
*hash = ((*hash << self.shift_bits) >> 60) & 0b1111;
*hash = Self::get_partition_id(*hash, self.shift_bits);
}

Ok(self.build_partition_stream.partition(hashes, data, true))
Expand All @@ -324,7 +324,7 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {
)?;

for hash in hashes.iter_mut() {
*hash = ((*hash << self.shift_bits) >> 60) & 0b1111;
*hash = Self::get_partition_id(*hash, self.shift_bits);
}

Ok(self.probe_partition_stream.partition(hashes, data, true))
Expand Down Expand Up @@ -385,6 +385,20 @@ impl<T: GraceMemoryJoin> GraceHashJoin<T> {

Ok(())
}

/// Maps a hash to a partition id in `0..16` by extracting one 4-bit group of the hash.
///
/// The hash is shifted left by `shift_bits`, and the 4 bits that end up at the top of
/// the usable hash width are returned.
///
/// The usable width depends on the compile target:
/// - with `sse4.2`, the hash comes from `_mm_crc32_u64`, which only sets the low 32 bits
///   (the high 32 bits are always 0), so the id is taken from the low 32 bits to avoid
///   all rows landing in partition 0;
/// - otherwise the full 64 bits are used.
#[inline(always)]
fn get_partition_id(hash: u64, shift_bits: usize) -> u64 {
    // Number of low-order hash bits that actually carry information on this target.
    const USABLE_HASH_BITS: usize = if cfg!(target_feature = "sse4.2") {
        32
    } else {
        64
    };
    // A partition id is 4 bits wide, i.e. 16 partitions.
    const PARTITION_BITS: usize = 4;
    const PARTITION_MASK: u64 = (1 << PARTITION_BITS) - 1;

    let shifted = hash << shift_bits;
    (shifted >> (USABLE_HASH_BITS - PARTITION_BITS)) & PARTITION_MASK
}
}

pub enum RestoreStage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ impl HybridHashJoinState {
level: usize,
factory: Arc<HashJoinFactory>,
) -> Result<Arc<HybridHashJoinState>> {
// On SSE4.2, fast_hash (_mm_crc32_u64) only sets the low 32 bits.
#[cfg(target_feature = "sse4.2")]
const HASH_JOIN_SPILL_MAX_LEVEL: usize = 7;
#[cfg(not(target_feature = "sse4.2"))]
const HASH_JOIN_SPILL_MAX_LEVEL: usize = 15;

let settings = ctx.get_settings();
let max_level = settings.get_max_hash_join_spill_level()? as usize;
let max_spill_level = settings.get_max_hash_join_spill_level()? as usize;
let max_level = (max_spill_level).min(HASH_JOIN_SPILL_MAX_LEVEL);

Ok(Arc::new(HybridHashJoinState {
ctx,
Expand Down
15 changes: 15 additions & 0 deletions src/query/sql/src/planner/optimizer/ir/stats/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl HistogramBuilder {
Ok(Histogram {
buckets: vec![],
accuracy: false,
avg_spacing: None,
})
};
}
Expand All @@ -69,6 +70,19 @@ impl HistogramBuilder {
}
};

// Compute avg_spacing before moving min/max into UniformSampleSet.
// avg_spacing = (max - min) / num_buckets (bucket width); used later to detect range distortion.
let avg_spacing = if let (Ok(min_f), Ok(max_f)) = (min.as_double(), max.as_double()) {
let range = max_f - min_f;
if range > 0.0 && num_buckets > 0 {
Some(range / num_buckets as f64)
} else {
None
}
} else {
None
};

// Adjust number of buckets if needed
let adjusted_num_buckets = if num_buckets > ndv as usize {
ndv as usize
Expand All @@ -83,6 +97,7 @@ impl HistogramBuilder {
Ok(Histogram {
buckets,
accuracy: false,
avg_spacing,
})
}

Expand Down
24 changes: 23 additions & 1 deletion src/query/sql/src/planner/optimizer/ir/stats/selectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ impl SelectivityVisitor<'_> {
let column_stat = self
.ensure_column_stat(column_index)
.expect("checked above");

return Self::compute_comparison_with_stat(
column_stat,
constant,
Expand Down Expand Up @@ -366,6 +367,9 @@ impl SelectivityVisitor<'_> {
Some(histogram) => {
let selectivity =
Self::compute_histogram_comparison(histogram, op, &const_datum)?;

let distorted = !histogram.accuracy && is_histogram_range_distorted(histogram);

if let Selectivity::N(n) = selectivity {
let (new_min, new_max) = match op {
ComparisonOp::GT | ComparisonOp::GTE => {
Expand All @@ -376,9 +380,20 @@ impl SelectivityVisitor<'_> {
}
_ => unreachable!(),
};

update_statistic(column_stat, new_min, new_max, n)?;
}
Ok(selectivity)

// For inaccurate histograms with a distorted range (e.g. outlier
// sentinel values inflating min/max), the linear interpolation above
// is unreliable. We still ran update_statistic so that min/max bounds
// are narrowed correctly for subsequent predicates on the same column
// (e.g. the `< 200` in `col > 100 AND col < 200`), but we override
// the selectivity with LowerBound.
match distorted {
true => Ok(Selectivity::LowerBound),
false => Ok(selectivity),
}
}
None => {
if column_is_integer {
Expand Down Expand Up @@ -806,3 +821,10 @@ impl Selectivity {
}
}
}

/// Reports whether the histogram's recorded average bucket spacing is large enough
/// to treat its value range as distorted.
///
/// Returns `true` only when `avg_spacing` is present and strictly greater than the
/// threshold; a histogram without `avg_spacing` is never considered distorted.
fn is_histogram_range_distorted(histogram: &Histogram) -> bool {
    const BUCKET_WIDTH_THRESHOLD: f64 = 1e12;
    match histogram.avg_spacing {
        Some(spacing) => spacing > BUCKET_WIDTH_THRESHOLD,
        None => false,
    }
}
Loading