From 118c561136f424c0e024aee017302ab78bf4f1e3 Mon Sep 17 00:00:00 2001 From: KaiqiJinWow Date: Thu, 11 Jun 2026 21:26:16 +0000 Subject: [PATCH] Support range-based reads for deletion vectors --- pyiceberg/io/pyarrow.py | 41 +++++++++++++++++- pyiceberg/manifest.py | 12 +++++ pyiceberg/table/puffin.py | 42 ++++++++++++++++++ tests/io/test_pyarrow.py | 89 ++++++++++++++++++++++++++++++++++++++ tests/table/test_puffin.py | 45 ++++++++++++++++++- 5 files changed, 227 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4ec7a73afe..720e3174d2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -147,7 +147,7 @@ from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping -from pyiceberg.table.puffin import PuffinFile +from pyiceberg.table.puffin import PuffinFile, _bitmaps_to_chunked_array, _deserialize_dv_blob from pyiceberg.transforms import IdentityTransform, TruncateTransform from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion from pyiceberg.types import ( @@ -192,6 +192,8 @@ logger = logging.getLogger(__name__) ONE_MEGABYTE = 1024 * 1024 +# Match Iceberg Java's Integer.MAX_VALUE limit before reading a DV content range into memory. +_MAX_DELETION_VECTOR_CONTENT_SIZE = 2**31 - 1 BUFFER_SIZE = "buffer-size" ICEBERG_SCHEMA = b"iceberg.schema" # The PARQUET: in front means that it is Parquet specific, in this case the field_id @@ -1116,6 +1118,27 @@ def _get_file_format(file_format: FileFormat, **kwargs: dict[str, Any]) -> ds.Fi raise ValueError(f"Unsupported file format: {file_format}") +def _validate_deletion_vector(data_file: DataFile) -> tuple[int, int, str]: + content_offset = getattr(data_file, "content_offset", None) + content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None) + referenced_data_file = getattr(data_file, "referenced_data_file", None) + + if content_offset is None: + raise ValueError(f"Invalid deletion vector, content offset is missing: {data_file.file_path}") + if content_size_in_bytes is None: + raise ValueError(f"Invalid deletion vector, content size is missing: {data_file.file_path}") + if content_offset < 0: + raise ValueError(f"Invalid deletion vector, content offset cannot be negative: {content_offset}") + if content_size_in_bytes < 0: + raise ValueError(f"Invalid deletion vector, content size cannot be negative: {content_size_in_bytes}") + if content_size_in_bytes > _MAX_DELETION_VECTOR_CONTENT_SIZE: + raise ValueError(f"Cannot read deletion vector larger than 2GB: {content_size_in_bytes}") + if referenced_data_file is None: + raise ValueError(f"Invalid deletion vector, referenced data file is missing: {data_file.file_path}") + + return content_offset, content_size_in_bytes, referenced_data_file + + def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]: if data_file.file_format == FileFormat.PARQUET: with io.new_input(data_file.file_path).open() as fi: @@ -1139,6 +1162,22 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray] } elif data_file.file_format == FileFormat.PUFFIN: with io.new_input(data_file.file_path).open() as fi: + content_offset = getattr(data_file, "content_offset", None) + content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None) + if content_offset is not None or content_size_in_bytes is not None: + # A DV is declared as PUFFIN in the manifest, but the content range points directly + # to the serialized bitmap blob, so avoid parsing the entire file as a Puffin file. + content_offset, content_size_in_bytes, referenced_data_file = _validate_deletion_vector(data_file) + + fi.seek(content_offset) + payload = fi.read(content_size_in_bytes) + if len(payload) != content_size_in_bytes: + raise ValueError( + f"Could not read deletion vector, expected {content_size_in_bytes} bytes, got {len(payload)}" + ) + bitmaps = _deserialize_dv_blob(payload, data_file.record_count) + return {referenced_data_file: _bitmaps_to_chunked_array(bitmaps)} + payload = fi.read() return PuffinFile(payload).to_vector() diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3811a9d894..33cd2b4b66 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -531,6 +531,18 @@ def equality_ids(self) -> list[int] | None: def sort_order_id(self) -> int | None: return self._data[15] + @property + def referenced_data_file(self) -> str | None: + return self._data[17] if len(self._data) > 17 else None + + @property + def content_offset(self) -> int | None: + return self._data[18] if len(self._data) > 18 else None + + @property + def content_size_in_bytes(self) -> int | None: + return self._data[19] if len(self._data) > 19 else None + # Spec ID should not be stored in the file _spec_id: int diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 917d387f45..454400b0fc 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. import math +import struct +import zlib from typing import TYPE_CHECKING, Literal from pydantic import Field @@ -30,6 +32,12 @@ EMPTY_BITMAP = FrozenBitMap() MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" +_DV_BLOB_LENGTH = struct.Struct(">I") +_DV_BLOB_MAGIC = struct.Struct("I") +_DV_BLOB_MAGIC_NUMBER = 1681511377 +_ROARING_BITMAP_COUNT_SIZE_BYTES = 8 +_DV_BLOB_MIN_SIZE_BYTES = _DV_BLOB_LENGTH.size + _DV_BLOB_MAGIC.size + _ROARING_BITMAP_COUNT_SIZE_BYTES + _DV_BLOB_CRC.size def _deserialize_bitmap(pl: bytes) -> list[BitMap]: @@ -62,6 +70,40 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps +def _deserialize_dv_blob(blob: bytes, record_count: int | None = None) -> list[BitMap]: + # The DV blob encoding matches Iceberg Java's BitmapPositionDeleteIndex: + # 4-byte big-endian bitmap-data length, 4-byte little-endian magic number, + # portable Roaring bitmap data, and 4-byte big-endian CRC-32. + if len(blob) < _DV_BLOB_MIN_SIZE_BYTES: + raise ValueError(f"Invalid deletion vector blob length: {len(blob)}") + + bitmap_data_length = _DV_BLOB_LENGTH.unpack_from(blob)[0] + expected_bitmap_data_length = len(blob) - _DV_BLOB_LENGTH.size - _DV_BLOB_CRC.size + if bitmap_data_length != expected_bitmap_data_length: + raise ValueError(f"Invalid bitmap data length: {bitmap_data_length}, expected {expected_bitmap_data_length}") + + bitmap_data_offset = _DV_BLOB_LENGTH.size + crc_offset = bitmap_data_offset + bitmap_data_length + bitmap_data = blob[bitmap_data_offset:crc_offset] + + magic_number = _DV_BLOB_MAGIC.unpack_from(bitmap_data)[0] + if magic_number != _DV_BLOB_MAGIC_NUMBER: + raise ValueError(f"Invalid magic number: {magic_number}, expected {_DV_BLOB_MAGIC_NUMBER}") + + checksum = zlib.crc32(bitmap_data) & 0xFFFFFFFF + expected_checksum = _DV_BLOB_CRC.unpack_from(blob, crc_offset)[0] + if checksum != expected_checksum: + raise ValueError("Invalid CRC") + + bitmaps = _deserialize_bitmap(bitmap_data[_DV_BLOB_MAGIC.size :]) + if record_count is not None: + cardinality = sum(len(bitmap) for bitmap in bitmaps) + if cardinality != record_count: + raise ValueError(f"Invalid cardinality: {cardinality}, expected {record_count}") + + return bitmaps + + class PuffinBlobMetadata(IcebergBaseModel): type: Literal["deletion-vector-v1"] = Field() fields: list[int] = Field() diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2f36661a1f..38654ef59e 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -15,11 +15,14 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=protected-access,unused-argument,redefined-outer-name +import json import logging import os +import struct import tempfile import uuid import warnings +import zlib from collections.abc import Iterator from datetime import date, datetime, timezone from pathlib import Path @@ -34,6 +37,7 @@ import pytest from packaging import version from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem +from pyroaring import BitMap from pyiceberg.exceptions import ResolveError from pyiceberg.expressions import ( @@ -91,6 +95,11 @@ from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.table.name_mapping import create_mapping_from_schema +from pyiceberg.table.puffin import ( + _DV_BLOB_MAGIC_NUMBER, + MAGIC_BYTES, + PROPERTY_REFERENCED_DATA_FILE, +) from pyiceberg.transforms import HourTransform, IdentityTransform from pyiceberg.typedef import UTF8, Properties, Record, TableVersion from pyiceberg.types import ( @@ -1820,6 +1829,86 @@ def test_read_deletes(deletes_file: str, request: pytest.FixtureRequest) -> None assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]]) +def _deletion_vector_bitmap_payload() -> bytes: + return (1).to_bytes(8, byteorder="little") + (0).to_bytes(4, byteorder="little") + BitMap([1, 3, 5]).serialize() + + +def _deletion_vector_blob(bitmap_payload: bytes) -> bytes: + bitmap_data = struct.pack("I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF) + + +def test_read_deletion_vector_from_puffin_file(tmp_path: Path) -> None: + referenced_data_file = f"{tmp_path}/data.parquet" + bitmap_payload = _deletion_vector_bitmap_payload() + footer_payload = json.dumps( + { + "blobs": [ + { + "type": "deletion-vector-v1", + "fields": [2147483546], + "snapshot-id": 1, + "sequence-number": 1, + "offset": 0, + "length": len(bitmap_payload), + "properties": {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file}, + } + ], + "properties": {}, + } + ).encode() + puffin_payload = ( + MAGIC_BYTES + + b"\x00\x00\x00\x00" + + bitmap_payload + + footer_payload + + len(footer_payload).to_bytes(4, byteorder="little") + + b"\x00\x00\x00\x00" + + MAGIC_BYTES + ) + delete_file_path = f"{tmp_path}/deletes.puffin" + + with open(delete_file_path, "wb") as f: + f.write(puffin_payload) + + deletes = _read_deletes( + PyArrowFileIO(), + DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PUFFIN, + ), + ) + + assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])} + + +def test_read_deletion_vector_blob_from_content_range(tmp_path: Path) -> None: + referenced_data_file = f"{tmp_path}/data.parquet" + dv_blob = _deletion_vector_blob(_deletion_vector_bitmap_payload()) + prefix = b"\x01not-a-puffin-file" + delete_file_path = f"{tmp_path}/deletes.bin" + + with open(delete_file_path, "wb") as f: + f.write(prefix + dv_blob + b"trailing-bytes") + + deletes = _read_deletes( + PyArrowFileIO(), + DataFile.from_args( + _table_format_version=3, + content=DataFileContent.POSITION_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PUFFIN, + record_count=3, + referenced_data_file=referenced_data_file, + content_offset=len(prefix), + content_size_in_bytes=len(dv_blob), + ), + ) + + assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])} + + def test_delete(deletes_file: str, request: pytest.FixtureRequest, table_schema_simple: Schema) -> None: # Determine file format from the file extension file_format = FileFormat.PARQUET if deletes_file.endswith(".parquet") else FileFormat.ORC diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index bf8c82014c..eedd165057 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -14,12 +14,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import struct +import zlib from os import path import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap +from pyiceberg.table.puffin import _DV_BLOB_MAGIC_NUMBER, _deserialize_bitmap, _deserialize_dv_blob def _open_file(file: str) -> bytes: @@ -28,6 +30,47 @@ def _open_file(file: str) -> bytes: return f.read() +def _dv_blob(bitmap_payload: bytes) -> bytes: + bitmap_data = struct.pack("I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF) + + +def _bitmap_payload() -> bytes: + return (1).to_bytes(8, byteorder="little") + (0).to_bytes(4, byteorder="little") + BitMap([1, 3, 5]).serialize() + + +def test_deserialize_deletion_vector_blob() -> None: + actual = _deserialize_dv_blob(_dv_blob(_bitmap_payload()), record_count=3) + + assert actual == [BitMap([1, 3, 5])] + + +def test_deserialize_deletion_vector_blob_invalid_length() -> None: + with pytest.raises(ValueError, match="Invalid bitmap data length"): + _deserialize_dv_blob(_dv_blob(_bitmap_payload())[:-1]) + + +def test_deserialize_deletion_vector_blob_invalid_magic() -> None: + bitmap_data = struct.pack("I", len(bitmap_data)) + bitmap_data + struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF) + + with pytest.raises(ValueError, match="Invalid magic number"): + _deserialize_dv_blob(blob) + + +def test_deserialize_deletion_vector_blob_invalid_crc() -> None: + blob = bytearray(_dv_blob(_bitmap_payload())) + blob[-1] ^= 1 + + with pytest.raises(ValueError, match="Invalid CRC"): + _deserialize_dv_blob(bytes(blob)) + + +def test_deserialize_deletion_vector_blob_invalid_cardinality() -> None: + with pytest.raises(ValueError, match="Invalid cardinality"): + _deserialize_dv_blob(_dv_blob(_bitmap_payload()), record_count=4) + + def test_map_empty() -> None: puffin = _open_file("64mapempty.bin")