Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import math
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
Expand All @@ -27,6 +30,7 @@

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
Expand Down Expand Up @@ -62,6 +66,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
return bitmaps


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
"""
Serialize a dictionary of bitmaps into a byte array.

The format is:
- 8 bytes: number of bitmaps (little-endian)
- For each bitmap:
- 4 bytes: key (little-endian)
- n bytes: serialized bitmap
"""
with io.BytesIO() as out:
sorted_keys = sorted(bitmaps.keys())

# number of bitmaps
out.write(len(sorted_keys).to_bytes(8, "little"))

for key in sorted_keys:
if key < 0:
raise ValueError(f"Invalid unsigned key: {key}")
if key > MAX_JAVA_SIGNED:
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

# key
out.write(key.to_bytes(4, "little"))
# bitmap
out.write(bitmaps[key].serialize())
return out.getvalue()


class PuffinBlobMetadata(IcebergBaseModel):
type: Literal["deletion-vector-v1"] = Field()
fields: list[int] = Field()
Expand Down Expand Up @@ -114,3 +147,105 @@ def __init__(self, puffin: bytes) -> None:

def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}


class PuffinWriter:
_blobs: list[PuffinBlobMetadata]
_blob_payloads: list[bytes]
_created_by: str | None

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please set the default value for the _created_by field using PyIceberg version {version}? You can obtain the version by using importlib.metadata.version.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done in 4ecfd18, the default is now PyIceberg version {importlib.metadata.version("pyiceberg")}.


def __init__(self, created_by: str | None = None) -> None:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about accepting an OutputFile or something, and writing the content to it? I think this is a better approach than returning bytes. Iceberg Java PuffinWriter also accepts an output file object.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done in eb81422. PuffinWriter now takes an OutputFile and finish() writes the content to it and returns the file size, following the Java PuffinWriter shape. One simplification compared to Java: the file is assembled in memory and written in one shot rather than streamed, which should be fine for DVs since they are small. Happy to revisit with a streaming implementation if needed.

self._blobs = []
self._blob_payloads = []
self._created_by = created_by

def set_blob(
self,
positions: Iterable[int],
referenced_data_file: str,
) -> None:
# We only support one blob at the moment
self._blobs = []
self._blob_payloads = []

# 1. Create bitmaps from positions

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would avoid using number prefixes. When we want to add a new operation, we need to adjust the subsequent numbers.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the numbered prefixes in 4ecfd18.

bitmaps: dict[int, BitMap] = {}
for pos in positions:
key = pos >> 32
low_bits = pos & 0xFFFFFFFF
if key not in bitmaps:
bitmaps[key] = BitMap()
bitmaps[key].add(low_bits)

# Calculate the cardinality from the bitmaps
cardinality = sum(len(bm) for bm in bitmaps.values())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: A comment for a simple single line seems excessive. It's evident when we read the code.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in 4ecfd18.


# 2. Serialize bitmaps for the vector payload
vector_payload = _serialize_bitmaps(bitmaps)

# 3. Construct the full blob payload for deletion-vector-v1
with io.BytesIO() as blob_payload_buffer:
# Magic bytes for DV
blob_payload_buffer.write(DELETION_VECTOR_MAGIC)
# The vector itself
blob_payload_buffer.write(vector_payload)

# The content for CRC calculation
crc_content = blob_payload_buffer.getvalue()
crc32 = zlib.crc32(crc_content)

# The full blob to be stored in the Puffin file
with io.BytesIO() as full_blob_buffer:
# Combined length of the vector and magic bytes stored as 4 bytes, big-endian
full_blob_buffer.write(len(crc_content).to_bytes(4, "big"))
# The content (magic + vector)
full_blob_buffer.write(crc_content)
# A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
full_blob_buffer.write(crc32.to_bytes(4, "big"))

self._blob_payloads.append(full_blob_buffer.getvalue())

# 4. Create blob metadata
properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)}

