Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.table.puffin import PuffinFile
from pyiceberg.table.puffin import PuffinFile, _bitmaps_to_chunked_array, _deserialize_dv_blob
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
from pyiceberg.types import (
Expand Down Expand Up @@ -192,6 +192,8 @@
logger = logging.getLogger(__name__)

ONE_MEGABYTE = 1024 * 1024
# Match Iceberg Java's Integer.MAX_VALUE limit before reading a DV content range into memory.
_MAX_DELETION_VECTOR_CONTENT_SIZE = 2**31 - 1
BUFFER_SIZE = "buffer-size"
ICEBERG_SCHEMA = b"iceberg.schema"
# The PARQUET: in front means that it is Parquet specific, in this case the field_id
Expand Down Expand Up @@ -1116,6 +1118,27 @@ def _get_file_format(file_format: FileFormat, **kwargs: dict[str, Any]) -> ds.Fi
raise ValueError(f"Unsupported file format: {file_format}")


def _validate_deletion_vector(data_file: DataFile) -> tuple[int, int, str]:
    """Validate the deletion-vector fields of a delete file and return them.

    Args:
        data_file: The delete file whose ``content_offset``, ``content_size_in_bytes``
            and ``referenced_data_file`` attributes are checked. Attributes that do
            not exist on the object are treated as missing.

    Returns:
        A ``(content_offset, content_size_in_bytes, referenced_data_file)`` tuple.

    Raises:
        ValueError: If the offset or size is missing or negative, if the size is
            larger than ``_MAX_DELETION_VECTOR_CONTENT_SIZE``, or if the referenced
            data file is missing.
    """
    content_offset = getattr(data_file, "content_offset", None)
    content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None)
    target_data_file = getattr(data_file, "referenced_data_file", None)

    # The checks run in a fixed order and only the first failure is reported.
    # Presence is tested before the range comparisons so they never see None.
    problem = None
    if content_offset is None:
        problem = f"Invalid deletion vector, content offset is missing: {data_file.file_path}"
    elif content_size_in_bytes is None:
        problem = f"Invalid deletion vector, content size is missing: {data_file.file_path}"
    elif content_offset < 0:
        problem = f"Invalid deletion vector, content offset cannot be negative: {content_offset}"
    elif content_size_in_bytes < 0:
        problem = f"Invalid deletion vector, content size cannot be negative: {content_size_in_bytes}"
    elif content_size_in_bytes > _MAX_DELETION_VECTOR_CONTENT_SIZE:
        problem = f"Cannot read deletion vector larger than 2GB: {content_size_in_bytes}"
    elif target_data_file is None:
        problem = f"Invalid deletion vector, referenced data file is missing: {data_file.file_path}"

    if problem is not None:
        raise ValueError(problem)

    return content_offset, content_size_in_bytes, target_data_file


def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]:
if data_file.file_format == FileFormat.PARQUET:
with io.new_input(data_file.file_path).open() as fi:
Expand All @@ -1139,6 +1162,22 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
}
elif data_file.file_format == FileFormat.PUFFIN:
with io.new_input(data_file.file_path).open() as fi:
content_offset = getattr(data_file, "content_offset", None)
content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None)
if content_offset is not None or content_size_in_bytes is not None:
Comment on lines 1163 to +1167

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the file format is Puffin, these two fields are never None, right?

https://iceberg.apache.org/spec/#data-file-fields

The content_offset and content_size_in_bytes fields are used to reference a specific blob for direct access to a deletion vector. For deletion vectors, these values are required and must exactly match the offset and length stored in the Puffin footer for the deletion vector blob.

# A DV is declared as PUFFIN in the manifest, but the content range points directly
# to the serialized bitmap blob, so avoid parsing the entire file as a Puffin file.
content_offset, content_size_in_bytes, referenced_data_file = _validate_deletion_vector(data_file)

fi.seek(content_offset)
payload = fi.read(content_size_in_bytes)
if len(payload) != content_size_in_bytes:
raise ValueError(
f"Could not read deletion vector, expected {content_size_in_bytes} bytes, got {len(payload)}"
)
bitmaps = _deserialize_dv_blob(payload, data_file.record_count)
return {referenced_data_file: _bitmaps_to_chunked_array(bitmaps)}

payload = fi.read()

return PuffinFile(payload).to_vector()
Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,18 @@ def equality_ids(self) -> list[int] | None:
def sort_order_id(self) -> int | None:
return self._data[15]

