
[python] Add parallel split reading to to_pandas / to_arrow#7870

Open
TheR1sing3un wants to merge 2 commits into apache:master from TheR1sing3un:py-pypaimon-parallel-to-pandas

Conversation

@TheR1sing3un
Member

Purpose

Today TableRead.to_pandas / to_arrow iterate splits serially in _arrow_batch_generator, so wall time scales linearly with the number of splits even though PyArrow's parquet/orc readers release the GIL during decode. Unlike Java, where Flink/Spark fan splits out across TaskManagers/Executors, PyPaimon has no external framework above the SDK; split-level parallelism therefore has to live inside the SDK.

This PR adds an opt-in max_workers parameter to to_pandas / to_arrow. Default behavior is unchanged.

Linked issue

N/A — direct contribution.

API

read.to_arrow(splits, max_workers=4)
read.to_pandas(splits, max_workers=4)
  • max_workers=None (default) or 1 → original serial path, no thread pool created.
  • >= 2 with at least 2 splits → a ThreadPoolExecutor runs splits concurrently; the final Table is assembled in the input splits' order (results collected by submission index); see the sketch below.
  • < 1 → ValueError.
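
A minimal sketch of that dispatch, assuming helper names (_to_arrow_serial, _read_split_to_table) that are illustrative rather than the PR's actual private methods:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import pyarrow as pa


def to_arrow(self, splits: List["Split"], max_workers: Optional[int] = None) -> pa.Table:
    if max_workers is not None and max_workers < 1:
        raise ValueError(f"max_workers must be >= 1, got {max_workers}")
    if max_workers is None or max_workers == 1 or len(splits) < 2:
        return self._to_arrow_serial(splits)  # original path, no pool created

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit in input order; collecting results by submission index
        # preserves the input splits' order in the assembled Table.
        futures = [pool.submit(self._read_split_to_table, split) for split in splits]
        tables = [future.result() for future in futures]  # re-raises worker errors
    return pa.concat_tables(tables)
```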

Other to_* methods (to_arrow_batch_reader, to_iterator, to_duckdb, to_ray, to_torch) are intentionally untouched — their order-preserving / streaming semantics deserve a separate look.

Correctness under limit

_RemainingRows is a thread-safe row-quota counter shared by all workers. Quota is pre-debited under a single lock so the combined output never exceeds self.limit, even if individual readers decode one extra batch after the quota is gone (the surplus batch is simply dropped, never emitted).
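
A minimal sketch of such a pre-debiting quota counter, assuming the take-before-emit contract described above (the PR's actual _RemainingRows may differ in detail):

```python
import threading
from typing import Optional

import pyarrow as pa


class _RemainingRowsSketch:
    """Illustrative thread-safe row quota, not the PR's exact class."""

    def __init__(self, limit: Optional[int]):
        self._remaining = limit          # None = no limit configured
        self._lock = threading.Lock()

    def take(self, requested: int) -> int:
        """Debit up to `requested` rows under the lock; return the grant."""
        if self._remaining is None:
            return requested
        with self._lock:
            granted = min(requested, self._remaining)
            self._remaining -= granted
        return granted


def _trim(batch: pa.RecordBatch, quota: _RemainingRowsSketch):
    granted = quota.take(batch.num_rows)
    if granted == 0:
        return None                      # surplus batch decoded after quota: dropped
    return batch.slice(0, granted)       # partial grant: emit only the granted rows
```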

Resource handling

Each worker uses try/finally: reader.close(). ThreadPoolExecutor's wait-on-exit guarantees every started reader is closed before to_arrow returns, even when one worker raises and propagates its exception.
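
The per-worker pattern this describes, sketched under the assumption that create_reader returns a closable batch reader (names illustrative):

```python
def _read_split_to_table(self, split: "Split") -> pa.Table:
    reader = self.create_reader(split)   # may raise before any rows are read
    try:
        return pa.Table.from_batches(list(reader))
    finally:
        reader.close()                   # runs on success and on exception
```

Because the executor's context-manager exit waits for all submitted workers, every finally block has run before the first worker exception propagates out of to_arrow.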

Tests

Added paimon-python/pypaimon/tests/reader_parallel_test.py (16 tests):

  • _RemainingRows: unbounded, bounded pre-debit, zero-request, 8-thread contention.
  • Append-only multi-partition: parallel result is byte-equal to serial.
  • PK merge-on-read multi-bucket: parallel + serial produce the same merged rows.
  • limit + parallel: 10 repeated runs return exactly the configured row count.
  • Edge cases: empty splits with max_workers=4, max_workers exceeding split count, max_workers=0/-1 rejected, max_workers=1 matches serial, include_row_kind=True parity.
  • Reader error propagation: when one split's create_reader raises, the exception surfaces from to_pandas and sibling readers are cleaned up.
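
A representative parity test in the style of the suite (the class name and the _create_read_and_splits fixture are illustrative, not the file's actual contents):

```python
import unittest


class ReaderParallelParityTest(unittest.TestCase):
    def test_parallel_matches_serial(self):
        # Hypothetical fixture: writes a few partitions, returns (read, splits).
        read, splits = self._create_read_and_splits(num_partitions=4)
        serial = read.to_arrow(splits)
        parallel = read.to_arrow(splits, max_workers=4)
        # Same rows in the same order: ordering is preserved by
        # collecting results in submission-index order.
        self.assertTrue(serial.equals(parallel))
```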

API / format impact

  • API: additive only (one new optional parameter; default preserves existing behavior).
  • Storage / on-disk format: no change.
  • CoreOptions: no new option introduced in this round.

Documentation impact

Docstrings on to_arrow / to_pandas updated. Design doc added at paimon-python/docs/design/2026-05-15-pypaimon-parallel-to-pandas.md. README untouched.

Generative AI disclosure

Yes — the implementation, tests, and design doc were drafted with Claude Code assistance under my direction and review.

@TheR1sing3un TheR1sing3un force-pushed the py-pypaimon-parallel-to-pandas branch from 9fe7ec1 to b762e0a on May 15, 2026 at 16:06
def to_arrow(
    self,
    splits: List[Split],
    max_workers: Optional[int] = None,
Contributor


Maybe it is better to provide a table option read.parallelism?

Member Author


> Maybe it is better to provide a table option read.parallelism?

A very good suggestion. I added this option.
By the way, since read.parallelism is a table option, is it still necessary for read APIs like to_pandas/to_arrow to retain a per-call parameter that overrides the table-level parallelism? I have kept the parameter for now; if you think it's unnecessary, I can remove it.
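
For illustration, the two tracks under discussion might be used like this (the read-builder calls follow pypaimon's public API; the Schema construction, catalog setup, and table name here are assumptions):

```python
import pyarrow as pa
from pypaimon import Schema

# `catalog` assumed already created via pypaimon's catalog factory.

# Track 1: persist a table-level default via the new option.
pa_schema = pa.schema([('id', pa.int64()), ('v', pa.string())])
schema = Schema.from_pyarrow_schema(pa_schema, options={'read.parallelism': '4'})
catalog.create_table('default.orders', schema, False)

# Track 2: ad-hoc per-call override, without altering the table.
table = catalog.get_table('default.orders')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
df = read_builder.new_read().to_pandas(splits, parallelism=8)
```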

Today TableRead.to_pandas / to_arrow iterate splits serially in
_arrow_batch_generator, so wall time scales linearly with the number
of splits even though PyArrow's parquet/orc readers release the GIL
during decode. Unlike Java, where Flink/Spark fan splits out across
TaskManagers/Executors, PyPaimon has no external framework above the
SDK; split-level parallelism therefore has to live inside the SDK.

This commit adds an opt-in dual-track API for split-level parallelism:

  1. A new table option `read.parallelism` (default 1) sets the
     persistent default for a table:
     options={'read.parallelism': '4'}.

  2. A new method argument `parallelism` on to_pandas / to_arrow
     temporarily overrides the option for a single call:
     read.to_pandas(splits, parallelism=8).

Priority: method argument > table option > built-in default of 1.
This covers both "configure once, all reads benefit" (option) and
ad-hoc tuning without altering the table schema (argument).

Behavior:
- effective == 1 (default or explicit) keeps the serial path
  unchanged; no thread pool is created.
- effective >= 2 with at least 2 splits runs splits through a
  ThreadPoolExecutor and assembles the final Table in the input
  splits' order (results collected by submission index).
- effective < 1 (from either source) raises ValueError naming
  whichever source produced the value.

Limit pushdown stays correct under parallelism via _RemainingRows, a
thread-safe row-quota counter. Quota is pre-debited under a single
lock so the combined output never exceeds self.limit, even if
individual readers decode one extra batch after the quota is gone -
that batch is simply dropped instead of being emitted.

Reader resource handling matches the serial path: each worker uses
try/finally to close its reader, and ThreadPoolExecutor's wait-on-
exit guarantees every started reader is closed before the call
returns, even when one worker raises.

Other to_* methods (to_arrow_batch_reader, to_iterator, to_duckdb,
to_ray, to_torch) are deliberately not touched in this round - their
order-preserving / streaming semantics deserve a separate look.

Tests cover:
- _RemainingRows correctness under unbounded, bounded, zero-request,
  and 8-thread contention scenarios.
- Append-only multi-partition: parallel via method argument matches
  serial byte-for-byte; parallel via table option also matches.
- Priority matrix: method argument overrides option (both directions),
  option overrides default, explicit 1 keeps serial path.
- PK merge-on-read multi-bucket: parallel + serial produce the same
  merged rows.
- Limit + parallel: 10 repeated runs return exactly the configured
  row count.
- Edge cases: empty splits with parallelism=4, parallelism exceeding
  split count, invalid method argument and invalid option value each
  raise ValueError with a source-specific message.
- Reader error propagation: when one split's create_reader raises,
  the exception surfaces from to_pandas and sibling readers are
  cleaned up.
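
A sketch of the priority resolution the commit message describes (the read.parallelism key is from the message; the helper name and options accessor are hypothetical):

```python
from typing import Optional


def _resolve_parallelism(self, argument: Optional[int]) -> int:
    """method argument > table option > built-in default of 1."""
    if argument is not None:
        if argument < 1:
            raise ValueError(f"parallelism argument must be >= 1, got {argument}")
        return argument
    raw = self.table_options.get('read.parallelism')  # hypothetical accessor
    if raw is not None:
        value = int(raw)
        if value < 1:
            raise ValueError(f"table option 'read.parallelism' must be >= 1, got {value}")
        return value
    return 1
```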
@TheR1sing3un TheR1sing3un force-pushed the py-pypaimon-parallel-to-pandas branch from b762e0a to 3cf7e53 on May 16, 2026 at 05:53
@TheR1sing3un TheR1sing3un requested a review from JingsongLi on May 16, 2026 at 05:56
