Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ public void testCloseBlockingWaitingForFetcherShutdown() throws Exception {
closingThread.start();

waitUntil(
() -> findThread(SplitFetcherManager.THREAD_NAME_PREFIX).size() == 2,
() -> findThread(SplitFetcherManager.THREAD_NAME_PREFIX).size() >= 2,
Duration.ofSeconds(30),
"The element queue draining thread should have started.");
for (Thread t : findThread(SplitFetcherManager.THREAD_NAME_PREFIX)) {
waitUntil(
() ->
t.getState().equals(Thread.State.WAITING)
!t.isAlive()
|| t.getState().equals(Thread.State.WAITING)
Comment thread
Dennis-Mircea marked this conversation as resolved.
|| t.getState().equals(Thread.State.TIMED_WAITING),
Duration.ofSeconds(30),
"All the executor threads should be in waiting status.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand Down Expand Up @@ -86,6 +85,7 @@ public class AbstractAsyncRunnableStreamOperatorTest {
}

@Test
@SuppressWarnings({"rawtypes"})
void testCreateAsyncExecutionController() throws Exception {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
Expand Down Expand Up @@ -153,8 +153,8 @@ void testAsyncProcessWithKey() throws Exception {
new TestKeySelector(), ElementOrder.RECORD_ORDER);
AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0);
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
try {
try (testHarness) {
Comment thread
Dennis-Mircea marked this conversation as resolved.
Outdated
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
testHarness.open();
CompletableFuture<Void> future =
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
Expand All @@ -173,8 +173,6 @@ void testAsyncProcessWithKey() throws Exception {
// We don't have the mailbox executor actually running, so the new context is blocked
// and never triggered.
assertThat(testOperator.getProcessed()).isEqualTo(1);
} finally {
testHarness.close();
}
}

Expand All @@ -185,17 +183,14 @@ void testDirectAsyncProcess() throws Exception {
new TestKeySelector(), ElementOrder.RECORD_ORDER);
AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0);
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
try {
try (testHarness) {
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
testHarness.open();
CompletableFuture<Void> future =
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));

testHarness.drainAsyncRequests();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.getProcessed()).isEqualTo(1);
} finally {
testHarness.close();
}
}

Expand All @@ -209,8 +204,8 @@ void testManyAsyncProcessWithKey() throws Exception {
new TestKeySelector(), ElementOrder.RECORD_ORDER, requests);
AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0);
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
try {
try (testHarness) {
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
testHarness.open();

// Repeat twice
Expand All @@ -227,12 +222,11 @@ void testManyAsyncProcessWithKey() throws Exception {
// This ensures the order is correct according to the priority in AEC.
assertThat(testOperator.getProcessedOrders())
.isEqualTo(testOperator.getExpectedProcessedOrders());
} finally {
testHarness.close();
}
}

@Test
@SuppressWarnings({"rawtypes", "unchecked"})
void testCheckpointDrain() throws Exception {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
Expand All @@ -244,14 +238,20 @@ void testCheckpointDrain() throws Exception {
((AbstractAsyncRunnableStreamOperator<String>) testHarness.getOperator())
.setAsyncKeyedContextElement(
new StreamRecord<>(Tuple2.of(5, "5")), new TestKeySelector());
// Block the async processing supplier until we have observed the in-flight record,
// otherwise the request can complete before the assertion runs and make the test
// flaky.
CompletableFuture<Void> blocker = new CompletableFuture<>();
((AbstractAsyncRunnableStreamOperator<String>) testHarness.getOperator())
.asyncProcess(
() -> {
blocker.get();
return null;
});
((AbstractAsyncRunnableStreamOperator<String>) testHarness.getOperator())
.postProcessElement();
assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(1);
blocker.complete(null);
testHarness.drainAsyncRequests();
assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
}
Expand Down Expand Up @@ -325,9 +325,7 @@ void testWatermarkHooks() throws Exception {
});

testOperator.setPostProcessFunction(
(watermark) -> {
testOperator.output(watermark.getTimestamp() + 100L);
});
(watermark) -> testOperator.output(watermark.getTimestamp() + 100L));

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
Expand Down Expand Up @@ -372,8 +370,6 @@ void testWatermarkStatus() throws Exception {
createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
testHarness.processWatermarkInternal(new Watermark(205L));
CompletableFuture<Void> future =
Expand Down Expand Up @@ -480,8 +476,7 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
synchronized (objectToWait) {
objectToWait.wait();
}
asyncProcess(() -> processed.decrementAndGet())
.thenAccept((a) -> processed.incrementAndGet());
asyncProcess(processed::decrementAndGet).thenAccept((a) -> processed.incrementAndGet());
}