@property
def referenced_data_file(self) -> str | None:
return self._data[17] if len(self._data) > 17 else None

@property
def content_offset(self) -> int | None:
return self._data[18] if len(self._data) > 18 else None

@property
def content_size_in_bytes(self) -> int | None:
return self._data[19] if len(self._data) > 19 else None

# Spec ID should not be stored in the file
_spec_id: int

Expand Down
42 changes: 42 additions & 0 deletions pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
import math
import struct
import zlib
from typing import TYPE_CHECKING, Literal

from pydantic import Field
Expand All @@ -30,6 +32,12 @@
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
# Framing of a serialized deletion-vector blob: a 4-byte big-endian length of the
# bitmap data, a 4-byte little-endian magic number that starts the bitmap data,
# and a trailing 4-byte big-endian CRC-32.
_DV_BLOB_LENGTH = struct.Struct(">I")
_DV_BLOB_MAGIC = struct.Struct("<I")
_DV_BLOB_CRC = struct.Struct(">I")
# 0x6439D3D1; packed little-endian this is the byte sequence D1 D3 39 64.
_DV_BLOB_MAGIC_NUMBER = 1681511377
# Width in bytes of the count field, per its name; presumably the bitmap count that
# leads the Roaring data -- confirm against _deserialize_bitmap.
_ROARING_BITMAP_COUNT_SIZE_BYTES = 8
# Smallest blob accepted by the length check: length prefix + magic + count field + CRC.
_DV_BLOB_MIN_SIZE_BYTES = _DV_BLOB_LENGTH.size + _DV_BLOB_MAGIC.size + _ROARING_BITMAP_COUNT_SIZE_BYTES + _DV_BLOB_CRC.size


def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
Expand Down Expand Up @@ -62,6 +70,40 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
return bitmaps


def _deserialize_dv_blob(blob: bytes, record_count: int | None = None) -> list[BitMap]:
    """Decode a serialized deletion-vector blob into its bitmaps.

    The encoding matches Iceberg Java's BitmapPositionDeleteIndex: a 4-byte
    big-endian bitmap-data length, a 4-byte little-endian magic number, portable
    Roaring bitmap data, and a 4-byte big-endian CRC-32.

    Args:
        blob: The complete blob, including the length prefix and the trailing CRC.
        record_count: When given, the total number of positions across all decoded
            bitmaps must equal this value.

    Returns:
        The bitmaps produced by ``_deserialize_bitmap`` for the data after the magic number.

    Raises:
        ValueError: If the blob is too short, the declared length does not match the
            blob size, the magic number or CRC is wrong, or the cardinality differs
            from ``record_count``.
    """
    if len(blob) < _DV_BLOB_MIN_SIZE_BYTES:
        raise ValueError(f"Invalid deletion vector blob length: {len(blob)}")

    # The declared length must account for everything between the prefix and the CRC.
    (bitmap_data_length,) = _DV_BLOB_LENGTH.unpack_from(blob)
    expected_bitmap_data_length = len(blob) - _DV_BLOB_LENGTH.size - _DV_BLOB_CRC.size
    if bitmap_data_length != expected_bitmap_data_length:
        raise ValueError(f"Invalid bitmap data length: {bitmap_data_length}, expected {expected_bitmap_data_length}")

    payload_start = _DV_BLOB_LENGTH.size
    payload_end = payload_start + bitmap_data_length
    payload = blob[payload_start:payload_end]

    (magic_number,) = _DV_BLOB_MAGIC.unpack_from(payload)
    if magic_number != _DV_BLOB_MAGIC_NUMBER:
        raise ValueError(f"Invalid magic number: {magic_number}, expected {_DV_BLOB_MAGIC_NUMBER}")

    # The checksum covers the magic number and the bitmap data, but not the length prefix.
    actual_crc = zlib.crc32(payload) & 0xFFFFFFFF
    (stored_crc,) = _DV_BLOB_CRC.unpack_from(blob, payload_end)
    if actual_crc != stored_crc:
        raise ValueError("Invalid CRC")

    decoded = _deserialize_bitmap(payload[_DV_BLOB_MAGIC.size :])
    if record_count is None:
        return decoded

    cardinality = sum(map(len, decoded))
    if cardinality != record_count:
        raise ValueError(f"Invalid cardinality: {cardinality}, expected {record_count}")

    return decoded


