Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,17 @@
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>

<!--
AmoroRunListener (registered via META-INF/services/org.junit.platform.launcher.TestExecutionListener)
requires the JUnit Platform launcher API at compile time. Surefire pulls it in transitively at
runtime, but it is not part of junit-jupiter, so we declare it explicitly.
-->
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -416,12 +427,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<properties>
<property>
<name>listener</name>
<value>org.apache.amoro.listener.AmoroRunListener</value>
</property>
</properties>
<argLine>-verbose:class</argLine>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED;

import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.MockAmoroManagementServer;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.TestAms;
import org.apache.amoro.UnifiedCatalog;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.catalog.TableTestBase;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
import org.apache.amoro.flink.write.MixedFormatRowDataTaskWriterFactory;
import org.apache.amoro.io.reader.GenericKeyedDataReader;
import org.apache.amoro.mixed.CatalogLoader;
import org.apache.amoro.mixed.MixedFormatCatalog;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.scan.CombinedScanTask;
import org.apache.amoro.scan.KeyedTableScanTask;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand All @@ -37,6 +44,14 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.KeyedTable;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableBuilder;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.CatalogUtil;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
Expand Down Expand Up @@ -67,13 +82,15 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
Expand All @@ -84,14 +101,24 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class FlinkTestBase extends TableTestBase {
/**
* JUnit 5 base class for Flink integration tests in this module.
*
* <p>This class intentionally no longer extends {@code TableTestBase}/{@code CatalogTestBase}
* (which remain on JUnit 4 until the closing PR of the umbrella migration). The catalog/table
* lifecycle is re-implemented here against the same {@link CatalogTestHelper}/{@link
* TableTestHelper} contracts so that children can stay clean Jupiter classes; for parameterized
* children that cannot pass helpers through a constructor, call {@link
* #initFlinkTestBase(CatalogTestHelper, TableTestHelper)} from the {@code @ParameterizedTest}
* method body.
*/
public class FlinkTestBase {
private static final Logger LOG = LoggerFactory.getLogger(FlinkTestBase.class);

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
protected static final TestAms TEST_AMS = new TestAms();

@Rule public TestName name = new TestName();
protected static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();

public static String metastoreUri;

Expand All @@ -113,14 +140,228 @@ public class FlinkTestBase extends TableTestBase {

public static InternalCatalogBuilder catalogBuilder;

private CatalogTestHelper catalogTestHelper;
private TableTestHelper tableTestHelper;
private CatalogMeta catalogMeta;
private MixedFormatCatalog mixedFormatCatalog;
private UnifiedCatalog unifiedCatalog;
private org.apache.iceberg.catalog.Catalog icebergCatalog;
private MixedTable mixedTable;
private TableMetaStore tableMetaStore;
private File tempRoot;

/**
* No-arg constructor for parameterized children that pass helpers via {@link #initFlinkTestBase}.
*/
public FlinkTestBase() {}

public FlinkTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
super(catalogTestHelper, tableTestHelper);
this.catalogTestHelper = catalogTestHelper;
this.tableTestHelper = tableTestHelper;
}

@BeforeAll
public static void startFlinkBaseClassResources() throws Exception {
TEST_AMS.before();
MINI_CLUSTER_RESOURCE.before();
}

@Before
public void before() throws Exception {
@AfterAll
public static void stopFlinkBaseClassResources() {
try {
MINI_CLUSTER_RESOURCE.after();
} finally {
TEST_AMS.after();
}
}

@BeforeEach
public void setUpFlinkTestBaseLifecycle() throws Exception {
if (catalogTestHelper == null) {
// Parameterized child: the @ParameterizedTest body must call initFlinkTestBase(...).
return;
}
initLifecycle();
}

@AfterEach
public void tearDownFlinkTestBaseLifecycle() {
teardownLifecycle();
}

/**
* Initializer used by {@code @ParameterizedTest} children. Sets the helpers (since the parameters
* are received by the test method, not the constructor) and runs the catalog/table setup.
*/
protected void initFlinkTestBase(
CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) throws Exception {
this.catalogTestHelper = catalogTestHelper;
this.tableTestHelper = tableTestHelper;
initLifecycle();
}

private void initLifecycle() throws Exception {
tempRoot = Files.createTempDirectory("flink-test-base").toFile();
String baseDir = tempRoot.getPath();
if (!SystemUtils.IS_OS_UNIX) {
baseDir = "file:/" + baseDir.replace("\\", "/");
}
catalogMeta = catalogTestHelper.buildCatalogMeta(baseDir);
catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, TEST_AMS.getServerUrl());
getAmsHandler().createCatalog(catalogMeta);
metastoreUri = getCatalogUri();
catalogBuilder = InternalCatalogBuilder.builder().amsUri(metastoreUri);
if (tableTestHelper != null) {
createTestTable();
}
}

private void teardownLifecycle() {
if (tableTestHelper != null && unifiedCatalog != null) {
try {
unifiedCatalog.dropTable(
tableTestHelper.id().getDatabase(), tableTestHelper.id().getTableName(), true);
} catch (Exception e) {
LOG.warn("dropTable failed", e);
}
try {
unifiedCatalog.dropDatabase(TableTestHelper.TEST_DB_NAME);
} catch (Exception e) {
// ignore
}
}
if (catalogMeta != null) {
try {
getAmsHandler().dropCatalog(catalogMeta.getCatalogName());
} catch (Exception e) {
LOG.warn("dropCatalog failed", e);
}
}
if (tempRoot != null) {
try {
FileUtils.deleteDirectory(tempRoot);
} catch (Exception e) {
LOG.warn("Failed to clean temp directory {}", tempRoot, e);
}
}
catalogMeta = null;
mixedFormatCatalog = null;
unifiedCatalog = null;
icebergCatalog = null;
mixedTable = null;
tableMetaStore = null;
tempRoot = null;
}

private void createTestTable() {
this.tableMetaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
getUnifiedCatalog().createDatabase(TableTestHelper.TEST_DB_NAME);
TableFormat format = getTestFormat();
if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
createMixedFormatTable();
} else if (TableFormat.ICEBERG.equals(format)) {
createIcebergFormatTable();
}
}

private void createMixedFormatTable() {
TableBuilder tableBuilder =
getMixedFormatCatalog()
.newTableBuilder(TableTestHelper.TEST_TABLE_ID, tableTestHelper.tableSchema());
tableBuilder.withProperties(tableTestHelper.tableProperties());
if (isKeyedTable()) {
tableBuilder.withPrimaryKeySpec(tableTestHelper.primaryKeySpec());
}
if (isPartitionedTable()) {
tableBuilder.withPartitionSpec(tableTestHelper.partitionSpec());
}
mixedTable = tableBuilder.create();
}

private void createIcebergFormatTable() {
getIcebergCatalog()
.createTable(
org.apache.iceberg.catalog.TableIdentifier.of(
TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME),
tableTestHelper.tableSchema(),
tableTestHelper.partitionSpec(),
tableTestHelper.tableProperties());
mixedTable =
(MixedTable)
getUnifiedCatalog()
.loadTable(TableTestHelper.TEST_DB_NAME, TableTestHelper.TEST_TABLE_NAME)
.originalTable();
}

public static MockAmoroManagementServer.AmsHandler getAmsHandler() {
return TEST_AMS.getAmsHandler();
}

protected MixedFormatCatalog getMixedFormatCatalog() {
if (mixedFormatCatalog == null) {
mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
}
return mixedFormatCatalog;
}

protected void refreshMixedFormatCatalog() {
this.mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
}

protected String getCatalogUri() {
return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
}

protected CatalogMeta getCatalogMeta() {
return catalogMeta;
}

protected TableFormat getTestFormat() {
return catalogTestHelper.tableFormat();
}

protected org.apache.iceberg.catalog.Catalog getIcebergCatalog() {
if (icebergCatalog == null) {
icebergCatalog = catalogTestHelper.buildIcebergCatalog(catalogMeta);
}
return icebergCatalog;
}

protected UnifiedCatalog getUnifiedCatalog() {
if (unifiedCatalog == null) {
unifiedCatalog = catalogTestHelper.buildUnifiedCatalog(catalogMeta);
}
return unifiedCatalog;
}

protected MixedTable getMixedTable() {
return mixedTable;
}

protected UnkeyedTable getBaseStore() {
return MixedTableUtil.baseStore(mixedTable);
}

protected TableMetaStore getTableMetaStore() {
return this.tableMetaStore;
}

protected boolean isKeyedTable() {
return tableTestHelper.primaryKeySpec() != null
&& tableTestHelper.primaryKeySpec().primaryKeyExisted();
}

protected boolean isPartitionedTable() {
return tableTestHelper.partitionSpec() != null
&& tableTestHelper.partitionSpec().isPartitioned();
}

protected TableTestHelper tableTestHelper() {
return tableTestHelper;
}

protected CatalogTestHelper catalogTestHelper() {
return catalogTestHelper;
}

public void config() {
Expand All @@ -129,6 +370,10 @@ public void config() {
props.put(CatalogFactoryOptions.AMS_URI.key(), metastoreUri);
}

protected int defaultParallelism() {
return 1;
}

public static void prepare() throws Exception {
KAFKA_CONTAINER.start();
}
Expand Down Expand Up @@ -163,7 +408,7 @@ protected StreamExecutionEnvironment getEnv() {
env =
StreamExecutionEnvironment.getExecutionEnvironment(
MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.setParallelism(1);
env.setParallelism(defaultParallelism());
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(300);
env.getCheckpointConfig()
Expand Down Expand Up @@ -307,12 +552,18 @@ protected static void commit(KeyedTable keyedTable, WriteResult result, boolean
}
}

/**
* 3-arg variant of the keyed task writer factory with a default mask of 3. The 4-arg variant
* lives under a different name ({@link #createKeyedTaskWriterWithMask}) to avoid the inherited
* static signature clashing with the {@code FlinkTaskWriterBaseTest} interface's default method
* in classes that {@code implements} that interface.
*/
protected static TaskWriter<RowData> createKeyedTaskWriter(
KeyedTable keyedTable, RowType rowType, boolean base) {
return createKeyedTaskWriter(keyedTable, rowType, base, 3);
return createKeyedTaskWriterWithMask(keyedTable, rowType, base, 3);
}

protected static TaskWriter<RowData> createKeyedTaskWriter(
protected static TaskWriter<RowData> createKeyedTaskWriterWithMask(
KeyedTable keyedTable, RowType rowType, boolean base, long mask) {
MixedFormatRowDataTaskWriterFactory taskWriterFactory =
new MixedFormatRowDataTaskWriterFactory(keyedTable, rowType, base);
Expand Down
Loading
Loading