self._blobs.append(
PuffinBlobMetadata(
type="deletion-vector-v1",
fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors
snapshot_id=-1,
sequence_number=-1,
offset=0, # TODO: Use DeleteFileIndex data
length=0, # TODO: Use DeleteFileIndex data
properties=properties,
compression_codec=None,
)
)

def finish(self) -> bytes:
with io.BytesIO() as out:
payload_buffer = io.BytesIO()
for blob_payload in self._blob_payloads:
payload_buffer.write(blob_payload)

updated_blobs_metadata: list[PuffinBlobMetadata] = []
current_offset = 4 # Start after file magic (4 bytes)
for i, blob_payload in enumerate(self._blob_payloads):
original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True)
original_metadata_dict["offset"] = current_offset
original_metadata_dict["length"] = len(blob_payload)
updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict))
current_offset += len(blob_payload)

footer = Footer(blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {})
footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

# Final assembly
out.write(MAGIC_BYTES)
out.write(payload_buffer.getvalue())
out.write(MAGIC_BYTES)
out.write(footer_payload_bytes)
out.write(len(footer_payload_bytes).to_bytes(4, "little"))
out.write((0).to_bytes(4, "little")) # flags
out.write(MAGIC_BYTES)

return out.getvalue()
93 changes: 93 additions & 0 deletions tests/integration/test_puffin_spark_interop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Licensed to the Apache Software Foundation (ASF) under one

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test passes without the changes made in this PR. Could you please extract a PR that adding this test?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted to #3476 and removed from this PR in 4ecfd18.

# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.manifest import ManifestContent
from pyiceberg.table.puffin import PuffinFile


def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None:
for sql in sqls:
spark.sql(sql)


@pytest.mark.integration
def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None:
"""Verify pyiceberg can read Puffin DVs written by Spark."""
identifier = "default.spark_puffin_format_test"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR introduces support for write operations, so we're interested in verifying that Spark can read Puffin files written by PyIceberg. There are no requested changes for now. I suppose this PR is a preparatory change, and we'll need another PR to use it during the write operations.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly right, this PR is preparatory. PyIceberg does not yet have a write path that commits DVs as delete files, so a Spark-reads-PyIceberg interop test is not possible in isolation. As follow-ups, I plan to (1) extend PuffinWriter to support one blob per referenced data file and expose per-blob offset/length for content_offset/content_size_in_bytes, and (2) wire it into the merge-on-read branch of Transaction.delete() for v3 tables (toward #1078), where the Spark-reads-PyIceberg interop test will live.


run_spark_commands(spark, [f"DROP TABLE IF EXISTS {identifier}"])
run_spark_commands(
spark,
[
f"""
CREATE TABLE {identifier} (id BIGINT)
USING iceberg
TBLPROPERTIES (
'format-version' = '3',
'write.delete.mode' = 'merge-on-read'
)
""",
],
)

df = spark.range(1, 51)
df.coalesce(1).writeTo(identifier).append()

files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect()
assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}"

run_spark_commands(spark, [f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)"])

table = session_catalog.load_table(identifier)
current_snapshot = table.current_snapshot()
assert current_snapshot is not None

manifests = current_snapshot.manifests(table.io)
delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES]
assert len(delete_manifests) > 0, "Expected delete manifest with DVs"

delete_manifest = delete_manifests[0]
entries = list(delete_manifest.fetch_manifest_entry(table.io))
assert len(entries) > 0, "Expected at least one delete file entry"

delete_entry = entries[0]
puffin_path = delete_entry.data_file.file_path
assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}"

input_file = table.io.new_input(puffin_path)
with input_file.open() as f:
puffin_bytes = f.read()

puffin = PuffinFile(puffin_bytes)

assert len(puffin.footer.blobs) == 1, "Expected exactly one blob"

blob = puffin.footer.blobs[0]
assert blob.type == "deletion-vector-v1"
assert "referenced-data-file" in blob.properties
assert blob.properties["cardinality"] == "4"

