diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md index 50a0117891e2..e83e1fa5067b 100644 --- a/docs/content/pypaimon/python-api.md +++ b/docs/content/pypaimon/python-api.md @@ -594,6 +594,72 @@ Key points about shard read: - **Parallel Processing**: Each shard can be processed independently for better performance - **Consistency**: Combining all shards should produce the complete table data +### Explain Scan Plan + +`ReadBuilder.explain()` returns a structured view of the scan plan without reading any data files. It is useful for understanding which splits a query will produce, how aggressively the pushdown pruned the input, and whether the resulting splits can be read on the zero-copy fast path. + +```python +table = catalog.get_table('default.events') +read_builder = table.new_read_builder() +predicate_builder = read_builder.new_predicate_builder() +read_builder = read_builder.with_filter(predicate_builder.equal('dt', '2026-05-16')) +print(read_builder.explain()) + +# == PyPaimon Scan Plan == +# Table: default.events (PK, HASH_FIXED) +# Snapshot: 1 (schema 0) +# Predicate: dt = '2026-05-16' +# Projection: +# Limit: +# +# Partition pruning: 12 -> 4 (pruned 8) +# Bucket pruning: 4 -> 4 (pruned 0) +# File skipping: 4 -> 4 (pruned 0) +# +# Splits: 4 +# raw-convertible: 4 / 4 +# with DV: 0 / 4 +# all-above-L0: 0 / 4 +# files/split: min=1 max=1 avg=1.00 +# size/split: min=2.8 KiB p50=2.9 KiB p95=3.0 KiB max=3.0 KiB +# +# Files: 4 +# Total size: 11.6 KiB +# Estimated rows: 20 (merged: 20) +# Level histogram: L0=4 +# Deletion files: 0 +``` + +Pass `verbose=True` to also list every split with its partition, bucket, file count, size, level histogram, and file paths: + +```python +print(read_builder.explain(verbose=True)) + +# ... +# +# Splits[] +# [0] partition={'dt': '2026-05-16'} bucket=3 files=1 size=2.9 KiB rows=4 raw=True dv=False +# levels: L0=1 +# file: /warehouse/default.db/events/dt=2026-05-16/bucket-3/data-...parquet +# [1] partition={'dt': '2026-05-16'} bucket=2 files=1 size=2.8 KiB rows=2 raw=True dv=False +# levels: L0=1 +# file: /warehouse/default.db/events/dt=2026-05-16/bucket-2/data-...parquet +# ... +``` + +What the fields tell you: + +- **Pushdown** (`Predicate` / `Projection` / `Limit`): exactly what the reader sees after `with_filter` / `with_projection` / `with_limit`. +- **Pruning funnel** (`Partition pruning` / `Bucket pruning` / `File skipping`): three `before -> after` counts that show at which stage the predicate paid off. `n/a` means the stage did not apply — for example, bucket pruning is reported for HASH_FIXED tables where every bucket key is pinned by the predicate, and for POSTPONE_BUCKET tables that skip their synthetic-bucket entries. +- **Split shape**: `raw-convertible` counts splits that can be read zero-copy (no merge, no deletion-vector apply); `with DV` counts splits whose files need a deletion vector applied; `all-above-L0` counts splits whose data lives entirely on L1+, i.e. the merge pipeline can skip the L0 buffer. +- **File aggregates**: total file size + estimated rows (with the post-merge row estimate for primary-key tables in parentheses), plus a level histogram of where the data sits. + +{{< hint info >}} +**Cost**: `explain()` reads the manifest list and manifest files but does not open any data files. It suppresses the manifest-reader's early bucket filter and forces single-threaded manifest decoding so the before/after counters are accurate. On tables where the early filter usually prunes aggressively (e.g. very wide HASH_FIXED tables with a tight predicate), this can make `explain()` measurably slower than a regular `new_scan().plan()`. +{{< /hint >}} + +`ExplainResult` is a plain dataclass — alongside the human-readable `__str__` shown above, every field (`partition_pruning`, `bucket_pruning`, `file_skipping`, `split_count`, `splits_raw_convertible`, `level_histogram`, `splits`, ...) is addressable in Python for programmatic use. + ## Rollback Paimon supports rolling back a table to a previous snapshot or tag. This is useful for undoing unwanted changes or diff --git a/paimon-python/pypaimon/read/explain.py b/paimon-python/pypaimon/read/explain.py new file mode 100644 index 000000000000..13db6d41571c --- /dev/null +++ b/paimon-python/pypaimon/read/explain.py @@ -0,0 +1,241 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import io +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +@dataclass +class PruningStat: + """Before / after counters for one pruning stage. + + ``before`` is the input size to the stage, ``after`` is the size that + survived. Either may be ``None`` when the stage did not run (for + example, ``bucket_pruning`` is ``None`` for tables that are not + HASH_FIXED with all bucket keys pinned). + """ + + before: Optional[int] + after: Optional[int] + + @property + def pruned(self) -> Optional[int]: + if self.before is None or self.after is None: + return None + return self.before - self.after + + def format(self) -> str: + if self.before is None and self.after is None: + return "n/a" + before = "?" if self.before is None else str(self.before) + after = "?" if self.after is None else str(self.after) + pruned = self.pruned + suffix = "" if pruned is None else " (pruned {})".format(pruned) + return "{} -> {}{}".format(before, after, suffix) + + +@dataclass +class ExplainSplitInfo: + """Per-split detail surfaced when ``explain(verbose=True)`` is used.""" + + partition: Dict[str, Any] + bucket: int + file_count: int + row_count: int + merged_row_count: Optional[int] + file_size: int + raw_convertible: bool + has_deletion_vectors: bool + level_histogram: Dict[int, int] + deletion_file_count: int + file_paths: List[str] + + +@dataclass +class ExplainResult: + """Structured scan plan returned by ``ReadBuilder.explain()``. + + The compact ``__str__`` shows enough signal to reason about cost: the + snapshot, the pushed-down predicate / projection / limit, the three + pruning before-after counters, split-level shape (raw-convertible + ratio, DV ratio, all-above-L0 ratio, files/split, size/split + distribution), and the file-level totals. ``verbose=True`` adds a + block listing each split. + """ + + # Identity + table_identifier: str + is_primary_key_table: bool + bucket_mode: str + deletion_vectors_enabled: bool + data_evolution_enabled: bool + + # Snapshot + snapshot_id: Optional[int] + schema_id: Optional[int] + + # Pushdown + predicate: Optional[str] = None + projection: Optional[List[str]] = None + limit: Optional[int] = None + + # Pruning (None when not applicable) + partition_pruning: Optional[PruningStat] = None + bucket_pruning: Optional[PruningStat] = None + file_skipping: Optional[PruningStat] = None + + # File-level aggregates over final splits + file_count: int = 0 + total_file_size: int = 0 + estimated_row_count: int = 0 + estimated_merged_row_count: Optional[int] = None + deletion_file_count: int = 0 + level_histogram: Dict[int, int] = field(default_factory=dict) + + # Split-level aggregates (shown in compact mode too) + split_count: int = 0 + splits_raw_convertible: int = 0 + splits_with_deletion_vectors: int = 0 + splits_all_above_l0: int = 0 + files_per_split_min: int = 0 + files_per_split_max: int = 0 + files_per_split_avg: float = 0.0 + split_size_min: int = 0 + split_size_max: int = 0 + split_size_avg: float = 0.0 + split_size_p50: int = 0 + split_size_p95: int = 0 + + # Verbose-only + splits: Optional[List[ExplainSplitInfo]] = None + + def __str__(self) -> str: + return render_explain(self) + + +# --------------------------------------------------------------------------- +# Pretty-print helpers +# --------------------------------------------------------------------------- + +def render_explain(result: ExplainResult) -> str: + out = io.StringIO() + out.write("== PyPaimon Scan Plan ==\n") + + flags = [] + flags.append("PK" if result.is_primary_key_table else "Append") + flags.append(result.bucket_mode) + if result.deletion_vectors_enabled: + flags.append("dv=on") + if result.data_evolution_enabled: + flags.append("data-evolution=on") + _line(out, "Table", "{} ({})".format(result.table_identifier, ", ".join(flags))) + + if result.snapshot_id is None: + _line(out, "Snapshot", " (table is empty or has no snapshot)") + else: + schema_part = "" if result.schema_id is None else " (schema {})".format(result.schema_id) + _line(out, "Snapshot", "{}{}".format(result.snapshot_id, schema_part)) + + _line(out, "Predicate", result.predicate if result.predicate else "") + _line(out, "Projection", + "[{}]".format(", ".join(result.projection)) if result.projection else "") + _line(out, "Limit", str(result.limit) if result.limit is not None else "") + + out.write("\n") + _line(out, "Partition pruning", + result.partition_pruning.format() if result.partition_pruning else "n/a") + _line(out, "Bucket pruning", + result.bucket_pruning.format() if result.bucket_pruning else "n/a") + _line(out, "File skipping", + result.file_skipping.format() if result.file_skipping else "n/a") + + out.write("\n") + _line(out, "Splits", str(result.split_count)) + if result.split_count > 0: + out.write(" raw-convertible: {} / {}\n".format( + result.splits_raw_convertible, result.split_count)) + out.write(" with DV: {} / {}\n".format( + result.splits_with_deletion_vectors, result.split_count)) + out.write(" all-above-L0: {} / {}\n".format( + result.splits_all_above_l0, result.split_count)) + out.write(" files/split: min={} max={} avg={:.2f}\n".format( + result.files_per_split_min, + result.files_per_split_max, + result.files_per_split_avg)) + out.write(" size/split: min={} p50={} p95={} max={}\n".format( + _format_size(result.split_size_min), + _format_size(result.split_size_p50), + _format_size(result.split_size_p95), + _format_size(result.split_size_max))) + + out.write("\n") + _line(out, "Files", str(result.file_count)) + _line(out, "Total size", _format_size(result.total_file_size)) + + rows = "{:,}".format(result.estimated_row_count) + if result.estimated_merged_row_count is not None: + rows += " (merged: {:,})".format(result.estimated_merged_row_count) + _line(out, "Estimated rows", rows) + + if result.level_histogram: + levels = sorted(result.level_histogram.items()) + levels_str = " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels) + _line(out, "Level histogram", levels_str) + _line(out, "Deletion files", str(result.deletion_file_count)) + + if result.splits: + out.write("\nSplits[]\n") + for idx, split in enumerate(result.splits): + out.write(" [{}] partition={} bucket={} files={} size={} rows={} raw={} dv={}\n".format( + idx, + split.partition, + split.bucket, + split.file_count, + _format_size(split.file_size), + split.row_count, + split.raw_convertible, + split.has_deletion_vectors, + )) + if split.level_histogram: + levels = sorted(split.level_histogram.items()) + out.write(" levels: {}\n".format( + " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels))) + for path in split.file_paths: + out.write(" file: {}\n".format(path)) + + return out.getvalue().rstrip("\n") + + +def _line(out: io.StringIO, label: str, value: str) -> None: + out.write("{:<19} {}\n".format(label + ":", value)) + + +_SIZE_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB") + + +def _format_size(num_bytes: int) -> str: + if num_bytes is None: + return "?" + size = float(num_bytes) + for unit in _SIZE_UNITS: + if size < 1024.0 or unit == _SIZE_UNITS[-1]: + if unit == "B": + return "{:d} {}".format(int(size), unit) + return "{:.1f} {}".format(size, unit) + size /= 1024.0 + return "{:.1f} {}".format(size, _SIZE_UNITS[-1]) diff --git a/paimon-python/pypaimon/read/explain_render.py b/paimon-python/pypaimon/read/explain_render.py new file mode 100644 index 000000000000..2e09da842a64 --- /dev/null +++ b/paimon-python/pypaimon/read/explain_render.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any + +from pypaimon.common.predicate import Predicate + + +_BINARY_OPS = { + 'equal': '=', + 'notEqual': '!=', + 'lessThan': '<', + 'lessOrEqual': '<=', + 'greaterThan': '>', + 'greaterOrEqual': '>=', +} + + +def render_predicate(predicate: Predicate) -> str: + """Render a :class:`Predicate` tree as a human-readable string. + + The renderer relies only on the existing ``method`` / ``field`` / + ``literals`` shape and never mutates the predicate. Used by + ``ReadBuilder.explain()`` and intentionally lives outside + :mod:`pypaimon.common.predicate` so the predicate module stays + rendering-agnostic. + """ + if predicate is None: + return "" + method = predicate.method + field = predicate.field + literals = predicate.literals + + if method == 'and': + return _join_children(literals, 'AND') + if method == 'or': + return _join_children(literals, 'OR') + if method in _BINARY_OPS: + return "{} {} {}".format(field, _BINARY_OPS[method], _format_literal(literals[0])) + if method == 'in': + return "{} IN [{}]".format(field, ", ".join(_format_literal(v) for v in literals)) + if method == 'notIn': + return "{} NOT IN [{}]".format(field, ", ".join(_format_literal(v) for v in literals)) + if method == 'between': + return "{} BETWEEN {} AND {}".format( + field, _format_literal(literals[0]), _format_literal(literals[1])) + if method == 'notBetween': + return "{} NOT BETWEEN {} AND {}".format( + field, _format_literal(literals[0]), _format_literal(literals[1])) + if method == 'isNull': + return "{} IS NULL".format(field) + if method == 'isNotNull': + return "{} IS NOT NULL".format(field) + if method == 'startsWith': + return "{} STARTSWITH {}".format(field, _format_literal(literals[0])) + if method == 'endsWith': + return "{} ENDSWITH {}".format(field, _format_literal(literals[0])) + if method == 'contains': + return "{} CONTAINS {}".format(field, _format_literal(literals[0])) + if method == 'like': + return "{} LIKE {}".format(field, _format_literal(literals[0])) + return "{}({}{})".format( + method, + field if field is not None else "", + ", " + ", ".join(_format_literal(v) for v in (literals or [])) if literals else "", + ) + + +def _join_children(children, joiner: str) -> str: + parts = [render_predicate(c) for c in (children or [])] + parts = [p for p in parts if p] + if not parts: + return "" + if len(parts) == 1: + return parts[0] + return " {} ".format(joiner).join("({})".format(p) for p in parts) + + +def _format_literal(value: Any) -> str: + if value is None: + return "NULL" + if isinstance(value, str): + return "'{}'".format(value.replace("'", "\\'")) + if isinstance(value, bytes): + return "b'{}'".format(value.hex()) + return repr(value) diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py index 13a951df0b8e..51233856f259 100644 --- a/paimon-python/pypaimon/read/read_builder.py +++ b/paimon-python/pypaimon/read/read_builder.py @@ -19,6 +19,10 @@ from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder +from pypaimon.read.explain import ExplainResult, ExplainSplitInfo, PruningStat +from pypaimon.read.explain_render import render_predicate +from pypaimon.read.scan_stats import ScanStats +from pypaimon.read.split import Split from pypaimon.read.table_read import TableRead from pypaimon.read.table_scan import TableScan from pypaimon.schema.data_types import DataField @@ -97,6 +101,41 @@ def _nested_name_paths(self) -> Optional[List[List[str]]]: def new_predicate_builder(self) -> PredicateBuilder: return PredicateBuilder(self.read_type()) + # TODO: surface this through pypaimon's CLI (alongside cli_sql / + # cli_table) so users can run `pypaimon explain ...` against a table + # without writing any Python. + def explain(self, verbose: bool = False) -> ExplainResult: + """Produce a structured scan plan for this builder. + + Runs one planning pass (manifest list + manifest reads, no data + files) and returns an :class:`ExplainResult` summarising the + target snapshot, the pushed-down predicate / projection / limit, + the partition / bucket / file-stats pruning funnel, and split- + level execution signals (raw-convertible ratio, deletion-vector + ratio, level histogram, files-per-split and split-size + distribution). With ``verbose=True``, every split is listed. + + Cost: ``explain()`` reads manifest list + manifests but never + opens data files. To produce accurate before/after counters it + suppresses the manifest-reader's early bucket filter and forces + single-threaded manifest decoding, so it can be measurably + heavier than a regular ``new_scan().plan()`` on tables where the + early filter usually prunes aggressively (e.g. very wide + HASH_FIXED tables with a tight predicate). + """ + scan = self.new_scan() + plan, stats = scan.scan_with_stats() + return _build_explain_result( + table=self.table, + scan=scan, + plan=plan, + stats=stats, + predicate=self._predicate, + projection=self._projection, + limit=self._limit, + verbose=verbose, + ) + def read_type(self) -> List[DataField]: table_fields = self.table.fields @@ -158,3 +197,179 @@ def _resolve_dotted_paths(self, names: List[str]) -> List[List[int]]: if ok: paths.append(path) return paths + + +def _build_explain_result(table, scan: TableScan, plan, stats: ScanStats, + predicate, projection, limit, verbose: bool) -> ExplainResult: + """Translate one (Plan, ScanStats) pair into an ExplainResult.""" + splits: List[Split] = plan.splits() + + table_schema = table.table_schema + bucket_mode_str = _safe_bucket_mode(table) + + partition_pruning = _partition_pruning(stats, scan) + bucket_pruning = _bucket_pruning(stats, scan) + file_skipping = _file_skipping(stats, scan) + + files_per_split = [len(getattr(s, 'files', []) or []) for s in splits] + sizes = [int(getattr(s, 'file_size', 0) or 0) for s in splits] + + rows_total = sum(int(getattr(s, 'row_count', 0) or 0) for s in splits) + merged_per_split = [s.merged_row_count() for s in splits] + if splits and all(v is not None for v in merged_per_split): + merged_total: Optional[int] = sum(merged_per_split) + else: + merged_total = None + + file_count = sum(files_per_split) + total_size = sum(sizes) + level_hist: dict = {} + deletion_file_total = 0 + splits_raw_convertible = 0 + splits_with_dv = 0 + splits_all_above_l0 = 0 + split_infos: List[ExplainSplitInfo] = [] + + for split in splits: + files = getattr(split, 'files', []) or [] + per_split_levels: dict = {} + for f in files: + lv = getattr(f, 'level', 0) or 0 + level_hist[lv] = level_hist.get(lv, 0) + 1 + per_split_levels[lv] = per_split_levels.get(lv, 0) + 1 + dvs = getattr(split, 'data_deletion_files', None) or [] + dv_count_here = sum(1 for d in dvs if d is not None) + deletion_file_total += dv_count_here + has_dv = dv_count_here > 0 + raw = bool(getattr(split, 'raw_convertible', False)) + if raw: + splits_raw_convertible += 1 + if has_dv: + splits_with_dv += 1 + if files and all((getattr(f, 'level', 0) or 0) > 0 for f in files): + splits_all_above_l0 += 1 + + if verbose: + split_infos.append(ExplainSplitInfo( + partition=_format_partition(split, table), + bucket=int(getattr(split, 'bucket', -1)), + file_count=len(files), + row_count=int(getattr(split, 'row_count', 0) or 0), + merged_row_count=split.merged_row_count(), + file_size=int(getattr(split, 'file_size', 0) or 0), + raw_convertible=raw, + has_deletion_vectors=has_dv, + level_histogram=per_split_levels, + deletion_file_count=dv_count_here, + file_paths=list(getattr(split, 'file_paths', []) or []), + )) + + fps_min, fps_max, fps_avg = _min_max_avg(files_per_split) + sz_min, sz_max, sz_avg = _min_max_avg(sizes) + sz_p50 = _percentile(sizes, 50) + sz_p95 = _percentile(sizes, 95) + + return ExplainResult( + table_identifier=str(table.identifier.get_full_name()), + is_primary_key_table=bool(table.is_primary_key_table), + bucket_mode=bucket_mode_str, + deletion_vectors_enabled=bool(table.options.deletion_vectors_enabled()), + data_evolution_enabled=bool(table.options.data_evolution_enabled()), + snapshot_id=plan.snapshot_id, + schema_id=table_schema.id if plan.snapshot_id is not None else None, + predicate=render_predicate(predicate) if predicate is not None else None, + projection=list(projection) if projection else None, + limit=limit, + partition_pruning=partition_pruning, + bucket_pruning=bucket_pruning, + file_skipping=file_skipping, + file_count=file_count, + total_file_size=total_size, + estimated_row_count=rows_total, + estimated_merged_row_count=merged_total, + deletion_file_count=deletion_file_total, + level_histogram=level_hist, + split_count=len(splits), + splits_raw_convertible=splits_raw_convertible, + splits_with_deletion_vectors=splits_with_dv, + splits_all_above_l0=splits_all_above_l0, + files_per_split_min=fps_min, + files_per_split_max=fps_max, + files_per_split_avg=fps_avg, + split_size_min=sz_min, + split_size_max=sz_max, + split_size_avg=sz_avg, + split_size_p50=sz_p50, + split_size_p95=sz_p95, + splits=split_infos if verbose else None, + ) + + +def _partition_pruning(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]: + if scan.predicate is None: + return None + table_partition_keys = scan.table.partition_keys or [] + if not table_partition_keys: + return None + # ``entries_potential_total`` is the count from manifest-file metadata + # (manifest-level pruning has not been applied yet). The "after" side + # is everything that survived both manifest-stats and per-entry + # partition filters. + return PruningStat( + before=stats.entries_potential_total, + after=stats.entries_after_partition, + ) + + +def _bucket_pruning(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]: + # Visible whenever the scan applies any bucket-level filtering — the + # HASH_FIXED predicate-driven selector OR the POSTPONE_BUCKET + # synthetic-bucket skip. Tables with neither (e.g. BUCKET_UNAWARE + # append) leave this counter as ``None``. + fs = scan.file_scanner + if fs._bucket_selector is None and not fs.only_read_real_buckets: + return None + return PruningStat(before=stats.entries_after_partition, after=stats.entries_after_bucket) + + +def _file_skipping(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]: + # Captures the funnel between bucket-stage survivors and the entries + # that actually feed the split generator. The drop here includes both + # predicate-driven file-stats pruning AND structural skips that fire + # in ``_filter_manifest_entry`` once a file is fully decoded (most + # notably the "do not read level-0 file" rule for DV-enabled PK + # tables, which is an LSM-shape decision rather than a predicate + # test). + if scan.predicate is None: + return None + return PruningStat(before=stats.entries_after_bucket, after=stats.entries_after_stats) + + +def _safe_bucket_mode(table) -> str: + try: + return table.bucket_mode().name + except Exception: + return "UNKNOWN" + + +def _format_partition(split, table) -> dict: + keys = list(table.partition_keys or []) + partition = getattr(split, 'partition', None) + if partition is None or not keys: + return {} + values = getattr(partition, 'values', None) or [] + return {k: v for k, v in zip(keys, values)} + + +def _min_max_avg(values): + if not values: + return 0, 0, 0.0 + return min(values), max(values), sum(values) / float(len(values)) + + +def _percentile(values, pct: int) -> int: + if not values: + return 0 + ordered = sorted(values) + idx = int(round((pct / 100.0) * (len(ordered) - 1))) + return int(ordered[idx]) diff --git a/paimon-python/pypaimon/read/scan_stats.py b/paimon-python/pypaimon/read/scan_stats.py new file mode 100644 index 000000000000..6626b0c240f2 --- /dev/null +++ b/paimon-python/pypaimon/read/scan_stats.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from dataclasses import dataclass, field +from typing import Set, Tuple + + +@dataclass +class ScanStats: + """Counters accumulated by :class:`FileScanner` when it is asked to + track stats for ``ReadBuilder.explain()``. + + The scanner mutates these counters in place; consumers should treat + instances as immutable once the scan returns. Default factory values + keep the dataclass usable both as a blank "no tracking" sentinel and + as a live accumulator. + """ + + manifest_files_total: int = 0 + manifest_files_after_partition: int = 0 + + # ``entries_potential_total`` is the row count we would have processed if no + # filtering occurred — derived from manifest-file metadata before the + # manifest-level partition skip. ``entries_total`` is what actually reached + # ``_filter_manifest_entry``; the gap between the two captures + # manifest-level pruning. + entries_potential_total: int = 0 + entries_total: int = 0 + entries_after_partition: int = 0 + entries_after_bucket: int = 0 + entries_after_stats: int = 0 + + partition_keys_before: Set[Tuple] = field(default_factory=set) + partition_keys_after: Set[Tuple] = field(default_factory=set) + + buckets_seen: Set[Tuple[Tuple, int]] = field(default_factory=set) + buckets_after_pruning: Set[Tuple[Tuple, int]] = field(default_factory=set) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index 2878d411718e..1d2831c194e3 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -35,6 +35,7 @@ from pypaimon.read.push_down_utils import (_get_all_fields, remove_row_id_filter, trim_and_transform_predicate) +from pypaimon.read.scan_stats import ScanStats from pypaimon.read.scanner.append_table_split_generator import \ AppendTableSplitGenerator from pypaimon.read.scanner.bucket_select_converter import \ @@ -210,6 +211,10 @@ def __init__( self._global_index_result = None self._scanned_snapshot = None self._scanned_snapshot_id = None + # Opt-in scan-plan tracking. Stays ``None`` for the read hot path; + # ``scan_with_stats()`` flips it on for a single explain pass and + # the filter callbacks below increment counters when present. + self.scan_stats: Optional[ScanStats] = None # Predicate-driven bucket pruning (HASH_FIXED only). Mirrors Java # BucketSelectConverter. Set on demand and reused across all @@ -345,7 +350,20 @@ def _eval_global_index(self): def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]: max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8) + if self.scan_stats is not None: + self.scan_stats.manifest_files_total += len(manifest_files) + # ``num_added_files + num_deleted_files`` is the entry count + # recorded in the manifest-file metadata; combined across + # all input manifest files this is the partition-prune-free + # baseline. The difference against ``entries_total`` reveals + # how much manifest-level pruning saved. + self.scan_stats.entries_potential_total += sum( + f.num_added_files + f.num_deleted_files for f in manifest_files) manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)] + if self.scan_stats is not None: + self.scan_stats.manifest_files_after_partition += len(manifest_files) + # Force single-threaded so we can mutate stats without locking. + max_workers = 1 return self.manifest_file_manager.read_entries_parallel( manifest_files, self._filter_manifest_entry, @@ -364,6 +382,11 @@ def _build_early_bucket_filter(self): still happens later in ``_filter_manifest_entry`` once the entry is fully decoded. """ + # explain() needs accurate before/after pruning counters; suppress + # the early bucket filter so every entry reaches + # ``_filter_manifest_entry`` where each rejection stage is counted. + if self.scan_stats is not None: + return None only_real = self.only_read_real_buckets selector = self._bucket_selector if not only_real and selector is None: @@ -402,6 +425,19 @@ def with_global_index_result(self, result) -> 'FileScanner': self._global_index_result = result return self + def scan_with_stats(self) -> Tuple[Plan, ScanStats]: + """Run one scan pass while recording :class:`ScanStats` counters. + + Side-effects: forces single-thread manifest reads and disables the + early bucket filter so every entry reaches + ``_filter_manifest_entry`` exactly once. The scanner is one-shot + in this mode — call ``scan()`` on a fresh instance afterwards if + you need the regular hot path. + """ + self.scan_stats = ScanStats() + plan = self.scan() + return plan, self.scan_stats + def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: """Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the DV-aware ``merged_row_count`` (== Java ``Split.mergedRowCount()``) @@ -498,9 +534,30 @@ def _init_bucket_selector(self): ) def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: - # Redundant safety net: the early filter in the manifest reader - # already enforces these, but guard here too so this method is - # self-contained if called outside read_entries_parallel. + stats = self.scan_stats + if stats is not None: + stats.entries_total += 1 + partition_key = tuple(entry.partition.values) + stats.partition_keys_before.add(partition_key) + stats.buckets_seen.add((partition_key, entry.bucket)) + # Stage 1: partition predicate. The early manifest-reader filter + # only sees ``(bucket, total_buckets)`` and never enforces + # partition predicates, so this check is the sole partition gate + # at the entry level — not a "redundant safety net". + if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): + return False + if stats is not None: + stats.entries_after_partition += 1 + stats.partition_keys_after.add(partition_key) + # Stage 2: bucket rejection. Two reasons land here: + # * ``only_read_real_buckets`` drops the synthetic + # POSTPONE_BUCKET bucket id (also enforced by the early + # filter when present; kept here so the method is correct + # standalone). + # * ``_bucket_selector`` is the HASH_FIXED predicate-driven + # selector built by ``_init_bucket_selector``. + # Both are accounted for under ``entries_after_bucket`` so the + # explain funnel reports bucket-level pruning end-to-end. if self.only_read_real_buckets and entry.bucket < 0: return False if (self._bucket_selector is not None @@ -508,8 +565,9 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: and not self._bucket_selector( entry.partition, entry.bucket, entry.total_buckets)): return False - if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): - return False + if stats is not None: + stats.entries_after_bucket += 1 + stats.buckets_after_pruning.add((partition_key, entry.bucket)) # Get SimpleStatsEvolution for this schema evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id) @@ -540,14 +598,18 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: entry.file.row_count ): return False + if stats is not None: + stats.entries_after_stats += 1 return True else: - if not self.predicate: - return True - if self.predicate_for_stats is None: + if not self.predicate or self.predicate_for_stats is None: + if stats is not None: + stats.entries_after_stats += 1 return True # Data evolution: file stats may be from another schema, skip stats filter and filter in reader. if self.data_evolution: + if stats is not None: + stats.entries_after_stats += 1 return True if entry.file.value_stats_cols is None and entry.file.write_cols is not None: stats_fields = entry.file.write_cols @@ -558,10 +620,13 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: entry.file.row_count, stats_fields ) - return self.predicate_for_stats.test_by_simple_stats( + kept = self.predicate_for_stats.test_by_simple_stats( evolved_stats, entry.file.row_count ) + if kept and stats is not None: + stats.entries_after_stats += 1 + return kept def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, Dict[str, DeletionFile]]: """ diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index d4ac6cfe16b9..bc610134e0d0 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -15,12 +15,13 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import Optional, Tuple from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.read.plan import Plan +from pypaimon.read.scan_stats import ScanStats from pypaimon.read.scanner.file_scanner import FileScanner from pypaimon.manifest.manifest_list_manager import ManifestListManager @@ -44,6 +45,14 @@ def __init__( def plan(self) -> Plan: return self.file_scanner.scan() + def scan_with_stats(self) -> Tuple[Plan, ScanStats]: + """Run :meth:`plan` while recording manifest / pruning counters. + + Only used by :meth:`ReadBuilder.explain`; the regular read path + keeps going through :meth:`plan`. + """ + return self.file_scanner.scan_with_stats() + def _create_file_scanner(self) -> FileScanner: options = self.table.options.options snapshot_manager = self.table.snapshot_manager() diff --git a/paimon-python/pypaimon/tests/read_builder_explain_test.py b/paimon-python/pypaimon/tests/read_builder_explain_test.py new file mode 100644 index 000000000000..82b0f0387f8c --- /dev/null +++ b/paimon-python/pypaimon/tests/read_builder_explain_test.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for ``ReadBuilder.explain``: pruning funnel counters, +split-level execution signals, and pretty-print smoke.""" + +import os +import shutil +import tempfile +import unittest +from typing import Any, Dict, List + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema +from pypaimon.common.predicate import Predicate +from pypaimon.read.explain import ExplainResult +from pypaimon.read.explain_render import render_predicate + + +def _write(table, rows: List[Dict], pa_schema: pa.Schema) -> None: + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + +class ReadBuilderExplainTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', False) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + # ---- helpers -------------------------------------------------------- + + def _append_table(self, name: str) -> Any: + pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('val', pa.int64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={'bucket': '-1', 'file.format': 'parquet'}, + ) + full = 'default.{}'.format(name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full), pa_schema + + def _pk_partitioned_bucketed_table(self, name: str, num_buckets: int = 4) -> Any: + pa_schema = pa.schema([ + pa.field('dt', pa.string(), nullable=False), + pa.field('id', pa.int64(), nullable=False), + ('val', pa.int64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=['dt'], + primary_keys=['dt', 'id'], + options={ + 'bucket': str(num_buckets), + 'bucket-key': 'id', + 'file.format': 'parquet', + }, + ) + full = 'default.{}'.format(name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full), pa_schema + + def _pk_dv_table(self, name: str, num_buckets: int = 2) -> Any: + pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('val', pa.int64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['id'], + options={ + 'bucket': str(num_buckets), + 'file.format': 'parquet', + 'deletion-vectors.enabled': 'true', + }, + ) + full = 'default.{}'.format(name) + self.catalog.create_table(full, schema, False) + return self.catalog.get_table(full), pa_schema + + # ---- 1. append-only baseline --------------------------------------- + + def test_explain_append_only_no_predicate(self): + table, pa_schema = self._append_table('explain_basic') + _write(table, [{'id': i, 'val': i * 2} for i in range(20)], pa_schema) + _write(table, [{'id': i, 'val': i * 3} for i in range(20, 40)], pa_schema) + + rb = table.new_read_builder() + result = rb.explain() + + plan_splits = rb.new_scan().plan().splits() + self.assertEqual(result.split_count, len(plan_splits)) + self.assertGreater(result.file_count, 0) + self.assertGreater(result.total_file_size, 0) + self.assertIsNone(result.partition_pruning) + self.assertIsNone(result.bucket_pruning) + self.assertIsNone(result.file_skipping) + self.assertIsNone(result.predicate) + + # ---- 2. PK + partition + bucket pruning ---------------------------- + + def test_explain_pk_table_with_partition_and_bucket_predicate(self): + table, pa_schema = self._pk_partitioned_bucketed_table( + 'explain_pk_pruning', num_buckets=4) + for day in ['2026-05-01', '2026-05-02', '2026-05-03', '2026-05-04']: + _write( + table, + [{'dt': day, 'id': i, 'val': i + 1} for i in range(20)], + pa_schema, + ) + + rb = table.new_read_builder() + pb = rb.new_predicate_builder() + pred = pb.and_predicates([ + pb.equal('dt', '2026-05-01'), + pb.equal('id', 7), + ]) + rb = rb.with_filter(pred) + result = rb.explain() + + self.assertIsNotNone(result.partition_pruning) + self.assertIsNotNone(result.bucket_pruning) + self.assertIsNotNone(result.file_skipping) + self.assertGreater( + result.partition_pruning.before, result.partition_pruning.after, + "partition predicate must drop at least one entry") + self.assertGreater( + result.bucket_pruning.before, result.bucket_pruning.after, + "HASH_FIXED bucket pruning must drop at least one entry") + + # Cross-check against the actual scan + plan_splits = rb.new_scan().plan().splits() + self.assertEqual(result.split_count, len(plan_splits)) + + # ---- 3. predicate rendering ---------------------------------------- + + def test_render_predicate_shapes(self): + eq = Predicate(method='equal', index=0, field='dt', literals=['2026-05-01']) + self.assertEqual(render_predicate(eq), "dt = '2026-05-01'") + + in_p = Predicate(method='in', index=1, field='id', literals=[1, 2, 3]) + self.assertEqual(render_predicate(in_p), "id IN [1, 2, 3]") + + between = Predicate(method='between', index=1, field='id', literals=[5, 10]) + self.assertEqual(render_predicate(between), "id BETWEEN 5 AND 10") + + is_null = Predicate(method='isNull', index=0, field='val', literals=None) + self.assertEqual(render_predicate(is_null), "val IS NULL") + + and_p = Predicate(method='and', index=None, field=None, literals=[eq, in_p]) + self.assertEqual( + render_predicate(and_p), + "(dt = '2026-05-01') AND (id IN [1, 2, 3])") + + or_p = Predicate(method='or', index=None, field=None, literals=[between, is_null]) + self.assertEqual( + render_predicate(or_p), + "(id BETWEEN 5 AND 10) OR (val IS NULL)") + + # ---- 4. verbose split detail --------------------------------------- + + def test_explain_verbose_lists_per_split_detail(self): + table, pa_schema = self._append_table('explain_verbose') + _write(table, [{'id': i, 'val': i} for i in range(50)], pa_schema) + _write(table, [{'id': i, 'val': i} for i in range(50, 100)], pa_schema) + + rb = table.new_read_builder() + result = rb.explain(verbose=True) + self.assertIsNotNone(result.splits) + self.assertEqual(len(result.splits), result.split_count) + + plan_splits = rb.new_scan().plan().splits() + for explained, actual in zip(result.splits, plan_splits): + self.assertEqual(explained.bucket, actual.bucket) + self.assertEqual(explained.file_count, len(actual.files)) + self.assertEqual(set(explained.file_paths), set(actual.file_paths)) + + # ---- 5. empty snapshot path ---------------------------------------- + + def test_explain_empty_snapshot(self): + table, _ = self._append_table('explain_empty') + + rb = table.new_read_builder() + result = rb.explain() + self.assertIsNone(result.snapshot_id) + self.assertEqual(result.split_count, 0) + self.assertEqual(result.file_count, 0) + self.assertIn("", str(result)) + + # ---- 6. split-level metrics: DV vs append-only --------------------- + + def test_explain_split_level_metrics(self): + # Append-only: every split is raw-convertible, no deletion vectors. + table, pa_schema = self._append_table('explain_split_signals_ap') + _write(table, [{'id': i, 'val': i} for i in range(30)], pa_schema) + _write(table, [{'id': i, 'val': i} for i in range(30, 60)], pa_schema) + result = table.new_read_builder().explain() + self.assertGreater(result.split_count, 0) + self.assertEqual(result.splits_with_deletion_vectors, 0) + self.assertEqual(result.splits_raw_convertible, result.split_count) + # All written files are at L0, and append-only doesn't filter them. + self.assertIn(0, result.level_histogram) + self.assertEqual(result.splits_all_above_l0, 0) + + # DV-enabled PK table: pypaimon writes alone don't trigger compaction, + # so every file stays at L0. ``_filter_manifest_entry`` then drops + # them all, leaving an empty plan. The vacuous "all above L0" + # invariant (0 / 0) still has to hold. + dv_table, dv_pa = self._pk_dv_table('explain_split_signals_dv', num_buckets=2) + _write(dv_table, [{'id': i, 'val': i} for i in range(20)], dv_pa) + _write(dv_table, [{'id': i, 'val': i * 10} for i in range(10)], dv_pa) + dv_result = dv_table.new_read_builder().explain() + self.assertEqual(dv_result.split_count, 0) + self.assertEqual(dv_result.splits_all_above_l0, 0) + + # ---- 7. pretty-print smoke ----------------------------------------- + + def test_pretty_print_smoke(self): + table, pa_schema = self._append_table('explain_print_smoke') + _write(table, [{'id': i, 'val': i} for i in range(40)], pa_schema) + + result = table.new_read_builder().explain() + self.assertIsInstance(result, ExplainResult) + printed = str(result) + for anchor in ( + "Snapshot:", + "Splits:", + "raw-convertible:", + "with DV:", + "size/split:", + "Files:", + "Total size:", + ): + self.assertIn(anchor, printed, "missing anchor: " + anchor) + + +if __name__ == '__main__': + unittest.main()