Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ project(':iceberg-core') {
}

implementation libs.aircompressor
implementation libs.lz4Java
implementation libs.httpcomponents.httpclient5
implementation platform(libs.jackson.bom)
implementation libs.jackson.core
Expand Down
43 changes: 37 additions & 6 deletions core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,23 @@
import io.airlift.compress.Compressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;
import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE;
import net.jpountz.lz4.LZ4FrameOutputStream.FLG;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;

Expand Down Expand Up @@ -108,9 +116,7 @@ static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) {
case NONE:
return input.duplicate();
case LZ4:
// TODO requires LZ4 frame compressor, e.g.
// https://github.com/airlift/aircompressor/pull/142
break;
return compressLz4(input);
case ZSTD:
return compress(new ZstdCompressor(), input);
}
Expand All @@ -130,9 +136,7 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
return input.duplicate();

case LZ4:
// TODO requires LZ4 frame decompressor, e.g.
// https://github.com/airlift/aircompressor/pull/142
break;
return decompressLz4(input);

case ZSTD:
return decompressZstd(input);
Expand All @@ -141,6 +145,33 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
throw new UnsupportedOperationException("Unsupported codec: " + codec);
}

/**
 * Compresses {@code input} into an LZ4 frame.
 *
 * <p>The frame is written with 4 MB blocks and with the block-independence and content-size
 * flags set; the uncompressed length is handed to the frame writer as the known content size.
 *
 * @param input buffer holding the uncompressed bytes
 * @return a buffer wrapping the bytes of the compressed frame
 * @throws UncheckedIOException if writing the frame fails
 */
