Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib.metadata
import io
import math
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
Expand All @@ -27,9 +31,13 @@

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
# Reserved field id of the row position (_pos) metadata column, referenced by
# deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION)
ROW_POSITION_FIELD_ID = 2147483645


def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
Expand Down Expand Up @@ -62,6 +70,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
return bitmaps


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
"""
Serialize a dictionary of bitmaps into a byte array.

The format is:
- 8 bytes: number of bitmaps (little-endian)
- For each bitmap:
- 4 bytes: key (little-endian)
- n bytes: serialized bitmap
"""
with io.BytesIO() as out:
sorted_keys = sorted(bitmaps.keys())

# number of bitmaps
out.write(len(sorted_keys).to_bytes(8, "little"))

for key in sorted_keys:
if key < 0:
raise ValueError(f"Invalid unsigned key: {key}")
if key > MAX_JAVA_SIGNED:
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

# key
out.write(key.to_bytes(4, "little"))
# bitmap
out.write(bitmaps[key].serialize())
return out.getvalue()


class PuffinBlobMetadata(IcebergBaseModel):
type: Literal["deletion-vector-v1"] = Field()
fields: list[int] = Field()
Expand Down Expand Up @@ -114,3 +151,92 @@ def __init__(self, puffin: bytes) -> None:

def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}


class PuffinWriter:
"""Writes a Puffin file containing a single deletion-vector-v1 blob."""

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment looks misleading. This writer doesn't write a file in my understanding.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it didn't. Addressed in eb81422 together with the suggestion below: PuffinWriter now accepts an OutputFile and finish() writes the file, so the docstring matches the behavior now.


_blobs: list[PuffinBlobMetadata]
_blob_payloads: list[bytes]
_created_by: str

def __init__(self, created_by: str | None = None) -> None:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about accepting an OutputFile or something, and writing the content to it? I think this is a better approach than returning bytes. Iceberg Java PuffinWriter also accepts an output file object.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done in eb81422. PuffinWriter now takes an OutputFile and finish() writes the content to it and returns the file size, following the Java PuffinWriter shape. One simplification compared to Java: the file is assembled in memory and written in one shot rather than streamed, which should be fine for DVs since they are small. Happy to revisit with a streaming implementation if needed.

self._blobs = []
self._blob_payloads = []
self._created_by = (
created_by if created_by is not None else f"PyIceberg version {importlib.metadata.version('pyiceberg')}"
)

def set_blob(
self,
positions: Iterable[int],
referenced_data_file: str,
) -> None:
"""Set the deletion vector blob for a data file, replacing any previously set blob.

Args:
positions: Zero-based positions of the deleted rows in the referenced data file.
referenced_data_file: Location of the data file the deletion vector applies to.
"""
# We only support one blob at the moment
self._blobs = []
self._blob_payloads = []

bitmaps: dict[int, BitMap] = {}
for pos in positions:
if pos < 0:
raise ValueError(f"Invalid position: {pos}, positions must be non-negative")
key = pos >> 32
low_bits = pos & 0xFFFFFFFF
if key not in bitmaps:
bitmaps[key] = BitMap()
bitmaps[key].add(low_bits)

if not bitmaps:
raise ValueError("Deletion vector must contain at least one position")

cardinality = sum(len(bm) for bm in bitmaps.values())
vector_payload = _serialize_bitmaps(bitmaps)

# deletion-vector-v1 blob layout: combined length of magic and vector (4 bytes, big-endian),
# the DV magic bytes, the serialized vector, and a CRC-32 checksum of magic + vector (4 bytes, big-endian)
blob_content = DELETION_VECTOR_MAGIC + vector_payload
self._blob_payloads.append(
len(blob_content).to_bytes(4, "big") + blob_content + zlib.crc32(blob_content).to_bytes(4, "big")
)

self._blobs.append(
PuffinBlobMetadata(
type="deletion-vector-v1",
fields=[ROW_POSITION_FIELD_ID],
# -1 means the snapshot id and sequence number are inherited at commit time
snapshot_id=-1,
sequence_number=-1,
# offset and length are placeholders; finish() fills them in when assembling the file
offset=0,
length=0,
properties={PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)},
compression_codec=None,
)
)

def finish(self) -> bytes:
"""Serialize the Puffin file and return its contents as bytes."""
with io.BytesIO() as out:
out.write(MAGIC_BYTES)

blobs_metadata: list[PuffinBlobMetadata] = []
for blob_metadata, blob_payload in zip(self._blobs, self._blob_payloads, strict=True):
blobs_metadata.append(blob_metadata.model_copy(update={"offset": out.tell(), "length": len(blob_payload)}))
out.write(blob_payload)

footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by})
footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

