Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -36,6 +37,7 @@
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -148,6 +150,17 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);

// Use CountDownLatch to park the background thread after first processing and before re-stubbing
CountDownLatch firstProcessed = new java.util.concurrent.CountDownLatch(1);
CountDownLatch release = new java.util.concurrent.CountDownLatch(1);

// On first processing of message1, block and wait for re-stubbing
doAnswer(inv -> {
firstProcessed.countDown();
release.await(5, TimeUnit.SECONDS);
return null;
}).when(functionRuntimeManager).processAssignmentMessage(eq(message1));

FunctionAssignmentTailer functionAssignmentTailer =
spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier));

Expand All @@ -157,12 +170,17 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
verify(errorNotifier, times(0)).triggerError(any());

messageList.add(message1);
Assert.assertTrue(firstProcessed.await(5, TimeUnit.SECONDS),
"First processing did not reach the blocking point");

verify(errorNotifier, times(0)).triggerError(any());

// trigger an error to be thrown
doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignmentMessage(any());

// Release the first processing
release.countDown();

messageList.add(message2);

try {
Expand Down
Loading