Skip to content

[feature] support python scalar udf:video_snapshot for video#336

Merged
JingsongLi merged 7 commits into
apache:mainfrom
jerry-024:python-scalar-udf-bridge
May 21, 2026
Merged

[feature] support python scalar udf:video_snapshot for video#336
JingsongLi merged 7 commits into
apache:mainfrom
jerry-024:python-scalar-udf-bridge

Conversation

@jerry-024
Copy link
Copy Markdown
Contributor

@jerry-024 jerry-024 commented May 20, 2026

Purpose

This pull request adds Python scalar UDF support for the PyPaimon Rust DataFusion SQL context and uses that bridge to expose a built-in video_snapshot(video[, timestamp_ms]) SQL function.

The generic API follows the DataFusion Python shape:

from pypaimon_rust.datafusion import PythonScalarUDF, SQLContext, udf

scalar_udf = udf(func, [pyarrow.int64()], pyarrow.int64(), "volatile", "plus_ten")
ctx.register_udf(scalar_udf)

SQLContext() auto-registers the video_snapshot built-in once at session creation time, so callers can use it directly from SQL:

ctx = SQLContext()
ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})

batches = ctx.sql("""
    SELECT id, video_snapshot(video) AS cover_png
    FROM paimon.default.videos
""")

The optional second argument follows the OSS video/snapshot,t_<ms> shape: it is a millisecond timestamp and defaults to 0, so video_snapshot(video) captures the first decodable video frame, while video_snapshot(video, 5000) captures around 5 seconds. If the requested timestamp is beyond the video duration, the function returns the last decodable frame.

Built-in registration is best-effort during SQLContext() construction: if the package's own built-in module cannot be imported or registered, construction still succeeds and emits a RuntimeWarning. PyAV/Pillow are still imported lazily when video_snapshot(...) executes, not during construction.

video_snapshot accepts a binary video value. If the value is a serialized Paimon BlobDescriptor, it opens the referenced uri with a seekable/range-readable stream and restricts reads to the descriptor offset/length. Otherwise, it treats the binary value as inline video bytes. The current built-in emits PNG bytes for videos and returns NULL for still images or other values it cannot process as video.

For real Paimon tables, the DataFusion table provider registers the table's FileIO and table location prefix in a session-scoped blob reader registry. The built-in first tries that registry, using the canonical Rust BlobDescriptor parser and the same table FileIO that the scan uses. This lets BlobDescriptor reads reuse catalog/table storage configuration, including object-store options supported by the table FileIO such as OSS/S3 credentials. If no registered table location prefix matches a BlobDescriptor URI, the row returns NULL; Python does not directly open descriptor URIs.

Decode/IO failures are logged as warnings and return NULL for that row. Missing PyAV/Pillow still raises so dependency problems are visible. The v1 implementation decodes rows serially in Python, so this built-in is intended for cover/snapshot extraction after filtering to the desired rows rather than large unbounded scans.

video_snapshot Usage

video_snapshot is a SQL built-in registered automatically by SQLContext().

-- Capture the first decodable video frame. Equivalent to OSS video/snapshot,t_0.
SELECT id, video_snapshot(video) AS cover_png
FROM paimon.default.videos;

-- Capture around 5 seconds. The timestamp argument is in milliseconds.
SELECT id, video_snapshot(video, 5000) AS cover_png
FROM paimon.default.videos;

-- Timestamp columns are supported as INT or BIGINT.
SELECT id, video_snapshot(video, snapshot_ms) AS cover_png
FROM paimon.default.videos;