class PuffinBlobMetadata(IcebergBaseModel):
type: Literal["deletion-vector-v1"] = Field()
fields: list[int] = Field()
Expand Down
89 changes: 89 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=protected-access,unused-argument,redefined-outer-name
import json
import logging
import os
import struct
import tempfile
import uuid
import warnings
import zlib
from collections.abc import Iterator
from datetime import date, datetime, timezone
from pathlib import Path
Expand All @@ -34,6 +37,7 @@
import pytest
from packaging import version
from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem
from pyroaring import BitMap

from pyiceberg.exceptions import ResolveError
from pyiceberg.expressions import (
Expand Down Expand Up @@ -91,6 +95,11 @@
from pyiceberg.table import FileScanTask, TableProperties
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table.name_mapping import create_mapping_from_schema
from pyiceberg.table.puffin import (
_DV_BLOB_MAGIC_NUMBER,
MAGIC_BYTES,
PROPERTY_REFERENCED_DATA_FILE,
)
from pyiceberg.transforms import HourTransform, IdentityTransform
from pyiceberg.typedef import UTF8, Properties, Record, TableVersion
from pyiceberg.types import (
Expand Down Expand Up @@ -1820,6 +1829,86 @@ def test_read_deletes(deletes_file: str, request: pytest.FixtureRequest) -> None
assert list(deletes.values())[0] == pa.chunked_array([[1, 3, 5]])


def _deletion_vector_bitmap_payload() -> bytes:
    """Build the bitmap payload used by the deletion-vector tests for positions 1, 3 and 5."""
    # An 8-byte little-endian 1 followed by a 4-byte little-endian 0; presumably the
    # bitmap count and the key of the single bitmap -- confirm against _deserialize_bitmap.
    count_field = (1).to_bytes(8, byteorder="little")
    key_field = (0).to_bytes(4, byteorder="little")
    serialized_bitmap = BitMap([1, 3, 5]).serialize()
    return count_field + key_field + serialized_bitmap


def _deletion_vector_blob(bitmap_payload: bytes) -> bytes:
    """Wrap a bitmap payload in the deletion-vector blob framing.

    The result is a big-endian length prefix, then the little-endian magic number
    and the payload, then a big-endian CRC-32 of the magic number plus payload.
    """
    bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER) + bitmap_payload
    length_prefix = struct.pack(">I", len(bitmap_data))
    crc_suffix = struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
    return length_prefix + bitmap_data + crc_suffix


def test_read_deletion_vector_from_puffin_file(tmp_path: Path) -> None:
    """Read a deletion vector from a hand-built Puffin file when no content range is supplied."""
    referenced_data_file = f"{tmp_path}/data.parquet"
    delete_file_path = f"{tmp_path}/deletes.puffin"
    bitmap_payload = _deletion_vector_bitmap_payload()

    blob_metadata = {
        "type": "deletion-vector-v1",
        "fields": [2147483546],
        "snapshot-id": 1,
        "sequence-number": 1,
        "offset": 0,
        "length": len(bitmap_payload),
        "properties": {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file},
    }
    footer_payload = json.dumps({"blobs": [blob_metadata], "properties": {}}).encode()

    # Layout written here: magic, four zero bytes, the bitmap payload, the footer JSON,
    # its little-endian length, four zero bytes, and the magic again.
    puffin_payload = b"".join(
        (
            MAGIC_BYTES,
            b"\x00\x00\x00\x00",
            bitmap_payload,
            footer_payload,
            len(footer_payload).to_bytes(4, byteorder="little"),
            b"\x00\x00\x00\x00",
            MAGIC_BYTES,
        )
    )
    Path(delete_file_path).write_bytes(puffin_payload)

    file_io = PyArrowFileIO()
    delete_file = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        file_path=delete_file_path,
        file_format=FileFormat.PUFFIN,
    )
    deletes = _read_deletes(file_io, delete_file)

    assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])}


def test_read_deletion_vector_blob_from_content_range(tmp_path: Path) -> None:
    """Read a deletion-vector blob by offset and size from a file that is not a valid Puffin file."""
    referenced_data_file = f"{tmp_path}/data.parquet"
    delete_file_path = f"{tmp_path}/deletes.bin"
    dv_blob = _deletion_vector_blob(_deletion_vector_bitmap_payload())

    # Surround the blob with unrelated bytes so only the exact content range can decode.
    prefix = b"\x01not-a-puffin-file"
    Path(delete_file_path).write_bytes(prefix + dv_blob + b"trailing-bytes")

    file_io = PyArrowFileIO()
    delete_file = DataFile.from_args(
        _table_format_version=3,
        content=DataFileContent.POSITION_DELETES,
        file_path=delete_file_path,
        file_format=FileFormat.PUFFIN,
        record_count=3,
        referenced_data_file=referenced_data_file,
        content_offset=len(prefix),
        content_size_in_bytes=len(dv_blob),
    )
    deletes = _read_deletes(file_io, delete_file)

    assert deletes == {referenced_data_file: pa.chunked_array([[1, 3, 5]])}


