Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,42 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d
assertEquals(schema, convertToEngineRecords(genericRecords, schema), readRecords);
}

/** Write with engine type T, read with engine type T */
@ParameterizedTest
@FieldSource("FORMAT_AND_GENERATOR")
void testDataWriterEngineWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator)
throws IOException {
Schema schema = dataGenerator.schema();
FileWriterBuilder<DataWriter<T>, Object> writerBuilder =
FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile);

DataWriter<T> writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();

List<Record> genericRecords = dataGenerator.generateRecords();
List<T> engineRecords = convertToEngineRecords(genericRecords, schema);

try (writer) {
engineRecords.forEach(writer::write);
}

DataFile dataFile = writer.toDataFile();

assertThat(dataFile).isNotNull();
assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
assertThat(dataFile.format()).isEqualTo(fileFormat);

InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
List<T> readRecords;
try (CloseableIterable<T> reader =
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
.project(schema)
.build()) {
readRecords = ImmutableList.copyOf(reader);
}

assertEquals(schema, engineRecords, readRecords);
}

/** Write with engine type T, read with Generic Record */
@ParameterizedTest
@FieldSource("FORMAT_AND_GENERATOR")
Expand Down
17 changes: 16 additions & 1 deletion data/src/test/java/org/apache/iceberg/data/DataGenerators.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
*/
class DataGenerators {

static final DataGenerator[] ALL = new DataGenerator[] {new StructOfPrimitive()};
static final DataGenerator[] ALL =
new DataGenerator[] {new StructOfPrimitive(), new StructWithDecimals()};

private DataGenerators() {}

Expand All @@ -50,6 +51,20 @@ public Schema schema() {
}
}

static class StructWithDecimals implements DataGenerator {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Decimals

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — there's no nested struct here, so StructWithDecimals was a misleading name. Renamed the class to Decimals (and updated the ALL array) in 5dc4b5d.

private final Schema schema =
new Schema(
required(1, "row_id", Types.StringType.get()),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No — row_id isn't needed for the decimal write/read roundtrip and the generator has no identity semantics. Dropped the column and renumbered the decimal field IDs to 1/2/3 in 5dc4b5d.

required(2, "dec_9_2", Types.DecimalType.of(9, 2)),
required(3, "dec_15_3", Types.DecimalType.of(15, 3)),
required(4, "dec_38_10", Types.DecimalType.of(38, 10)));

@Override
public Schema schema() {
return schema;
}
}

static class DefaultSchema implements DataGenerator {
private final Schema schema =
new Schema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ private static class BinaryDecimalReader
public DecimalData read(DecimalData ignored) {
Binary binary = column.nextBinary();
BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale);
// TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ private static class BinaryDecimalReader
public DecimalData read(DecimalData ignored) {
Binary binary = column.nextBinary();
BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale);
// TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ private static class BinaryDecimalReader
public DecimalData read(DecimalData ignored) {
Binary binary = column.nextBinary();
BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale);
// TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
}
Expand Down