out.write(MAGIC_BYTES)
out.write(footer_payload_bytes)
out.write(len(footer_payload_bytes).to_bytes(4, "little"))
out.write((0).to_bytes(4, "little")) # flags
out.write(MAGIC_BYTES)

return out.getvalue()
123 changes: 122 additions & 1 deletion tests/table/test_puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib.metadata
import zlib
from os import path

import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.table.puffin import (
DELETION_VECTOR_MAGIC,
MAGIC_BYTES,
PROPERTY_REFERENCED_DATA_FILE,
PuffinFile,
PuffinWriter,
_deserialize_bitmap,
)


def _open_file(file: str) -> bytes:
Expand Down Expand Up @@ -71,3 +80,115 @@ def test_map_high_vals() -> None:

with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
_ = _deserialize_bitmap(puffin)


def test_puffin_round_trip() -> None:
# Define some deletion positions for a file
deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate

file_path = "path/to/data.parquet"

# Write the Puffin file
writer = PuffinWriter(created_by="my-test-app")
writer.set_blob(positions=deletions, referenced_data_file=file_path)
puffin_bytes = writer.finish()

# Read the Puffin file back
reader = PuffinFile(puffin_bytes)

# Assert footer metadata
assert reader.footer.properties["created-by"] == "my-test-app"
assert len(reader.footer.blobs) == 1

blob_meta = reader.footer.blobs[0]
assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
assert blob_meta.properties["cardinality"] == str(len(set(deletions)))

# Assert the content of deletion vectors
read_vectors = reader.to_vector()

assert file_path in read_vectors
assert read_vectors[file_path].to_pylist() == sorted(set(deletions))


def test_write_and_read_puffin_file() -> None:
writer = PuffinWriter()
writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
puffin_bytes = writer.finish()

reader = PuffinFile(puffin_bytes)

assert len(reader.footer.blobs) == 1
blob = reader.footer.blobs[0]

assert blob.properties["referenced-data-file"] == "file2.parquet"
assert blob.properties["cardinality"] == "3"
assert blob.type == "deletion-vector-v1"
# Reserved field id of the row position column (Java MetadataColumns.ROW_POSITION, INT_MAX - 2);
# required for Java/Spark interoperability.
assert blob.fields == [2147483645]
assert blob.snapshot_id == -1
assert blob.sequence_number == -1
assert blob.compression_codec is None

vectors = reader.to_vector()
assert len(vectors) == 1
assert "file1.parquet" not in vectors
assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


def test_deletion_vector_blob_framing_is_spec_compliant() -> None:
# PuffinFile reads only the serialized vector, skipping the blob's length prefix,
# deletion-vector magic and CRC-32. Assert that framing directly at the byte level so
# the bytes an external reader (Java/Spark) relies on stay spec-compliant.
positions = [0, 1, 5, (1 << 32) + 7]
writer = PuffinWriter()
writer.set_blob(positions=positions, referenced_data_file="file.parquet")
puffin_bytes = writer.finish()

# The Puffin file begins with the magic.
assert puffin_bytes[:4] == MAGIC_BYTES

blob = PuffinFile(puffin_bytes).footer.blobs[0]
blob_bytes = puffin_bytes[blob.offset : blob.offset + blob.length]

# Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian),
# where the length and CRC-32 both cover the magic bytes plus the vector.
length_prefix = int.from_bytes(blob_bytes[0:4], "big")
dv_magic = blob_bytes[4:8]
vector = blob_bytes[8 : 4 + length_prefix]
crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big")

assert dv_magic == DELETION_VECTOR_MAGIC
assert length_prefix == len(dv_magic) + len(vector)
assert blob.length == 4 + length_prefix + 4
assert crc == zlib.crc32(dv_magic + vector)


def test_puffin_file_with_no_blobs() -> None:
writer = PuffinWriter()
puffin_bytes = writer.finish()

reader = PuffinFile(puffin_bytes)
assert len(reader.footer.blobs) == 0
assert len(reader.to_vector()) == 0


def test_puffin_writer_default_created_by() -> None:
puffin_bytes = PuffinWriter().finish()

reader = PuffinFile(puffin_bytes)
assert reader.footer.properties["created-by"] == f"PyIceberg version {importlib.metadata.version('pyiceberg')}"


def test_set_blob_rejects_negative_positions() -> None:
writer = PuffinWriter()
with pytest.raises(ValueError, match="Invalid position: -1"):
writer.set_blob(positions=[1, -1], referenced_data_file="file.parquet")


def test_set_blob_rejects_empty_positions() -> None:
writer = PuffinWriter()
with pytest.raises(ValueError, match="Deletion vector must contain at least one position"):
writer.set_blob(positions=[], referenced_data_file="file.parquet")