def test_delete(deletes_file: str, request: pytest.FixtureRequest, table_schema_simple: Schema) -> None:
# Determine file format from the file extension
file_format = FileFormat.PARQUET if deletes_file.endswith(".parquet") else FileFormat.ORC
Expand Down
45 changes: 44 additions & 1 deletion tests/table/test_puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import struct
import zlib
from os import path

import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.table.puffin import _DV_BLOB_MAGIC_NUMBER, _deserialize_bitmap, _deserialize_dv_blob


def _open_file(file: str) -> bytes:
Expand All @@ -28,6 +30,47 @@ def _open_file(file: str) -> bytes:
return f.read()


def _dv_blob(bitmap_payload: bytes) -> bytes:
    """Wrap a bitmap payload in the deletion-vector blob framing.

    The result is a big-endian length prefix, then the little-endian magic number
    and the payload, then a big-endian CRC-32 of the magic number plus payload.
    """
    bitmap_data = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER) + bitmap_payload
    length_prefix = struct.pack(">I", len(bitmap_data))
    crc_suffix = struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
    return length_prefix + bitmap_data + crc_suffix


def _bitmap_payload() -> bytes:
    """Build the bitmap payload used by these tests for positions 1, 3 and 5."""
    # An 8-byte little-endian 1 followed by a 4-byte little-endian 0; presumably the
    # bitmap count and the key of the single bitmap -- confirm against _deserialize_bitmap.
    count_field = (1).to_bytes(8, byteorder="little")
    key_field = (0).to_bytes(4, byteorder="little")
    serialized_bitmap = BitMap([1, 3, 5]).serialize()
    return count_field + key_field + serialized_bitmap


def test_deserialize_deletion_vector_blob() -> None:
    """A well-formed blob with a matching record count decodes to the original bitmap."""
    blob = _dv_blob(_bitmap_payload())

    decoded = _deserialize_dv_blob(blob, record_count=3)

    assert decoded == [BitMap([1, 3, 5])]


def test_deserialize_deletion_vector_blob_invalid_length() -> None:
    """Dropping the last byte makes the declared length disagree with the blob size."""
    truncated_blob = _dv_blob(_bitmap_payload())[:-1]

    with pytest.raises(ValueError, match="Invalid bitmap data length"):
        _deserialize_dv_blob(truncated_blob)


def test_deserialize_deletion_vector_blob_invalid_magic() -> None:
    """A blob with a wrong magic number but a valid length and CRC is rejected."""
    wrong_magic = struct.pack("<I", _DV_BLOB_MAGIC_NUMBER + 1)
    bitmap_data = wrong_magic + _bitmap_payload()
    # Length and CRC are computed over the altered data so only the magic check can fail.
    length_prefix = struct.pack(">I", len(bitmap_data))
    crc_suffix = struct.pack(">I", zlib.crc32(bitmap_data) & 0xFFFFFFFF)
    blob = length_prefix + bitmap_data + crc_suffix

    with pytest.raises(ValueError, match="Invalid magic number"):
        _deserialize_dv_blob(blob)


def test_deserialize_deletion_vector_blob_invalid_crc() -> None:
    """Flipping the lowest bit of the final CRC byte makes the checksum comparison fail."""
    valid_blob = _dv_blob(_bitmap_payload())
    corrupted_blob = valid_blob[:-1] + bytes([valid_blob[-1] ^ 1])

    with pytest.raises(ValueError, match="Invalid CRC"):
        _deserialize_dv_blob(corrupted_blob)


def test_deserialize_deletion_vector_blob_invalid_cardinality() -> None:
    """A record count of 4 does not match the three positions stored in the bitmap."""
    blob = _dv_blob(_bitmap_payload())

    with pytest.raises(ValueError, match="Invalid cardinality"):
        _deserialize_dv_blob(blob, record_count=4)


def test_map_empty() -> None:
puffin = _open_file("64mapempty.bin")

Expand Down
Loading