dv_dict = puffin.to_vector()
assert len(dv_dict) == 1, "Expected one data file's deletions"

for _data_file_path, chunked_array in dv_dict.items():
positions = chunked_array.to_pylist()
assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}"
assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}"
104 changes: 103 additions & 1 deletion tests/table/test_puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import zlib
from os import path

import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.table.puffin import (
DELETION_VECTOR_MAGIC,
MAGIC_BYTES,
PROPERTY_REFERENCED_DATA_FILE,
PuffinFile,
PuffinWriter,
_deserialize_bitmap,
)


def _open_file(file: str) -> bytes:
Expand Down Expand Up @@ -71,3 +79,97 @@ def test_map_high_vals() -> None:

with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
_ = _deserialize_bitmap(puffin)


def test_puffin_round_trip() -> None:
# Define some deletion positions for a file
deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate

file_path = "path/to/data.parquet"

# Write the Puffin file
writer = PuffinWriter(created_by="my-test-app")
writer.set_blob(positions=deletions, referenced_data_file=file_path)
puffin_bytes = writer.finish()

# Read the Puffin file back
reader = PuffinFile(puffin_bytes)

# Assert footer metadata
assert reader.footer.properties["created-by"] == "my-test-app"
assert len(reader.footer.blobs) == 1

blob_meta = reader.footer.blobs[0]
assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
assert blob_meta.properties["cardinality"] == str(len(set(deletions)))

# Assert the content of deletion vectors
read_vectors = reader.to_vector()

assert file_path in read_vectors
assert read_vectors[file_path].to_pylist() == sorted(set(deletions))


def test_write_and_read_puffin_file() -> None:
writer = PuffinWriter()
writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
puffin_bytes = writer.finish()

reader = PuffinFile(puffin_bytes)

assert len(reader.footer.blobs) == 1
blob = reader.footer.blobs[0]

assert blob.properties["referenced-data-file"] == "file2.parquet"
assert blob.properties["cardinality"] == "3"
assert blob.type == "deletion-vector-v1"
# Reserved field id of the row position column (Java MetadataColumns.ROW_POSITION, INT_MAX - 2);
# required for Java/Spark interoperability.
assert blob.fields == [2147483645]
assert blob.snapshot_id == -1
assert blob.sequence_number == -1
assert blob.compression_codec is None

vectors = reader.to_vector()
assert len(vectors) == 1
assert "file1.parquet" not in vectors
assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


def test_deletion_vector_blob_framing_is_spec_compliant() -> None:
# PuffinFile reads only the serialized vector, skipping the blob's length prefix,
# deletion-vector magic and CRC-32. Assert that framing directly at the byte level so
# the bytes an external reader (Java/Spark) relies on stay spec-compliant.
positions = [0, 1, 5, (1 << 32) + 7]
writer = PuffinWriter()
writer.set_blob(positions=positions, referenced_data_file="file.parquet")
puffin_bytes = writer.finish()

# The Puffin file begins with the magic.
assert puffin_bytes[:4] == MAGIC_BYTES

blob = PuffinFile(puffin_bytes).footer.blobs[0]
blob_bytes = puffin_bytes[blob.offset : blob.offset + blob.length]

# Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian),
# where the length and CRC-32 both cover the magic bytes plus the vector.
length_prefix = int.from_bytes(blob_bytes[0:4], "big")
dv_magic = blob_bytes[4:8]
vector = blob_bytes[8 : 4 + length_prefix]
crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big")

assert dv_magic == DELETION_VECTOR_MAGIC
assert length_prefix == len(dv_magic) + len(vector)
assert blob.length == 4 + length_prefix + 4
assert crc == zlib.crc32(dv_magic + vector)


def test_puffin_file_with_no_blobs() -> None:
writer = PuffinWriter()
puffin_bytes = writer.finish()

reader = PuffinFile(puffin_bytes)
assert len(reader.footer.blobs) == 0
assert len(reader.to_vector()) == 0
assert "created-by" not in reader.footer.properties