private static ByteBuffer compressLz4(ByteBuffer input) {
  byte[] uncompressed = ByteBuffers.toByteArray(input);
  // The initial capacity is only a starting size; the stream grows as the frame is written
  ByteArrayOutputStream frameBytes = new ByteArrayOutputStream(uncompressed.length);
  try (LZ4FrameOutputStream frame =
      new LZ4FrameOutputStream(
          frameBytes,
          BLOCKSIZE.SIZE_4MB,
          uncompressed.length,
          FLG.Bits.BLOCK_INDEPENDENCE,
          FLG.Bits.CONTENT_SIZE)) {
    frame.write(uncompressed);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
  // The frame stream has been closed by try-with-resources before the bytes are collected
  return ByteBuffer.wrap(frameBytes.toByteArray());
}

/**
 * Decompresses LZ4 frame data held in {@code input}.
 *
 * @param input buffer holding the LZ4 frame bytes
 * @return a buffer wrapping the fully decompressed bytes
 * @throws UncheckedIOException if reading the frame data fails
 */
private static ByteBuffer decompressLz4(ByteBuffer input) {
  ByteArrayInputStream frameBytes = new ByteArrayInputStream(ByteBuffers.toByteArray(input));
  try (LZ4FrameInputStream frame = new LZ4FrameInputStream(frameBytes)) {
    // Drain the decompressing stream completely before wrapping the result
    byte[] uncompressed = ByteStreams.toByteArray(frame);
    return ByteBuffer.wrap(uncompressed);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

private static ByteBuffer decompressZstd(ByteBuffer input) {
byte[] inputBytes;
int inputOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@ private PuffinFormatTestUtil() {}
// footer size for v1/empty-puffin-uncompressed.bin
public static final long EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE = 28;

// footer size for v1/empty-puffin-compressed-footer.bin; TestPuffinWriter expects a writer built
// with compressFooter() to report this value from footerSize() after finish()
public static final long EMPTY_PUFFIN_COMPRESSED_FOOTER_SIZE = 51;

// footer size for v1/sample-metric-data-compressed-zstd.bin
public static final long SAMPLE_METRIC_DATA_COMPRESSED_ZSTD_FOOTER_SIZE = 314;

// footer size for v1/sample-metric-data-compressed-lz4.bin; TestPuffinReader passes this value to
// withFooterSize(...) to check that it still matches the resource
// NOTE(review): presumably must be updated whenever the resource is regenerated -- confirm
public static final long SAMPLE_METRIC_DATA_COMPRESSED_LZ4_FOOTER_SIZE = 312;

/**
 * Loads a test resource fully into memory.
 *
 * @param resourceName name of the resource, looked up with {@code PuffinFormatTestUtil} as the
 *     context class
 * @return the contents of the resource
 * @throws Exception propagated from the resource lookup or read
 */
static byte[] readTestResource(String resourceName) throws Exception {
  java.net.URL location = Resources.getResource(PuffinFormatTestUtil.class, resourceName);
  return Resources.toByteArray(location);
}
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.nio.ByteOrder;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TestPuffinFormat {
@Test
Expand Down Expand Up @@ -74,6 +77,28 @@ private void testReadIntegerLittleEndian(byte[] input, int offset, int expected)
assertThat(readIntegerLittleEndian(input, offset)).isEqualTo(expected);
}

// Verifies, for every codec, that decompress(compress(x)) yields the original bytes.
@ParameterizedTest
@EnumSource(PuffinCompressionCodec.class)
public void testCompressDecompressRoundtrip(PuffinCompressionCodec codec) {
  // Every seventh 16-byte run is left zero-filled (compressible, and covers NUL bytes); all
  // other positions hold values that vary with the index
  byte[] payload = new byte[2048];
  for (int pos = 0; pos < payload.length; pos++) {
    boolean inZeroRun = (pos / 16) % 7 == 0;
    if (!inZeroRun) {
      payload[pos] = (byte) (pos * 31 + (pos / 13));
    }
  }

  ByteBuffer source = ByteBuffer.wrap(payload);
  ByteBuffer compressed = PuffinFormat.compress(codec, source);
  ByteBuffer restored = PuffinFormat.decompress(codec, compressed.duplicate());

  byte[] restoredBytes = ByteBuffers.toByteArray(restored);
  assertThat(restoredBytes).isEqualTo(payload);
}

// Verifies, for every codec, that an empty payload round-trips to an empty result.
@ParameterizedTest
@EnumSource(PuffinCompressionCodec.class)
public void testCompressDecompressEmpty(PuffinCompressionCodec codec) {
  ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
  ByteBuffer compressed = PuffinFormat.compress(codec, empty);
  ByteBuffer restored = PuffinFormat.decompress(codec, compressed.duplicate());

  byte[] restoredBytes = ByteBuffers.toByteArray(restored);
  assertThat(restoredBytes).isEmpty();
}

private byte[] bytes(int... unsignedBytes) {
byte[] bytes = new byte[unsignedBytes.length];
for (int i = 0; i < unsignedBytes.length; i++) {
Expand Down
21 changes: 21 additions & 0 deletions core/src/test/java/org/apache/iceberg/puffin/TestPuffinReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.iceberg.puffin;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.LZ4;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.NONE;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.ZSTD;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.SAMPLE_METRIC_DATA_COMPRESSED_LZ4_FOOTER_SIZE;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.SAMPLE_METRIC_DATA_COMPRESSED_ZSTD_FOOTER_SIZE;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.readTestResource;
import static org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -98,6 +100,11 @@ public void testReadMetricDataCompressedZstd() throws Exception {
testReadMetricData("v1/sample-metric-data-compressed-zstd.bin", ZSTD);
}

// Runs the shared metric-data read check against the LZ4-compressed sample file.
@Test
public void testReadMetricDataCompressedLz4() throws Exception {
  String lz4Resource = "v1/sample-metric-data-compressed-lz4.bin";
  testReadMetricData(lz4Resource, LZ4);
}

private void testReadMetricData(String resourceName, PuffinCompressionCodec expectedCodec)
throws Exception {
InMemoryInputFile inputFile = new InMemoryInputFile(readTestResource(resourceName));
Expand Down Expand Up @@ -154,4 +161,18 @@ public void testValidateFooterSizeValue() throws Exception {
.isEqualTo(ImmutableMap.of("created-by", "Test 1234"));
}
}

@Test
public void testValidateLz4FooterSizeValue() throws Exception {
  // Guards against SAMPLE_METRIC_DATA_COMPRESSED_LZ4_FOOTER_SIZE drifting from the resource
  byte[] resourceBytes = readTestResource("v1/sample-metric-data-compressed-lz4.bin");
  InMemoryInputFile lz4File = new InMemoryInputFile(resourceBytes);
  ImmutableMap<String, String> expectedProperties = ImmutableMap.of("created-by", "Test 1234");

  try (PuffinReader reader =
      Puffin.read(lz4File).withFooterSize(SAMPLE_METRIC_DATA_COMPRESSED_LZ4_FOOTER_SIZE).build()) {
    // Reading the footer properties succeeds only if the supplied footer size is usable
    assertThat(reader.fileMetadata().properties()).isEqualTo(expectedProperties);
  }
}
}
23 changes: 16 additions & 7 deletions core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.iceberg.puffin;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.LZ4;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.NONE;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.ZSTD;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.EMPTY_PUFFIN_COMPRESSED_FOOTER_SIZE;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.readTestResource;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -46,19 +48,21 @@ public class TestPuffinWriter {
@TempDir private Path temp;

@Test
public void testEmptyFooterCompressed() {
public void testEmptyFooterCompressed() throws Exception {
InMemoryOutputFile outputFile = new InMemoryOutputFile();

PuffinWriter writer = Puffin.write(outputFile).compressFooter().build();
assertThatThrownBy(writer::footerSize)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Footer not written yet");
assertThatThrownBy(writer::finish)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported codec: LZ4");
assertThatThrownBy(writer::close)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported codec: LZ4");
writer.finish();
assertThat(writer.footerSize()).isEqualTo(EMPTY_PUFFIN_COMPRESSED_FOOTER_SIZE);
writer.close();
assertThat(outputFile.toByteArray())
.isEqualTo(readTestResource("v1/empty-puffin-compressed-footer.bin"));
// footerSize is still accessible after close()
assertThat(writer.footerSize()).isEqualTo(EMPTY_PUFFIN_COMPRESSED_FOOTER_SIZE);
assertThat(writer.writtenBlobsMetadata()).isEmpty();
}

@Test
Expand Down Expand Up @@ -98,6 +102,11 @@ public void testWriteMetricDataCompressedZstd() throws Exception {
testWriteMetric(ZSTD, "v1/sample-metric-data-compressed-zstd.bin");
}

// Runs the shared metric-data write check with the LZ4 codec against the LZ4 sample file.
@Test
public void testWriteMetricDataCompressedLz4() throws Exception {
  String expectedResource = "v1/sample-metric-data-compressed-lz4.bin";
  testWriteMetric(LZ4, expectedResource);
}

@ParameterizedTest
@CsvSource({"true, 158", "false, 122"})
public void testFileSizeCalculation(boolean isEncrypted, long expectedSize) throws Exception {
Expand Down
Binary file not shown.
Binary file not shown.
8 changes: 8 additions & 0 deletions flink/v1.20/flink-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Google Guava.

Copyright: 2006-2020 The Guava Authors
Expand Down
1 change: 1 addition & 0 deletions flink/v1.20/flink-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.21.3
com.fasterxml.jackson.core:jackson-databind:2.21.3
Expand Down
8 changes: 8 additions & 0 deletions flink/v2.0/flink-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Google Guava.

Copyright: 2006-2020 The Guava Authors
Expand Down
1 change: 1 addition & 0 deletions flink/v2.0/flink-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.21.3
com.fasterxml.jackson.core:jackson-databind:2.21.3
Expand Down
8 changes: 8 additions & 0 deletions flink/v2.1/flink-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Google Guava.

Copyright: 2006-2020 The Guava Authors
Expand Down
1 change: 1 addition & 0 deletions flink/v2.1/flink-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.21.3
com.fasterxml.jackson.core:jackson-databind:2.21.3
Expand Down
1 change: 1 addition & 0 deletions kafka-connect/kafka-connect-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.azure:azure-core-http-netty:1.16.3
com.azure:azure-core:1.57.1
com.azure:azure-identity:1.18.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v3.4/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from Cloudera Kite.

Copyright: 2013-2017 Cloudera Inc.
Expand Down
1 change: 1 addition & 0 deletions spark/v3.4/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.14.2
com.fasterxml.jackson.core:jackson-databind:2.14.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v3.5/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from Cloudera Kite.

Copyright: 2013-2017 Cloudera Inc.
Expand Down
1 change: 1 addition & 0 deletions spark/v3.5/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.15.2
com.fasterxml.jackson.core:jackson-databind:2.15.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v4.0/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from Cloudera Kite.

Copyright: 2013-2017 Cloudera Inc.
Expand Down
1 change: 1 addition & 0 deletions spark/v4.0/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.15.2
com.fasterxml.jackson.core:jackson-databind:2.15.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v4.1/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors.
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from Cloudera Kite.

Copyright: 2013-2017 Cloudera Inc.
Expand Down
1 change: 1 addition & 0 deletions spark/v4.1/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.15.2
com.fasterxml.jackson.core:jackson-databind:2.15.2
Expand Down