@Override
Expand All @@ -499,15 +494,15 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
"EventTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
Expand Down Expand Up @@ -538,11 +533,7 @@ private static class TestOperatorWithAsyncProcessWithKey extends TestOperator {

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
asyncProcessWithKey(
element.getValue().f0,
() -> {
processed.incrementAndGet();
});
asyncProcessWithKey(element.getValue().f0, processed::incrementAndGet);
synchronized (objectToWait) {
objectToWait.wait();
}
Expand All @@ -558,7 +549,7 @@ private static class TestOperatorWithDirectAsyncProcess extends TestOperator {
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
public void processElement(StreamRecord<Tuple2<Integer, String>> element) {
asyncProcess(processed::decrementAndGet).thenAccept((e) -> processed.addAndGet(2));
}
}
Expand All @@ -579,7 +570,7 @@ private static class TestOperatorWithMultipleDirectAsyncProcess extends TestOper
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
public void processElement(StreamRecord<Tuple2<Integer, String>> element) {
for (int i = 0; i < numAsyncProcesses; i++) {
final int finalI = i;
if (i < numAsyncProcesses - 1) {
Expand Down Expand Up @@ -625,17 +616,17 @@ private static class TestOperatorWithAsyncProcessTimer extends TestOperator {
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
public void processElement(StreamRecord<Tuple2<Integer, String>> element) {
processed.incrementAndGet();
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) {
asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) {
asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer));
}
}
Expand Down Expand Up @@ -691,23 +682,23 @@ public Watermark postProcessWatermark(Watermark watermark) throws Exception {
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(new StreamRecord<>(timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
}

@Override
public void processElement1(StreamRecord<Long> element) throws Exception {
public void processElement1(StreamRecord<Long> element) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
}

@Override
public void processElement2(StreamRecord<Long> element) throws Exception {
public void processElement2(StreamRecord<Long> element) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,21 @@ void testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause(

waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM);

if (enabledRescaleHistory(configuration)) {
// The rescale-history bookkeeping (merging the still-open UPDATE_REQUIREMENT rescale
// with the new RECOVERABLE_FAILOVER one) is recorded asynchronously by the scheduler
// and is not synchronized with the parallelism/RUNNING signal we waited for above.
// Poll until the expected merged state is observed to avoid flakiness.
waitUntilConditionWithTimeout(
() -> {
List<Rescale> rescaleHistory = getRescaleHistory(miniCluster, jobGraph);
return rescaleHistory.size() == 2
&& rescaleHistory.get(0).getTriggerCause()
== TriggerCause.RECOVERABLE_FAILOVER;
},
10000);
}

final ExecutionGraphInfo executionGraphInfo =
miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
runAdaptedParameterizedAssertion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
ev23.getCurrentExecutionAttempt().markFinished();

// Ensure that the still-running tasks have accumulated a strictly larger execution time
// than the just-finished baseline tasks before invoking the detector. Without this wait,
// on fast machines all of {start, markFinished, findSlowTasks} can happen within the
// same millisecond, leaving the running tasks with execution time <= baseline and
// making the test flaky.
Thread.sleep(10);
Comment thread
Dennis-Mircea marked this conversation as resolved.
Outdated

final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
slowTaskDetector.findSlowTasks(executionGraph);

Expand Down