From 2dbed5522eaba8bf8b3bc3e953bc408eb9582691 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 15 May 2026 21:52:11 +0800 Subject: [PATCH 1/7] [python] Add ReadBuilder.explain() for scan-plan visibility Introduce ReadBuilder.explain() returning a structured ExplainResult that summarises the target snapshot, the pushed-down predicate / projection / limit, the partition / bucket / file-stats pruning funnel, and split- level execution signals (raw-convertible ratio, deletion-vector ratio, level histogram, files-per-split and split-size distribution). A new opt-in ScanStats counter set is wired through FileScanner via TableScan.scan_with_stats(). The regular read hot path is unaffected when scan_stats is None. To produce accurate before/after counters, explain() suppresses the manifest reader's early bucket filter and forces single-threaded manifest decoding for the one pass that drives it. The order of partition and bucket checks in _filter_manifest_entry is rearranged so each pruning stage maps cleanly to one counter; both filters remain pure AND tests and the final survivor set is identical. Predicate rendering lives in a standalone helper so Predicate itself stays rendering-agnostic. --- paimon-python/pypaimon/read/explain.py | 241 ++++++++++++++++++ paimon-python/pypaimon/read/explain_render.py | 100 ++++++++ paimon-python/pypaimon/read/read_builder.py | 215 ++++++++++++++++ paimon-python/pypaimon/read/scan_stats.py | 51 ++++ .../pypaimon/read/scanner/file_scanner.py | 83 +++++- paimon-python/pypaimon/read/table_scan.py | 11 +- 6 files changed, 691 insertions(+), 10 deletions(-) create mode 100644 paimon-python/pypaimon/read/explain.py create mode 100644 paimon-python/pypaimon/read/explain_render.py create mode 100644 paimon-python/pypaimon/read/scan_stats.py diff --git a/paimon-python/pypaimon/read/explain.py b/paimon-python/pypaimon/read/explain.py new file mode 100644 index 000000000000..13db6d41571c --- /dev/null +++ b/paimon-python/pypaimon/read/explain.py @@ -0,0 +1,241 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import io +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +@dataclass +class PruningStat: + """Before / after counters for one pruning stage. + + ``before`` is the input size to the stage, ``after`` is the size that + survived. Either may be ``None`` when the stage did not run (for + example, ``bucket_pruning`` is ``None`` for tables that are not + HASH_FIXED with all bucket keys pinned). + """ + + before: Optional[int] + after: Optional[int] + + @property + def pruned(self) -> Optional[int]: + if self.before is None or self.after is None: + return None + return self.before - self.after + + def format(self) -> str: + if self.before is None and self.after is None: + return "n/a" + before = "?" if self.before is None else str(self.before) + after = "?" if self.after is None else str(self.after) + pruned = self.pruned + suffix = "" if pruned is None else " (pruned {})".format(pruned) + return "{} -> {}{}".format(before, after, suffix) + + +@dataclass +class ExplainSplitInfo: + """Per-split detail surfaced when ``explain(verbose=True)`` is used.""" + + partition: Dict[str, Any] + bucket: int + file_count: int + row_count: int + merged_row_count: Optional[int] + file_size: int + raw_convertible: bool + has_deletion_vectors: bool + level_histogram: Dict[int, int] + deletion_file_count: int + file_paths: List[str] + + +@dataclass +class ExplainResult: + """Structured scan plan returned by ``ReadBuilder.explain()``. + + The compact ``__str__`` shows enough signal to reason about cost: the + snapshot, the pushed-down predicate / projection / limit, the three + pruning before-after counters, split-level shape (raw-convertible + ratio, DV ratio, all-above-L0 ratio, files/split, size/split + distribution), and the file-level totals. ``verbose=True`` adds a + block listing each split. + """ + + # Identity + table_identifier: str + is_primary_key_table: bool + bucket_mode: str + deletion_vectors_enabled: bool + data_evolution_enabled: bool + + # Snapshot + snapshot_id: Optional[int] + schema_id: Optional[int] + + # Pushdown + predicate: Optional[str] = None + projection: Optional[List[str]] = None + limit: Optional[int] = None + + # Pruning (None when not applicable) + partition_pruning: Optional[PruningStat] = None + bucket_pruning: Optional[PruningStat] = None + file_skipping: Optional[PruningStat] = None + + # File-level aggregates over final splits + file_count: int = 0 + total_file_size: int = 0 + estimated_row_count: int = 0 + estimated_merged_row_count: Optional[int] = None + deletion_file_count: int = 0 + level_histogram: Dict[int, int] = field(default_factory=dict) + + # Split-level aggregates (shown in compact mode too) + split_count: int = 0 + splits_raw_convertible: int = 0 + splits_with_deletion_vectors: int = 0 + splits_all_above_l0: int = 0 + files_per_split_min: int = 0 + files_per_split_max: int = 0 + files_per_split_avg: float = 0.0 + split_size_min: int = 0 + split_size_max: int = 0 + split_size_avg: float = 0.0 + split_size_p50: int = 0 + split_size_p95: int = 0 + + # Verbose-only + splits: Optional[List[ExplainSplitInfo]] = None + + def __str__(self) -> str: + return render_explain(self) + + +# --------------------------------------------------------------------------- +# Pretty-print helpers +# --------------------------------------------------------------------------- + +def render_explain(result: ExplainResult) -> str: + out = io.StringIO() + out.write("== PyPaimon Scan Plan ==\n") + + flags = [] + flags.append("PK" if result.is_primary_key_table else "Append") + flags.append(result.bucket_mode) + if result.deletion_vectors_enabled: + flags.append("dv=on") + if result.data_evolution_enabled: + flags.append("data-evolution=on") + _line(out, "Table", "{} ({})".format(result.table_identifier, ", ".join(flags))) + + if result.snapshot_id is None: + _line(out, "Snapshot", " (table is empty or has no snapshot)") + else: + schema_part = "" if result.schema_id is None else " (schema {})".format(result.schema_id) + _line(out, "Snapshot", "{}{}".format(result.snapshot_id, schema_part)) + + _line(out, "Predicate", result.predicate if result.predicate else "") + _line(out, "Projection", + "[{}]".format(", ".join(result.projection)) if result.projection else "") + _line(out, "Limit", str(result.limit) if result.limit is not None else "") + + out.write("\n") + _line(out, "Partition pruning", + result.partition_pruning.format() if result.partition_pruning else "n/a") + _line(out, "Bucket pruning", + result.bucket_pruning.format() if result.bucket_pruning else "n/a") + _line(out, "File skipping", + result.file_skipping.format() if result.file_skipping else "n/a") + + out.write("\n") + _line(out, "Splits", str(result.split_count)) + if result.split_count > 0: + out.write(" raw-convertible: {} / {}\n".format( + result.splits_raw_convertible, result.split_count)) + out.write(" with DV: {} / {}\n".format( + result.splits_with_deletion_vectors, result.split_count)) + out.write(" all-above-L0: {} / {}\n".format( + result.splits_all_above_l0, result.split_count)) + out.write(" files/split: min={} max={} avg={:.2f}\n".format( + result.files_per_split_min, + result.files_per_split_max, + result.files_per_split_avg)) + out.write(" size/split: min={} p50={} p95={} max={}\n".format( + _format_size(result.split_size_min), + _format_size(result.split_size_p50), + _format_size(result.split_size_p95), + _format_size(result.split_size_max))) + + out.write("\n") + _line(out, "Files", str(result.file_count)) + _line(out, "Total size", _format_size(result.total_file_size)) + + rows = "{:,}".format(result.estimated_row_count) + if result.estimated_merged_row_count is not None: + rows += " (merged: {:,})".format(result.estimated_merged_row_count) + _line(out, "Estimated rows", rows) + + if result.level_histogram: + levels = sorted(result.level_histogram.items()) + levels_str = " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels) + _line(out, "Level histogram", levels_str) + _line(out, "Deletion files", str(result.deletion_file_count)) + + if result.splits: + out.write("\nSplits[]\n") + for idx, split in enumerate(result.splits): + out.write(" [{}] partition={} bucket={} files={} size={} rows={} raw={} dv={}\n".format( + idx, + split.partition, + split.bucket, + split.file_count, + _format_size(split.file_size), + split.row_count, + split.raw_convertible, + split.has_deletion_vectors, + )) + if split.level_histogram: + levels = sorted(split.level_histogram.items()) + out.write(" levels: {}\n".format( + " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels))) + for path in split.file_paths: + out.write(" file: {}\n".format(path)) + + return out.getvalue().rstrip("\n") + + +def _line(out: io.StringIO, label: str, value: str) -> None: + out.write("{:<19} {}\n".format(label + ":", value)) + + +_SIZE_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB") + + +def _format_size(num_bytes: int) -> str: + if num_bytes is None: + return "?" + size = float(num_bytes) + for unit in _SIZE_UNITS: + if size < 1024.0 or unit == _SIZE_UNITS[-1]: + if unit == "B": + return "{:d} {}".format(int(size), unit) + return "{:.1f} {}".format(size, unit) + size /= 1024.0 + return "{:.1f} {}".format(size, _SIZE_UNITS[-1]) diff --git a/paimon-python/pypaimon/read/explain_render.py b/paimon-python/pypaimon/read/explain_render.py new file mode 100644 index 000000000000..2e09da842a64 --- /dev/null +++ b/paimon-python/pypaimon/read/explain_render.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any + +from pypaimon.common.predicate import Predicate + + +_BINARY_OPS = { + 'equal': '=', + 'notEqual': '!=', + 'lessThan': '<', + 'lessOrEqual': '<=', + 'greaterThan': '>', + 'greaterOrEqual': '>=', +} + + +def render_predicate(predicate: Predicate) -> str: + """Render a :class:`Predicate` tree as a human-readable string. + + The renderer relies only on the existing ``method`` / ``field`` / + ``literals`` shape and never mutates the predicate. Used by + ``ReadBuilder.explain()`` and intentionally lives outside + :mod:`pypaimon.common.predicate` so the predicate module stays + rendering-agnostic. + """ + if predicate is None: + return "" + method = predicate.method + field = predicate.field + literals = predicate.literals + + if method == 'and': + return _join_children(literals, 'AND') + if method == 'or': + return _join_children(literals, 'OR') + if method in _BINARY_OPS: + return "{} {} {}".format(field, _BINARY_OPS[method], _format_literal(literals[0])) + if method == 'in': + return "{} IN [{}]".format(field, ", ".join(_format_literal(v) for v in literals)) + if method == 'notIn': + return "{} NOT IN [{}]".format(field, ", ".join(_format_literal(v) for v in literals)) + if method == 'between': + return "{} BETWEEN {} AND {}".format( + field, _format_literal(literals[0]), _format_literal(literals[1])) + if method == 'notBetween': + return "{} NOT BETWEEN {} AND {}".format( + field, _format_literal(literals[0]), _format_literal(literals[1])) + if method == 'isNull': + return "{} IS NULL".format(field) + if method == 'isNotNull': + return "{} IS NOT NULL".format(field) + if method == 'startsWith': + return "{} STARTSWITH {}".format(field, _format_literal(literals[0])) + if method == 'endsWith': + return "{} ENDSWITH {}".format(field, _format_literal(literals[0])) + if method == 'contains': + return "{} CONTAINS {}".format(field, _format_literal(literals[0])) + if method == 'like': + return "{} LIKE {}".format(field, _format_literal(literals[0])) + return "{}({}{})".format( + method, + field if field is not None else "", + ", " + ", ".join(_format_literal(v) for v in (literals or [])) if literals else "", + ) + + +def _join_children(children, joiner: str) -> str: + parts = [render_predicate(c) for c in (children or [])] + parts = [p for p in parts if p] + if not parts: + return "" + if len(parts) == 1: + return parts[0] + return " {} ".format(joiner).join("({})".format(p) for p in parts) + + +def _format_literal(value: Any) -> str: + if value is None: + return "NULL" + if isinstance(value, str): + return "'{}'".format(value.replace("'", "\\'")) + if isinstance(value, bytes): + return "b'{}'".format(value.hex()) + return repr(value) diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py index 13a951df0b8e..51233856f259 100644 --- a/paimon-python/pypaimon/read/read_builder.py +++ b/paimon-python/pypaimon/read/read_builder.py @@ -19,6 +19,10 @@ from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder +from pypaimon.read.explain import ExplainResult, ExplainSplitInfo, PruningStat +from pypaimon.read.explain_render import render_predicate +from pypaimon.read.scan_stats import ScanStats +from pypaimon.read.split import Split from pypaimon.read.table_read import TableRead from pypaimon.read.table_scan import TableScan from pypaimon.schema.data_types import DataField @@ -97,6 +101,41 @@ def _nested_name_paths(self) -> Optional[List[List[str]]]: def new_predicate_builder(self) -> PredicateBuilder: return PredicateBuilder(self.read_type()) + # TODO: surface this through pypaimon's CLI (alongside cli_sql / + # cli_table) so users can run `pypaimon explain ...` against a table + # without writing any Python. + def explain(self, verbose: bool = False) -> ExplainResult: + """Produce a structured scan plan for this builder. + + Runs one planning pass (manifest list + manifest reads, no data + files) and returns an :class:`ExplainResult` summarising the + target snapshot, the pushed-down predicate / projection / limit, + the partition / bucket / file-stats pruning funnel, and split- + level execution signals (raw-convertible ratio, deletion-vector + ratio, level histogram, files-per-split and split-size + distribution). With ``verbose=True``, every split is listed. + + Cost: ``explain()`` reads manifest list + manifests but never + opens data files. To produce accurate before/after counters it + suppresses the manifest-reader's early bucket filter and forces + single-threaded manifest decoding, so it can be measurably + heavier than a regular ``new_scan().plan()`` on tables where the + early filter usually prunes aggressively (e.g. very wide + HASH_FIXED tables with a tight predicate). + """ + scan = self.new_scan() + plan, stats = scan.scan_with_stats() + return _build_explain_result( + table=self.table, + scan=scan, + plan=plan, + stats=stats, + predicate=self._predicate, + projection=self._projection, + limit=self._limit, + verbose=verbose, + ) + def read_type(self) -> List[DataField]: table_fields = self.table.fields @@ -158,3 +197,179 @@ def _resolve_dotted_paths(self, names: List[str]) -> List[List[int]]: if ok: paths.append(path) return paths + + +def _build_explain_result(table, scan: TableScan, plan, stats: ScanStats, + predicate, projection, limit, verbose: bool) -> ExplainResult: + """Translate one (Plan, ScanStats) pair into an ExplainResult.""" + splits: List[Split] = plan.splits() + + table_schema = table.table_schema + bucket_mode_str = _safe_bucket_mode(table) + + partition_pruning = _partition_pruning(stats, scan) + bucket_pruning = _bucket_pruning(stats, scan) + file_skipping = _file_skipping(stats, scan) + + files_per_split = [len(getattr(s, 'files', []) or []) for s in splits] + sizes = [int(getattr(s, 'file_size', 0) or 0) for s in splits] + + rows_total = sum(int(getattr(s, 'row_count', 0) or 0) for s in splits) + merged_per_split = [s.merged_row_count() for s in splits] + if splits and all(v is not None for v in merged_per_split): + merged_total: Optional[int] = sum(merged_per_split) + else: + merged_total = None + + file_count = sum(files_per_split) + total_size = sum(sizes) + level_hist: dict = {} + deletion_file_total = 0 + splits_raw_convertible = 0 + splits_with_dv = 0 + splits_all_above_l0 = 0 + split_infos: List[ExplainSplitInfo] = [] + + for split in splits: + files = getattr(split, 'files', []) or [] + per_split_levels: dict = {} + for f in files: + lv = getattr(f, 'level', 0) or 0 + level_hist[lv] = level_hist.get(lv, 0) + 1 + per_split_levels[lv] = per_split_levels.get(lv, 0) + 1 + dvs = getattr(split, 'data_deletion_files', None) or [] + dv_count_here = sum(1 for d in dvs if d is not None) + deletion_file_total += dv_count_here + has_dv = dv_count_here > 0 + raw = bool(getattr(split, 'raw_convertible', False)) + if raw: + splits_raw_convertible += 1 + if has_dv: + splits_with_dv += 1 + if files and all((getattr(f, 'level', 0) or 0) > 0 for f in files): + splits_all_above_l0 += 1 + + if verbose: + split_infos.append(ExplainSplitInfo( + partition=_format_partition(split, table), + bucket=int(getattr(split, 'bucket', -1)), + file_count=len(files), + row_count=int(getattr(split, 'row_count', 0) or 0), + merged_row_count=split.merged_row_count(), + file_size=int(getattr(split, 'file_size', 0) or 0), + raw_convertible=raw, + has_deletion_vectors=has_dv, + level_histogram=per_split_levels, + deletion_file_count=dv_count_here, + file_paths=list(getattr(split, 'file_paths', []) or []), + )) + + fps_min, fps_max, fps_avg = _min_max_avg(files_per_split) + sz_min, sz_max, sz_avg = _min_max_avg(sizes) + sz_p50 = _percentile(sizes, 50) + sz_p95 = _percentile(sizes, 95) + + return ExplainResult( + table_identifier=str(table.identifier.get_full_name()), + is_primary_key_table=bool(table.is_primary_key_table), + bucket_mode=bucket_mode_str, + deletion_vectors_enabled=bool(table.options.deletion_vectors_enabled()), + data_evolution_enabled=bool(table.options.data_evolution_enabled()), + snapshot_id=plan.snapshot_id, + schema_id=table_schema.id if plan.snapshot_id is not None else None, + predicate=render_predicate(predicate) if predicate is not None else None, + projection=list(projection) if projection else None, + limit=limit, + partition_pruning=partition_pruning, + bucket_pruning=bucket_pruning, + file_skipping=file_skipping, + file_count=file_count, + total_file_size=total_size, + estimated_row_count=rows_total, + estimated_merged_row_count=merged_total, + deletion_file_count=deletion_file_total, + level_histogram=level_hist, + split_count=len(splits), + splits_raw_convertible=splits_raw_convertible, + splits_with_deletion_vectors=splits_with_dv, + splits_all_above_l0=splits_all_above_l0, + files_per_split_min=fps_min, + files_per_split_max=fps_max, + files_per_split_avg=fps_avg, + split_size_min=sz_min, + split_size_max=sz_max, + split_size_avg=sz_avg, + split_size_p50=sz_p50, + split_size_p95=sz_p95, + splits=split_infos if verbose else None, + ) + + +def _partition_pruning(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]: + if scan.predicate is None: + return None + table_partition_keys = scan.table.partition_keys or [] + if not table_partition_keys: + return None + # ``entries_potential_total`` is the count from manifest-file metadata + # (manifest-level pruning has not been applied yet). The "after" side + # is everything that survived both manifest-stats and per-entry + # partition filters. + return PruningStat( + before=stats.entries_potential_total, + after=stats.entries_after_partition, + ) + + +def _bucket_pruning(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]: + # Visible whenever the scan applies any bucket-level filtering — the + # HASH_FIXED predicate-driven selector OR the POSTPONE_BUCKET + # synthetic-bucket skip. Tables with neither (e.g. BUCKET_UNAWARE + # append) leave this counter as ``None``. + fs = scan.file_scanner + if fs._bucket_selector is None and not fs.only_read_real_buckets: + return None + return PruningStat(before=stats.entries_after_partition, after=stats.entries_after_bucket) + + +def _file_skipping(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]: + # Captures the funnel between bucket-stage survivors and the entries + # that actually feed the split generator. The drop here includes both + # predicate-driven file-stats pruning AND structural skips that fire + # in ``_filter_manifest_entry`` once a file is fully decoded (most + # notably the "do not read level-0 file" rule for DV-enabled PK + # tables, which is an LSM-shape decision rather than a predicate + # test). + if scan.predicate is None: + return None + return PruningStat(before=stats.entries_after_bucket, after=stats.entries_after_stats) + + +def _safe_bucket_mode(table) -> str: + try: + return table.bucket_mode().name + except Exception: + return "UNKNOWN" + + +def _format_partition(split, table) -> dict: + keys = list(table.partition_keys or []) + partition = getattr(split, 'partition', None) + if partition is None or not keys: + return {} + values = getattr(partition, 'values', None) or [] + return {k: v for k, v in zip(keys, values)} + + +def _min_max_avg(values): + if not values: + return 0, 0, 0.0 + return min(values), max(values), sum(values) / float(len(values)) + + +def _percentile(values, pct: int) -> int: + if not values: + return 0 + ordered = sorted(values) + idx = int(round((pct / 100.0) * (len(ordered) - 1))) + return int(ordered[idx]) diff --git a/paimon-python/pypaimon/read/scan_stats.py b/paimon-python/pypaimon/read/scan_stats.py new file mode 100644 index 000000000000..6626b0c240f2 --- /dev/null +++ b/paimon-python/pypaimon/read/scan_stats.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from dataclasses import dataclass, field +from typing import Set, Tuple + + +@dataclass +class ScanStats: + """Counters accumulated by :class:`FileScanner` when it is asked to + track stats for ``ReadBuilder.explain()``. + + The scanner mutates these counters in place; consumers should treat + instances as immutable once the scan returns. Default factory values + keep the dataclass usable both as a blank "no tracking" sentinel and + as a live accumulator. + """ + + manifest_files_total: int = 0 + manifest_files_after_partition: int = 0 + + # ``entries_potential_total`` is the row count we would have processed if no + # filtering occurred — derived from manifest-file metadata before the + # manifest-level partition skip. ``entries_total`` is what actually reached + # ``_filter_manifest_entry``; the gap between the two captures + # manifest-level pruning. + entries_potential_total: int = 0 + entries_total: int = 0 + entries_after_partition: int = 0 + entries_after_bucket: int = 0 + entries_after_stats: int = 0 + + partition_keys_before: Set[Tuple] = field(default_factory=set) + partition_keys_after: Set[Tuple] = field(default_factory=set) + + buckets_seen: Set[Tuple[Tuple, int]] = field(default_factory=set) + buckets_after_pruning: Set[Tuple[Tuple, int]] = field(default_factory=set) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index 2878d411718e..1d2831c194e3 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -35,6 +35,7 @@ from pypaimon.read.push_down_utils import (_get_all_fields, remove_row_id_filter, trim_and_transform_predicate) +from pypaimon.read.scan_stats import ScanStats from pypaimon.read.scanner.append_table_split_generator import \ AppendTableSplitGenerator from pypaimon.read.scanner.bucket_select_converter import \ @@ -210,6 +211,10 @@ def __init__( self._global_index_result = None self._scanned_snapshot = None self._scanned_snapshot_id = None + # Opt-in scan-plan tracking. Stays ``None`` for the read hot path; + # ``scan_with_stats()`` flips it on for a single explain pass and + # the filter callbacks below increment counters when present. + self.scan_stats: Optional[ScanStats] = None # Predicate-driven bucket pruning (HASH_FIXED only). Mirrors Java # BucketSelectConverter. Set on demand and reused across all @@ -345,7 +350,20 @@ def _eval_global_index(self): def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8) + if self.scan_stats is not None: + self.scan_stats.manifest_files_total += len(manifest_files) + # ``num_added_files + num_deleted_files`` is the entry count + # recorded in the manifest-file metadata; combined across + # all input manifest files this is the partition-prune-free + # baseline. The difference against ``entries_total`` reveals + # how much manifest-level pruning saved. + self.scan_stats.entries_potential_total += sum( + f.num_added_files + f.num_deleted_files for f in manifest_files) manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)] + if self.scan_stats is not None: + self.scan_stats.manifest_files_after_partition += len(manifest_files) + # Force single-threaded so we can mutate stats without locking. + max_workers = 1 return self.manifest_file_manager.read_entries_parallel( manifest_files, self._filter_manifest_entry, @@ -364,6 +382,11 @@ def _build_early_bucket_filter(self): still happens later in ``_filter_manifest_entry`` once the entry is fully decoded. """ + # explain() needs accurate before/after pruning counters; suppress + # the early bucket filter so every entry reaches + # ``_filter_manifest_entry`` where each rejection stage is counted. + if self.scan_stats is not None: + return None only_real = self.only_read_real_buckets selector = self._bucket_selector if not only_real and selector is None: @@ -402,6 +425,19 @@ def with_global_index_result(self, result) -> 'FileScanner': self._global_index_result = result return self + def scan_with_stats(self) -> Tuple[Plan, ScanStats]: + """Run one scan pass while recording :class:`ScanStats` counters. + + Side-effects: forces single-thread manifest reads and disables the + early bucket filter so every entry reaches + ``_filter_manifest_entry`` exactly once. The scanner is one-shot + in this mode — call ``scan()`` on a fresh instance afterwards if + you need the regular hot path. + """ + self.scan_stats = ScanStats() + plan = self.scan() + return plan, self.scan_stats + def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: """Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the DV-aware ``merged_row_count`` (== Java ``Split.mergedRowCount()``) @@ -498,9 +534,30 @@ def _init_bucket_selector(self): ) def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: - # Redundant safety net: the early filter in the manifest reader - # already enforces these, but guard here too so this method is - # self-contained if called outside read_entries_parallel. + stats = self.scan_stats + if stats is not None: + stats.entries_total += 1 + partition_key = tuple(entry.partition.values) + stats.partition_keys_before.add(partition_key) + stats.buckets_seen.add((partition_key, entry.bucket)) + # Stage 1: partition predicate. The early manifest-reader filter + # only sees ``(bucket, total_buckets)`` and never enforces + # partition predicates, so this check is the sole partition gate + # at the entry level — not a "redundant safety net". + if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): + return False + if stats is not None: + stats.entries_after_partition += 1 + stats.partition_keys_after.add(partition_key) + # Stage 2: bucket rejection. Two reasons land here: + # * ``only_read_real_buckets`` drops the synthetic + # POSTPONE_BUCKET bucket id (also enforced by the early + # filter when present; kept here so the method is correct + # standalone). + # * ``_bucket_selector`` is the HASH_FIXED predicate-driven + # selector built by ``_init_bucket_selector``. + # Both are accounted for under ``entries_after_bucket`` so the + # explain funnel reports bucket-level pruning end-to-end. if self.only_read_real_buckets and entry.bucket < 0: return False if (self._bucket_selector is not None @@ -508,8 +565,9 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: and not self._bucket_selector( entry.partition, entry.bucket, entry.total_buckets)): return False - if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): - return False + if stats is not None: + stats.entries_after_bucket += 1 + stats.buckets_after_pruning.add((partition_key, entry.bucket)) # Get SimpleStatsEvolution for this schema evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id) @@ -540,14 +598,18 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: entry.file.row_count ): return False + if stats is not None: + stats.entries_after_stats += 1 return True else: - if not self.predicate: - return True - if self.predicate_for_stats is None: + if not self.predicate or self.predicate_for_stats is None: + if stats is not None: + stats.entries_after_stats += 1 return True # Data evolution: file stats may be from another schema, skip stats filter and filter in reader. if self.data_evolution: + if stats is not None: + stats.entries_after_stats += 1 return True if entry.file.value_stats_cols is None and entry.file.write_cols is not None: stats_fields = entry.file.write_cols @@ -558,10 +620,13 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: entry.file.row_count, stats_fields ) - return self.predicate_for_stats.test_by_simple_stats( + kept = self.predicate_for_stats.test_by_simple_stats( evolved_stats, entry.file.row_count ) + if kept and stats is not None: + stats.entries_after_stats += 1 + return kept def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, Dict[str, DeletionFile]]: """ diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index d4ac6cfe16b9..bc610134e0d0 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -15,12 +15,13 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import Optional, Tuple from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.read.plan import Plan +from pypaimon.read.scan_stats import ScanStats from pypaimon.read.scanner.file_scanner import FileScanner from pypaimon.manifest.manifest_list_manager import ManifestListManager @@ -44,6 +45,14 @@ def __init__( def plan(self) -> Plan: return self.file_scanner.scan() + def scan_with_stats(self) -> Tuple[Plan, ScanStats]: + """Run :meth:`plan` while recording manifest / pruning counters. + + Only used by :meth:`ReadBuilder.explain`; the regular read path + keeps going through :meth:`plan`. + """ + return self.file_scanner.scan_with_stats() + def _create_file_scanner(self) -> FileScanner: options = self.table.options.options snapshot_manager = self.table.snapshot_manager() From cd2ae8983bd75c346f163ad317e0dcaca348e659 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 15 May 2026 21:52:11 +0800 Subject: [PATCH 2/7] [python] Tests for ReadBuilder.explain() Cover the seven scenarios called out in the design: append-only baseline, PK partitioned + HASH_FIXED with predicate that triggers both partition and bucket pruning, predicate rendering shapes (equal, in, between, isNull, and/or), verbose split detail alignment with plan().splits(), empty snapshot path, split-level signals (raw-convertible / DV / L0) across append-only and DV-on PK tables, and pretty-print smoke for the compact layout anchors. --- .../tests/read_builder_explain_test.py | 270 ++++++++++++++++++ 1 file changed, 270 insertions(+) create mode 100644 paimon-python/pypaimon/tests/read_builder_explain_test.py diff --git a/paimon-python/pypaimon/tests/read_builder_explain_test.py b/paimon-python/pypaimon/tests/read_builder_explain_test.py new file mode 100644 index 000000000000..82b0f0387f8c --- /dev/null +++ b/paimon-python/pypaimon/tests/read_builder_explain_test.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for ``ReadBuilder.explain``: pruning funnel counters, +split-level execution signals, and pretty-print smoke.""" + +import os +import shutil +import tempfile +import unittest +from typing import Any, Dict, List + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema +from pypaimon.common.predicate import Predicate +from pypaimon.read.explain import ExplainResult +from pypaimon.read.explain_render import render_predicate + + +def _write(table, rows: List[Dict], pa_schema: pa.Schema) -> None: + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + +class ReadBuilderExplainTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', False) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + # ---- helpers -------------------------------------------------------- + + def _append_table(self, name: str) -> Any: + pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('val', pa.int64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={'bucket': '-1', 'file.format': 'parquet'}, + ) + full = 'default.{}'.format(name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full), pa_schema + + def _pk_partitioned_bucketed_table(self, name: str, num_buckets: int = 4) -> Any: + pa_schema = pa.schema([ + pa.field('dt', pa.string(), nullable=False), + pa.field('id', pa.int64(), nullable=False), + ('val', pa.int64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=['dt'], + primary_keys=['dt', 'id'], + options={ + 'bucket': str(num_buckets), + 'bucket-key': 'id', + 'file.format': 'parquet', + }, + ) + full = 'default.{}'.format(name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full), pa_schema + + def _pk_dv_table(self, name: str, num_buckets: int = 2) -> Any: + pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('val', pa.int64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['id'], + options={ + 'bucket': str(num_buckets), + 'file.format': 'parquet', + 'deletion-vectors.enabled': 'true', + }, + ) + full = 'default.{}'.format(name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full), pa_schema + + # ---- 1. append-only baseline --------------------------------------- + + def test_explain_append_only_no_predicate(self): + table, pa_schema = self._append_table('explain_basic') + _write(table, [{'id': i, 'val': i * 2} for i in range(20)], pa_schema) + _write(table, [{'id': i, 'val': i * 3} for i in range(20, 40)], pa_schema) + + rb = table.new_read_builder() + result = rb.explain() + + plan_splits = rb.new_scan().plan().splits() + self.assertEqual(result.split_count, len(plan_splits)) + self.assertGreater(result.file_count, 0) + self.assertGreater(result.total_file_size, 0) + self.assertIsNone(result.partition_pruning) + self.assertIsNone(result.bucket_pruning) + self.assertIsNone(result.file_skipping) + self.assertIsNone(result.predicate) + + # ---- 2. PK + partition + bucket pruning ---------------------------- + + def test_explain_pk_table_with_partition_and_bucket_predicate(self): + table, pa_schema = self._pk_partitioned_bucketed_table( + 'explain_pk_pruning', num_buckets=4) + for day in ['2026-05-01', '2026-05-02', '2026-05-03', '2026-05-04']: + _write( + table, + [{'dt': day, 'id': i, 'val': i + 1} for i in range(20)], + pa_schema, + ) + + rb = table.new_read_builder() + pb = rb.new_predicate_builder() + pred = pb.and_predicates([ + pb.equal('dt', '2026-05-01'), + pb.equal('id', 7), + ]) + rb = rb.with_filter(pred) + result = rb.explain() + + self.assertIsNotNone(result.partition_pruning) + self.assertIsNotNone(result.bucket_pruning) + self.assertIsNotNone(result.file_skipping) + self.assertGreater( + result.partition_pruning.before, result.partition_pruning.after, + "partition predicate must drop at least one entry") + self.assertGreater( + result.bucket_pruning.before, result.bucket_pruning.after, + "HASH_FIXED bucket pruning must drop at least one entry") + + # Cross-check against the actual scan + plan_splits = rb.new_scan().plan().splits() + self.assertEqual(result.split_count, len(plan_splits)) + + # ---- 3. predicate rendering ---------------------------------------- + + def test_render_predicate_shapes(self): + eq = Predicate(method='equal', index=0, field='dt', literals=['2026-05-01']) + self.assertEqual(render_predicate(eq), "dt = '2026-05-01'") + + in_p = Predicate(method='in', index=1, field='id', literals=[1, 2, 3]) + self.assertEqual(render_predicate(in_p), "id IN [1, 2, 3]") + + between = Predicate(method='between', index=1, field='id', literals=[5, 10]) + self.assertEqual(render_predicate(between), "id BETWEEN 5 AND 10") + + is_null = Predicate(method='isNull', index=0, field='val', literals=None) + self.assertEqual(render_predicate(is_null), "val IS NULL") + + and_p = Predicate(method='and', index=None, field=None, literals=[eq, in_p]) + self.assertEqual( + render_predicate(and_p), + "(dt = '2026-05-01') AND (id IN [1, 2, 3])") + + or_p = Predicate(method='or', index=None, field=None, literals=[between, is_null]) + self.assertEqual( + render_predicate(or_p), + "(id BETWEEN 5 AND 10) OR (val IS NULL)") + + # ---- 4. verbose split detail --------------------------------------- + + def test_explain_verbose_lists_per_split_detail(self): + table, pa_schema = self._append_table('explain_verbose') + _write(table, [{'id': i, 'val': i} for i in range(50)], pa_schema) + _write(table, [{'id': i, 'val': i} for i in range(50, 100)], pa_schema) + + rb = table.new_read_builder() + result = rb.explain(verbose=True) + self.assertIsNotNone(result.splits) + self.assertEqual(len(result.splits), result.split_count) + + plan_splits = rb.new_scan().plan().splits() + for explained, actual in zip(result.splits, plan_splits): + self.assertEqual(explained.bucket, actual.bucket) + self.assertEqual(explained.file_count, len(actual.files)) + self.assertEqual(set(explained.file_paths), set(actual.file_paths)) + + # ---- 5. empty snapshot path ---------------------------------------- + + def test_explain_empty_snapshot(self): + table, _ = self._append_table('explain_empty') + + rb = table.new_read_builder() + result = rb.explain() + self.assertIsNone(result.snapshot_id) + self.assertEqual(result.split_count, 0) + self.assertEqual(result.file_count, 0) + self.assertIn("", str(result)) + + # ---- 6. split-level metrics: DV vs append-only --------------------- + + def test_explain_split_level_metrics(self): + # Append-only: every split is raw-convertible, no deletion vectors. + table, pa_schema = self._append_table('explain_split_signals_ap') + _write(table, [{'id': i, 'val': i} for i in range(30)], pa_schema) + _write(table, [{'id': i, 'val': i} for i in range(30, 60)], pa_schema) + result = table.new_read_builder().explain() + self.assertGreater(result.split_count, 0) + self.assertEqual(result.splits_with_deletion_vectors, 0) + self.assertEqual(result.splits_raw_convertible, result.split_count) + # All written files are at L0, and append-only doesn't filter them. + self.assertIn(0, result.level_histogram) + self.assertEqual(result.splits_all_above_l0, 0) + + # DV-enabled PK table: pypaimon writes alone don't trigger compaction, + # so every file stays at L0. ``_filter_manifest_entry`` then drops + # them all, leaving an empty plan. The vacuous "all above L0" + # invariant (0 / 0) still has to hold. + dv_table, dv_pa = self._pk_dv_table('explain_split_signals_dv', num_buckets=2) + _write(dv_table, [{'id': i, 'val': i} for i in range(20)], dv_pa) + _write(dv_table, [{'id': i, 'val': i * 10} for i in range(10)], dv_pa) + dv_result = dv_table.new_read_builder().explain() + self.assertEqual(dv_result.split_count, 0) + self.assertEqual(dv_result.splits_all_above_l0, 0) + + # ---- 7. pretty-print smoke ----------------------------------------- + + def test_pretty_print_smoke(self): + table, pa_schema = self._append_table('explain_print_smoke') + _write(table, [{'id': i, 'val': i} for i in range(40)], pa_schema) + + result = table.new_read_builder().explain() + self.assertIsInstance(result, ExplainResult) + printed = str(result) + for anchor in ( + "Snapshot:", + "Splits:", + "raw-convertible:", + "with DV:", + "size/split:", + "Files:", + "Total size:", + ): + self.assertIn(anchor, printed, "missing anchor: " + anchor) + + +if __name__ == '__main__': + unittest.main() From 6fab9b3775e33d5b2ace9db4b1a24e7bc7027e76 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 15 May 2026 21:52:11 +0800 Subject: [PATCH 3/7] [python] Document ReadBuilder.explain() in README --- paimon-python/README.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/paimon-python/README.md b/paimon-python/README.md index e216dc00197c..ac22d7e04030 100644 --- a/paimon-python/README.md +++ b/paimon-python/README.md @@ -31,3 +31,36 @@ pip3 install dist/*.tar.gz The command will install the package and core dependencies to your local Python environment. +# Inspecting query plans + +`ReadBuilder.explain()` returns a structured scan plan so you can see what +a query will actually do — which snapshot it targets, what predicate / +projection / limit were pushed down, how partition / bucket / file-stats +pruning trimmed the scan, and split-level execution signals such as the +raw-convertible ratio, deletion-vector ratio, and split-size skew. + +```python +read_builder = ( + table.new_read_builder() + .with_filter(predicate) + .with_projection(['dt', 'user_id']) + .with_limit(1000) +) + +# Compact layout +print(read_builder.explain()) + +# Verbose layout: also lists every split +print(read_builder.explain(verbose=True)) + +# Programmatic access +result = read_builder.explain() +result.split_count +result.partition_pruning # PruningStat(before=..., after=...) or None +result.splits_raw_convertible +result.split_size_p95 +``` + +`explain()` runs one planning pass (manifest list + manifests only — data +files are never opened). + From b2ad4cef513d49273bc8466cafb70a7fe1727eed Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 16 May 2026 00:09:05 +0800 Subject: [PATCH 4/7] Trigger CI From 9fd4f9ce5cac45182c468d9deb5c3811173c62c1 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 16 May 2026 00:58:45 +0800 Subject: [PATCH 5/7] Trigger CI From 4f3b46da638e0f1599085a725ecd707313ca1f3b Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 16 May 2026 13:44:28 +0800 Subject: [PATCH 6/7] [python] Remove explain() docs from README per review feedback --- paimon-python/README.md | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/paimon-python/README.md b/paimon-python/README.md index ac22d7e04030..e216dc00197c 100644 --- a/paimon-python/README.md +++ b/paimon-python/README.md @@ -31,36 +31,3 @@ pip3 install dist/*.tar.gz The command will install the package and core dependencies to your local Python environment. -# Inspecting query plans - -`ReadBuilder.explain()` returns a structured scan plan so you can see what -a query will actually do — which snapshot it targets, what predicate / -projection / limit were pushed down, how partition / bucket / file-stats -pruning trimmed the scan, and split-level execution signals such as the -raw-convertible ratio, deletion-vector ratio, and split-size skew. - -```python -read_builder = ( - table.new_read_builder() - .with_filter(predicate) - .with_projection(['dt', 'user_id']) - .with_limit(1000) -) - -# Compact layout -print(read_builder.explain()) - -# Verbose layout: also lists every split -print(read_builder.explain(verbose=True)) - -# Programmatic access -result = read_builder.explain() -result.split_count -result.partition_pruning # PruningStat(before=..., after=...) or None -result.splits_raw_convertible -result.split_size_p95 -``` - -`explain()` runs one planning pass (manifest list + manifests only — data -files are never opened). - From 1e6fc2f6e135b7ce0f3b03c3fdabe557693ad54e Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Mon, 18 May 2026 15:55:09 +0800 Subject: [PATCH 7/7] [python] Document ReadBuilder.explain() in Paimon docs site Add an "Explain Scan Plan" subsection to the PyPaimon Python API page (under Batch Read) covering the ReadBuilder.explain() method introduced in this PR. The section shows the structured scan plan output, how to interpret the pruning funnel and split-shape signals, the verbose=True mode for per-split listings, and a hint block describing the cost relative to a regular new_scan().plan(). Replaces the README-based docs removed earlier in this PR per review feedback, moving the user-facing documentation onto the Apache Paimon docs site as requested. --- docs/content/pypaimon/python-api.md | 66 +++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md index 50a0117891e2..e83e1fa5067b 100644 --- a/docs/content/pypaimon/python-api.md +++ b/docs/content/pypaimon/python-api.md @@ -594,6 +594,72 @@ Key points about shard read: - **Parallel Processing**: Each shard can be processed independently for better performance - **Consistency**: Combining all shards should produce the complete table data +### Explain Scan Plan + +`ReadBuilder.explain()` returns a structured view of the scan plan without reading any data files. It is useful for understanding which splits a query will produce, how aggressively the pushdown pruned the input, and whether the resulting splits can be read on the zero-copy fast path. + +```python +table = catalog.get_table('default.events') +read_builder = table.new_read_builder() +predicate_builder = read_builder.new_predicate_builder() +read_builder = read_builder.with_filter(predicate_builder.equal('dt', '2026-05-16')) +print(read_builder.explain()) + +# == PyPaimon Scan Plan == +# Table: default.events (PK, HASH_FIXED) +# Snapshot: 1 (schema 0) +# Predicate: dt = '2026-05-16' +# Projection: +# Limit: +# +# Partition pruning: 12 -> 4 (pruned 8) +# Bucket pruning: 4 -> 4 (pruned 0) +# File skipping: 4 -> 4 (pruned 0) +# +# Splits: 4 +# raw-convertible: 4 / 4 +# with DV: 0 / 4 +# all-above-L0: 0 / 4 +# files/split: min=1 max=1 avg=1.00 +# size/split: min=2.8 KiB p50=2.9 KiB p95=3.0 KiB max=3.0 KiB +# +# Files: 4 +# Total size: 11.6 KiB +# Estimated rows: 20 (merged: 20) +# Level histogram: L0=4 +# Deletion files: 0 +``` + +Pass `verbose=True` to also list every split with its partition, bucket, file count, size, level histogram, and file paths: + +```python +print(read_builder.explain(verbose=True)) + +# ... +# +# Splits[] +# [0] partition={'dt': '2026-05-16'} bucket=3 files=1 size=2.9 KiB rows=4 raw=True dv=False +# levels: L0=1 +# file: /warehouse/default.db/events/dt=2026-05-16/bucket-3/data-...parquet +# [1] partition={'dt': '2026-05-16'} bucket=2 files=1 size=2.8 KiB rows=2 raw=True dv=False +# levels: L0=1 +# file: /warehouse/default.db/events/dt=2026-05-16/bucket-2/data-...parquet +# ... +``` + +What the fields tell you: + +- **Pushdown** (`Predicate` / `Projection` / `Limit`): exactly what the reader sees after `with_filter` / `with_projection` / `with_limit`. +- **Pruning funnel** (`Partition pruning` / `Bucket pruning` / `File skipping`): three `before -> after` counts that show at which stage the predicate paid off. `n/a` means the stage did not apply — for example, bucket pruning is reported for HASH_FIXED tables where every bucket key is pinned by the predicate, and for POSTPONE_BUCKET tables that skip their synthetic-bucket entries. +- **Split shape**: `raw-convertible` counts splits that can be read zero-copy (no merge, no deletion-vector apply); `with DV` counts splits whose files need a deletion vector applied; `all-above-L0` counts splits whose data lives entirely on L1+, i.e. the merge pipeline can skip the L0 buffer. +- **File aggregates**: total file size + estimated rows (with the post-merge row estimate for primary-key tables in parentheses), plus a level histogram of where the data sits. + +{{< hint info >}} +**Cost**: `explain()` reads the manifest list and manifest files but does not open any data files. It suppresses the manifest-reader's early bucket filter and forces single-threaded manifest decoding so the before/after counters are accurate. On tables where the early filter usually prunes aggressively (e.g. very wide HASH_FIXED tables with a tight predicate), this can make `explain()` measurably slower than a regular `new_scan().plan()`. +{{< /hint >}} + +`ExplainResult` is a plain dataclass — alongside the human-readable `__str__` shown above, every field (`partition_pruning`, `bucket_pruning`, `file_skipping`, `split_count`, `splits_raw_convertible`, `level_histogram`, `splits`, ...) is addressable in Python for programmatic use. + ## Rollback Paimon supports rolling back a table to a previous snapshot or tag. This is useful for undoing unwanted changes or