Behavior:

  • Signature: video_snapshot(video BINARY) -> BINARY and video_snapshot(video BINARY, timestamp_ms INT/BIGINT) -> BINARY.
  • The return value is PNG bytes.
  • timestamp_ms defaults to 0, matching the OSS video/snapshot,t_<ms> shape.
  • If timestamp_ms is beyond the video duration, the function returns the last decodable frame.
  • If video is a Paimon BlobDescriptor, the function first reads it through the table FileIO registered by the DataFusion table provider when the descriptor URI is under a registered table location prefix, so catalog/table storage credentials such as OSS/S3 options are reused.
  • If a BlobDescriptor URI does not resolve through a registered table location prefix, the function returns NULL; there is no direct Python HTTP/file URI fallback.
  • If video is not a BlobDescriptor, the function treats the binary value as inline video bytes.
  • NULL video, NULL timestamp, negative timestamp, still images, non-video bytes, unreadable blobs, or decode failures return NULL.
  • Missing PyAV/Pillow raises an error because the runtime dependency is not available.
  • Decode work is sequential per input batch in Python in this v1 implementation, so callers should filter/limit rows before calling video_snapshot for large tables.
  • The current implementation always emits PNG; output format, resize/crop, and OSS-style m_fast are left for future extension.

Tests

cargo fmt --check -p paimon-datafusion -p pypaimon_rust
cargo check -p paimon-datafusion -p pypaimon_rust
cargo test -p paimon-datafusion blob_reader::tests::resolves_file_blob_descriptor_with_file_io
uv sync --group dev --no-install-project
uv run --no-sync maturin develop
uv run --no-sync pytest tests/test_datafusion.py -q -k "video_snapshot or blob_descriptor"
uv run --no-sync pytest tests/test_datafusion.py -q -k "not query_simple_table_via_catalog_provider"

Note: the full test_datafusion.py suite still requires the external /tmp/paimon-warehouse fixture for test_query_simple_table_via_catalog_provider; without that fixture it fails with table 'paimon.default.simple_log_table' not found.

API and Format

Adds Python scalar UDF APIs to pypaimon_rust.datafusion:

  • PythonScalarUDF
  • udf(...)
  • SQLContext.register_udf(...)

Adds BlobDescriptor stream helpers in pypaimon_rust.functions used by the built-in. video_snapshot itself is registered automatically by SQLContext().

The callable receives PyArrow arrays and must return a PyArrow array with the declared return type and matching row count. input_fields and return_field accept PyArrow DataType or Field values; string type names remain accepted for compatibility. If no UDF name is provided, the default name is derived from the callable and sanitized to a SQL-friendly identifier.

No storage format changes.

Dependencies

The built-in imports PyAV/Pillow lazily when video_snapshot(...) executes. The package exposes a video extra and the Python dev test environment includes av and pillow.

@jerry-024 jerry-024 changed the title [feature] support python scalar udf [feature] support register python scalar udf May 20, 2026
@jerry-024 jerry-024 force-pushed the python-scalar-udf-bridge branch from 5cf96de to 3dabbfe Compare May 20, 2026 06:10
Copy link
Copy Markdown
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should take a look to datafusion-python register_udf API.

@jerry-024 jerry-024 force-pushed the python-scalar-udf-bridge branch from 3927ae2 to 633e27c Compare May 20, 2026 08:19
@jerry-024 jerry-024 changed the title [feature] support register python scalar udf [feature] support python scalar udf:first_frame for video May 20, 2026
@jerry-024 jerry-024 force-pushed the python-scalar-udf-bridge branch 6 times, most recently from 1e9d894 to d9a094e Compare May 20, 2026 10:02
@JingsongLi
Copy link
Copy Markdown
Contributor

Find a better name for first_frame.

@jerry-024 jerry-024 force-pushed the python-scalar-udf-bridge branch from d9a094e to eafe6b7 Compare May 20, 2026 10:34
@jerry-024 jerry-024 changed the title [feature] support python scalar udf:first_frame for video [feature] support python scalar udf:video_snapshot for video May 20, 2026
@jerry-024 jerry-024 force-pushed the python-scalar-udf-bridge branch 5 times, most recently from 4c231bc to aeec8b0 Compare May 21, 2026 03:45
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract UDF-related types into udf.rs and blob reader/stream types into blob.rs. Keep context.rs focused on catalog and SQLContext.

@jerry-024 jerry-024 force-pushed the python-scalar-udf-bridge branch from aeec8b0 to 3b8a42c Compare May 21, 2026 05:03
Copy link
Copy Markdown
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit cd9b90d into apache:main May 21, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants