diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 7cfa428d337a7..63638d12c9bb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -38,14 +37,12 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory; @@ -76,13 +73,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Deque; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -123,7 +117,6 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -212,16 +205,16 @@ public class TaskManagerTest { @BeforeEach public void setUp() { - taskManager = setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); } - private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode, final TasksRegistry tasks) { - return setUpTaskManagerWithStateUpdater(processingMode, tasks, false); + private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks) { + return setUpTaskManager(processingMode, tasks, false); } - private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode, - final TasksRegistry tasks, - final boolean processingThreadsEnabled) { + private TaskManager setUpTaskManager(final ProcessingMode processingMode, + final TasksRegistry tasks, + final boolean processingThreadsEnabled) { topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode)); final TaskManager taskManager = new TaskManager( time, @@ -241,35 +234,13 @@ private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode proces return taskManager; } - private TaskManager setUpTaskManagerWithoutStateUpdater(final ProcessingMode processingMode, - final TasksRegistry tasks, - final boolean processingThreadsEnabled) { - topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode)); - final TaskManager taskManager = new TaskManager( - time, - changeLogReader, - ProcessId.randomProcessId(), - "taskManagerTest", - activeTaskCreator, - standbyTaskCreator, - tasks != null ? tasks : new Tasks(new LogContext()), - topologyMetadata, - adminClient, - stateDirectory, - null, - processingThreadsEnabled ? schedulingTaskManager : null - ); - taskManager.setMainConsumer(consumer); - return taskManager; - } - @Test public void shouldLockAllTasksOnCorruptionWithProcessingThreads() { final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01)); when(tasks.task(taskId00)).thenReturn(activeTask1); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); @@ -291,7 +262,7 @@ public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -304,7 +275,7 @@ public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() { @Test public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01)); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -327,7 +298,7 @@ public void shouldLockAffectedTasksOnHandleRevocation() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2)); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -347,7 +318,7 @@ public void shouldLockTasksOnClose() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2)); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -367,7 +338,7 @@ public void shouldResumePollingForPartitionsWithAvailableSpaceForAllActiveTasks( .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2)); taskManager.resumePollingForPartitionsWithAvailableSpace(); @@ -385,7 +356,7 @@ public void shouldUpdateLagForAllActiveTasks() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2)); taskManager.updateLags(); @@ -400,7 +371,7 @@ public void shouldRemoveUnusedActiveTaskFromStateUpdaterAndCloseCleanly() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future); @@ -420,7 +391,7 @@ public void shouldRemoveUnusedFailedActiveTaskFromStateUpdaterAndCloseDirty() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future); @@ -441,7 +412,7 @@ public void shouldRemoveUnusedStandbyTaskFromStateUpdaterAndCloseCleanly() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future); @@ -461,7 +432,7 @@ public void shouldRemoveUnusedFailedStandbyTaskFromStateUpdaterAndCloseDirty() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future); @@ -482,7 +453,7 @@ public void shouldCollectFailedTaskFromStateUpdaterAndRethrow() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future); @@ -509,7 +480,7 @@ public void shouldUpdateInputPartitionOfActiveTaskInStateUpdater() { .withInputPartitions(taskId03Partitions).build(); final Set newInputPartitions = taskId02Partitions; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future); @@ -537,7 +508,7 @@ public void shouldRecycleActiveTaskInStateUpdater() { .inState(State.CREATED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId03Partitions)) .thenReturn(recycledStandbyTask); @@ -561,7 +532,7 @@ public void shouldHandleExceptionThrownDuringRecyclingActiveTask() { .inState(State.RESTORING) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, activeTaskToRecycle.inputPartitions())) .thenThrow(new RuntimeException()); @@ -591,7 +562,7 @@ public void shouldRecycleStandbyTaskInStateUpdater() { .inState(State.CREATED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle)); when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId03Partitions, consumer)) .thenReturn(recycledActiveTask); @@ -615,7 +586,7 @@ public void shouldHandleExceptionThrownDuringRecyclingStandbyTask() { .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle)); when(activeTaskCreator.createActiveTaskFromStandby( standbyTaskToRecycle, @@ -645,7 +616,7 @@ public void shouldKeepReassignedActiveTaskInStateUpdater() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask)); taskManager.handleAssignment( @@ -664,7 +635,7 @@ public void shouldMoveReassignedSuspendedActiveTaskToStateUpdater() { .inState(State.SUSPENDED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask)); taskManager.handleAssignment( @@ -684,7 +655,7 @@ public void shouldAddFailedActiveTaskToRecycleDuringAssignmentToTaskRegistry() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle)); final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!"); when(stateUpdater.remove(failedActiveTaskToRecycle.id())) @@ -714,7 +685,7 @@ public void shouldAddFailedStandbyTaskToRecycleDuringAssignmentToTaskRegistry() .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle)); final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!"); when(stateUpdater.remove(failedStandbyTaskToRecycle.id())) @@ -744,7 +715,7 @@ public void shouldAddFailedActiveTasksToReassignWithDifferentInputPartitionsDuri .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign)); final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!"); when(stateUpdater.remove(failedActiveTaskToReassign.id())) @@ -777,7 +748,7 @@ public void shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRe .inState(State.RESTORING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1)); when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2)); when(stateUpdater.remove(reassignedActiveTask2.id())) @@ -803,7 +774,7 @@ public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions)); taskManager.handleAssignment( @@ -821,7 +792,7 @@ public void shouldKeepReassignedStandbyTaskInStateUpdater() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask)); taskManager.handleAssignment( @@ -845,7 +816,7 @@ public void shouldAssignMultipleTasksInStateUpdater() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose, standbyTaskToRecycle)); final CompletableFuture futureForActiveTaskToClose = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose); @@ -880,7 +851,7 @@ public void shouldReturnRunningTasksStateUpdaterTasksAndTasksToInitInAllTasks() .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater)); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, runningActiveTask))); @@ -904,7 +875,7 @@ public void shouldNotReturnStateUpdaterTasksInOwnedTasks() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask))); assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask))); @@ -916,7 +887,7 @@ public void shouldCreateActiveTaskDuringAssignment() { .inState(State.CREATED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final Set createdTasks = Set.of(activeTaskToBeCreated); final Map> tasksToBeCreated = mkMap( mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions())); @@ -934,7 +905,7 @@ public void shouldCreateStandbyTaskDuringAssignment() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final Set createdTasks = Set.of(standbyTaskToBeCreated); when(standbyTaskCreator.createTasks(mkMap( mkEntry(standbyTaskToBeCreated.id(), standbyTaskToBeCreated.inputPartitions()))) @@ -950,7 +921,7 @@ public void shouldCreateStandbyTaskDuringAssignment() { } @Test - public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled() { + public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInit() { final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions) .withInputPartitions(taskId01Partitions) .inState(State.RUNNING).build(); @@ -961,7 +932,7 @@ public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithState when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions)) .thenReturn(standbyTask); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); @@ -973,13 +944,13 @@ public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithState } @Test - public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled() { + public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistry() { final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final IllegalStateException illegalStateException = assertThrows( IllegalStateException.class, @@ -995,12 +966,12 @@ public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegis } @Test - public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdaterEnabled() { + public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanly() { final StreamTask activeTaskToClose = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose)); taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); @@ -1013,12 +984,12 @@ public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdat } @Test - public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistryWithStateUpdaterEnabled() { + public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistry() { final StandbyTask standbyTaskToClose = standbyTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose)); final IllegalStateException illegalStateException = assertThrows( @@ -1032,13 +1003,13 @@ public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistr } @Test - public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStateUpdaterEnabled() { + public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitions() { final StreamTask activeTaskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final Set newInputPartitions = taskId02Partitions; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions)); when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, newInputPartitions)).thenReturn(true); @@ -1053,12 +1024,12 @@ public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStat } @Test - public void shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled() { + public void shouldResumeActiveRunningTaskInTasksRegistry() { final StreamTask activeTaskToResume = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume)); taskManager.handleAssignment( @@ -1076,7 +1047,7 @@ public void shouldResumeActiveSuspendedTaskInTasksRegistryAndAddToStateUpdater() .inState(State.SUSPENDED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume)); taskManager.handleAssignment( @@ -1092,13 +1063,13 @@ public void shouldResumeActiveSuspendedTaskInTasksRegistryAndAddToStateUpdater() } @Test - public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistryWithStateUpdaterEnabled() { + public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistry() { final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final Set newInputPartitions = taskId03Partitions; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions)); final IllegalStateException illegalStateException = assertThrows( @@ -1115,7 +1086,7 @@ public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFou } @Test - public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() { + public void shouldAssignMultipleTasksInTasksRegistry() { final StreamTask activeTaskToClose = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); @@ -1123,7 +1094,7 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose)); taskManager.handleAssignment( @@ -1149,7 +1120,7 @@ public void shouldAddTasksToStateUpdater() { .inState(State.RUNNING).build(); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); - taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1171,7 +1142,7 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() { when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); final LockException lockException = new LockException("Where are my keys??"); doThrow(lockException).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1198,7 +1169,7 @@ public void shouldRetryInitializationWhenTimeoutExceptionInStateUpdater() { when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); final TimeoutException timeoutException = new TimeoutException("Timed out!"); doThrow(timeoutException).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1225,7 +1196,7 @@ public void shouldRetryInitializationWithBackoffWhenInitializationFails() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1263,7 +1234,7 @@ public void shouldRetryInitializationWithBackoffWhenInitializationFails() { } @Test - public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() { + public void shouldRethrowRuntimeExceptionInInitTask() { final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) .withInputPartitions(taskId00Partitions) .inState(State.CREATED).build(); @@ -1271,7 +1242,7 @@ public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() { when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00)); final RuntimeException runtimeException = new RuntimeException("KABOOM!"); doThrow(runtimeException).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); final StreamsException streamsException = assertThrows( StreamsException.class, @@ -1297,7 +1268,7 @@ public void shouldRethrowTaskCorruptedExceptionFromInitialization() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, false); when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0, statefulTask1, statefulTask2)); doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded(); doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded(); @@ -1318,7 +1289,7 @@ public void shouldRethrowTaskCorruptedExceptionFromInitialization() { public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() { when(stateUpdater.restoresActiveTasks()).thenReturn(true); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); } @@ -1327,7 +1298,7 @@ public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() { public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.hasPendingTasksToInit()).thenReturn(true); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); } @@ -1335,7 +1306,7 @@ public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAn @Test public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); } @@ -1555,7 +1526,7 @@ public void shouldCloseTasksWhenRemoveFailedActiveTasksFromStateUpdaterOnPartiti private TaskManager setupForRevocationAndLost(final Set tasksInStateUpdater, final TasksRegistry tasks) { - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater); return taskManager; @@ -1624,12 +1595,12 @@ private TaskManager setUpTransitionToRunningOfRestoredTask(final Set when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks); - return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); } @Test - public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() { - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false); + public void shouldReturnCorrectBooleanWhenTryingToCompleteRestoration() { + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null, false); when(stateUpdater.restoresActiveTasks()).thenReturn(false); assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); @@ -1647,7 +1618,7 @@ public void shouldRethrowStreamsExceptionFromStateUpdater() { when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final StreamsException thrown = assertThrows( StreamsException.class, @@ -1674,7 +1645,7 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() { when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1)); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final TaskCorruptedException thrown = assertThrows( TaskCorruptedException.class, @@ -1746,7 +1717,7 @@ public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception { public void shouldPauseAllTopicsOnRebalanceComplete() { final Set assigned = Set.of(t1p0, t1p1); when(consumer.assignment()).thenReturn(assigned); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null); taskManager.handleRebalanceComplete(); verify(consumer).pause(assigned); @@ -1758,7 +1729,7 @@ public void shouldNotPauseReadyTasksOnRebalanceComplete() { .inState(State.RUNNING) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0)); final Set assigned = Set.of(t1p0, t1p1); when(consumer.assignment()).thenReturn(assigned); @@ -1780,7 +1751,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, restoringStatefulTask)); when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask)); @@ -1814,7 +1785,7 @@ public void shouldComputeOffsetSumForRunningStatefulTask() { ); when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); assertThat( @@ -1837,7 +1808,7 @@ public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception { when(restoringStatefulTask.changelogOffsets()) .thenReturn(changelogOffsets); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); @@ -1857,7 +1828,7 @@ public void shouldComputeOffsetSumForRestoringActiveTask() throws Exception { final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); taskManager.handleRebalanceStart(singleton("topic")); @@ -1875,7 +1846,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTask() throws Exception { final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask)); taskManager.handleRebalanceStart(singleton("topic")); @@ -1900,7 +1871,7 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() { when(restoringStandbyTask.changelogOffsets()) .thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask))); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, restoringStatefulTask)); @@ -1925,7 +1896,7 @@ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() { mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN) )); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, restoringStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); @@ -1952,7 +1923,7 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception { when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets); final TasksRegistry tasks = mock(TasksRegistry.class); - taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask)); @@ -2000,7 +1971,7 @@ public void shouldComputeOffsetSumFromCheckpointFileForCreatedAndClosedTasks(fin final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task))); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); @@ -2060,7 +2031,7 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleAssignment(emptyMap(), emptyMap()); @@ -2081,7 +2052,7 @@ public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final RuntimeException thrown = assertThrows( RuntimeException.class, @@ -2124,7 +2095,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { expectLockObtainedFor(taskId00, taskId01); expectDirectoryNotEmpty(taskId00, taskId01); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRebalanceStart(emptySet()); assertThat(taskManager.lockedTaskDirectories(), is(Set.of(taskId00, taskId01))); @@ -2155,7 +2126,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { @Test public void shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() { - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, null, false); taskManager.handleLostAll(); @@ -2171,7 +2142,7 @@ public void shouldReAddRevivedTasksToStateUpdater() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.task(taskId03)).thenReturn(corruptedActiveTask); when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask); @@ -2208,7 +2179,7 @@ public void shouldReviveCorruptTasks() { when(consumer.assignment()).thenReturn(taskId00Partitions); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleCorruption(singleton(taskId00)); @@ -2239,7 +2210,7 @@ public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() { when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions); doThrow(new RuntimeException("oops")).when(task00).suspend(); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleCorruption(singleton(taskId00)); @@ -2280,7 +2251,7 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { when(consumer.assignment()).thenReturn(taskId00Partitions); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleCorruption(Set.of(taskId00)); @@ -2308,7 +2279,7 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningSt final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask))); when(tasks.task(taskId02)).thenReturn(corruptedTask); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); taskManager.handleCorruption(Set.of(taskId02)); @@ -2347,7 +2318,7 @@ public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorrupte doNothing().when(corruptedStandby).suspend(); doNothing().when(corruptedStandby).postCommit(anyBoolean()); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singleton(taskId00))); @@ -2392,7 +2363,7 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { when(consumer.assignment()).thenReturn(taskId00Partitions); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRebalanceStart(singleton(topic1)); assertThat(taskManager.rebalanceInProgress(), is(true)); @@ -2459,7 +2430,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo doThrow(new TimeoutException()).when(producer).commitTransaction(offsets, groupMetadata); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks); taskManager.handleCorruption(singleton(taskId00)); @@ -2532,7 +2503,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo doThrow(new TimeoutException()).when(consumer).commitSync(offsets); when(consumer.assignment()).thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleCorruption(singleton(taskId00)); @@ -2614,7 +2585,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo expectedCommittedOffsets.putAll(unrevokedTaskOffsets); doThrow(new TimeoutException()).when(consumer).commitSync(expectedCommittedOffsets); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); @@ -2707,7 +2678,7 @@ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCo expectedCommittedOffsets.putAll(unrevokedTaskOffsets); doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks); taskManager.handleRevocation(taskId00Partitions); @@ -2751,7 +2722,7 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.drainPendingTasksToInit()).thenReturn(emptySet()); - taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(task00)); @@ -2788,7 +2759,7 @@ public void shouldAddNonResumedSuspendedTasks() { when(tasks.drainPendingTasksToInit()).thenReturn(emptySet()); when(tasks.hasPendingTasksToInit()).thenReturn(false); - taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(task01)); when(stateUpdater.restoresActiveTasks()).thenReturn(false); @@ -2824,7 +2795,7 @@ public void shouldUpdateInputPartitionsAfterRebalance() { when(tasks.hasPendingTasksToInit()).thenReturn(false); when(tasks.updateActiveTaskInputPartitions(task00, newPartitionsSet)).thenReturn(true); - taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(emptySet()); when(stateUpdater.restoresActiveTasks()).thenReturn(false); @@ -2850,7 +2821,7 @@ public void shouldAddNewActiveTasks() { final Map> assignment = taskId00Assignment; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); // first, we need to handle assignment -- creates tasks and adds to pending initialization when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00)); @@ -2888,7 +2859,7 @@ public void shouldNotCompleteRestorationIfTasksCannotInitialize() { .build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final Map> assignment = mkMap( mkEntry(taskId00, taskId00Partitions), mkEntry(taskId01, taskId01Partitions) @@ -2931,7 +2902,7 @@ public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() { .build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of(task00)); @@ -2962,7 +2933,7 @@ public void shouldSuspendActiveTasksDuringRevocation() { when(task00.commitNeeded()).thenReturn(true); when(task00.prepareCommit(true)).thenReturn(offsets); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); @@ -3028,7 +2999,7 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo expectedCommittedOffsets.putAll(offsets00); expectedCommittedOffsets.putAll(offsets01); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks); taskManager.handleRevocation(taskId00Partitions); @@ -3093,7 +3064,7 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, task03)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRevocation(taskId00Partitions); @@ -3148,7 +3119,7 @@ public void shouldNotCommitIfNoRevokedTasksNeedCommitting(final ProcessingMode p when(task01.commitNeeded()).thenReturn(true); // only task01 needs commit when(task02.commitNeeded()).thenReturn(false); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(processingMode, tasks); + final TaskManager taskManager = setUpTaskManager(processingMode, tasks); taskManager.handleRevocation(taskId00Partitions); @@ -3171,7 +3142,7 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); when(stateUpdater.tasks()).thenReturn(Set.of(task01)); @@ -3205,7 +3176,7 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00)); when(stateUpdater.tasks()).thenReturn(Set.of(task01)); @@ -3238,7 +3209,7 @@ public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { .build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(activeTaskCreator.createTasks(consumer, taskId00Assignment)) .thenReturn(singletonList(task00)); @@ -3277,7 +3248,7 @@ public void shouldPassUpIfExceptionDuringSuspend() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(RuntimeException.class, () -> taskManager.handleRevocation(taskId00Partitions)); @@ -3308,7 +3279,7 @@ private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final P .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(processingMode, tasks); + final TaskManager taskManager = setUpTaskManager(processingMode, tasks); doThrow(new TaskMigratedException("migrated", new RuntimeException("cause"))) .when(task01).suspend(); @@ -3348,7 +3319,7 @@ public void shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanSh .build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new RuntimeException("whatever")).when(activeTaskCreator).close(); @@ -3373,14 +3344,14 @@ public void shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanSh @SuppressWarnings("unchecked") @Test public void shouldCloseTasksIfStateUpdaterTimesOutOnRemove() throws Exception { - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false); - final Map> assignment = mkMap( - mkEntry(taskId00, taskId00Partitions) - ); - final Task task00 = spy(new StateMachineTask(taskId00, taskId00Partitions, true, stateManager)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00)); - taskManager.handleAssignment(assignment, emptyMap()); + final TasksRegistry tasks = mock(TasksRegistry.class); + + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false); when(stateUpdater.tasks()).thenReturn(singleton(task00)); final CompletableFuture future = mock(CompletableFuture.class); @@ -3406,7 +3377,7 @@ public void shouldPropagateSuspendExceptionWhenRevokingStandbyTask() { doThrow(new RuntimeException("task 0_1 suspend boom!")).when(task01).suspend(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(task00, task01)); @@ -3450,7 +3421,7 @@ public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() { .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02)); @@ -3481,7 +3452,7 @@ public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() { .build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new TaskMigratedException("migrated", new RuntimeException("cause"))) .when(task01).suspend(); @@ -3527,7 +3498,7 @@ public void shouldCloseStandbyTasksOnShutdown() { final CompletableFuture futureForStandbyTask = new CompletableFuture<>(); when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); futureForStandbyTask.complete(new StateUpdater.RemovedTaskResult(standbyTask00)); // simulate successful removal @@ -3560,7 +3531,7 @@ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() { new ExceptionAndTask(new RuntimeException(), failedStandbyTask)) ) .thenReturn(Collections.emptyList()); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.shutdown(true); @@ -3574,7 +3545,7 @@ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() { @Test public void shouldShutdownSchedulingTaskManager() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); taskManager.shutdown(true); @@ -3624,7 +3595,7 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval), new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStandbyTaskDuringRemoval)) ).thenReturn(Collections.emptyList()); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult(removedStatefulTask)); futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult(removedStandbyTask)); futureForRemovedFailedStatefulTask @@ -3664,7 +3635,7 @@ public void shouldInitializeNewStandbyTasks() { final Map> assignment = taskId01Assignment; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task01)); @@ -3714,7 +3685,7 @@ public void shouldCommitActiveAndStandbyTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00, task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThat(taskManager.commitAll(), equalTo(2)); @@ -3774,7 +3745,7 @@ public void shouldCommitProvidedTasksIfNeeded() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThat(taskManager.commit(Set.of(task00, task02, task03, task05)), equalTo(2)); @@ -3810,7 +3781,7 @@ public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThat(taskManager.commitAll(), equalTo(1)); @@ -3838,7 +3809,7 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00, task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleRebalanceStart(emptySet()); @@ -3867,7 +3838,7 @@ public void shouldCommitViaConsumerIfEosDisabled() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThat(taskManager.commitAll(), equalTo(1)); @@ -3912,7 +3883,7 @@ public void shouldCommitViaProducerIfEosV2Enabled() { final ConsumerGroupMetadata groupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(groupMetadata); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks); taskManager.commitAll(); @@ -3939,7 +3910,7 @@ public void shouldPropagateExceptionFromActiveCommit() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final RuntimeException thrown = assertThrows(RuntimeException.class, taskManager::commitAll); @@ -3962,7 +3933,7 @@ public void shouldPropagateExceptionFromStandbyCommit() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final RuntimeException thrown = assertThrows(RuntimeException.class, () -> taskManager.commitAll()); @@ -3994,7 +3965,7 @@ public void shouldSendPurgeData() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.maybePurgeCommittedRecords(); // no-op taskManager.maybePurgeCommittedRecords(); // sends purge for offset 5L @@ -4024,7 +3995,7 @@ public void shouldNotSendPurgeDataIfPreviousNotDone() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.maybePurgeCommittedRecords(); taskManager.maybePurgeCommittedRecords(); @@ -4052,7 +4023,7 @@ public void shouldIgnorePurgeDataErrors() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.maybePurgeCommittedRecords(); taskManager.maybePurgeCommittedRecords(); @@ -4109,7 +4080,7 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, task03)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); // maybeCommitActiveTasksPerUserRequested checks if any task has both commitRequested AND commitNeeded // If found, commits all active running tasks that have commitNeeded @@ -4169,7 +4140,7 @@ public void shouldProcessActiveTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00, task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); // check that we should be processing at most max num records assertThat(taskManager.process(3, time), is(6)); @@ -4214,7 +4185,7 @@ public void shouldNotFailOnTimeoutException() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); // should only process 2 records, because task01 throws TimeoutException assertThat(taskManager.process(1, time), is(2)); @@ -4241,7 +4212,7 @@ public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time)); } @@ -4258,7 +4229,7 @@ public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final StreamsException exception = assertThrows(StreamsException.class, () -> taskManager.process(1, time)); assertThat(exception.taskId().isPresent(), is(true)); @@ -4279,7 +4250,7 @@ public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(TaskMigratedException.class, taskManager::punctuate); } @@ -4296,7 +4267,7 @@ public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(KafkaException.class, taskManager::punctuate); } @@ -4314,7 +4285,7 @@ public void shouldPunctuateActiveTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); // one for stream and one for system time assertThat(taskManager.punctuate(), equalTo(2)); @@ -4326,7 +4297,7 @@ public void shouldPunctuateActiveTasks() { @Test public void shouldReturnFalseWhenThereAreStillNonRunningTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); // mock that the state updater is still restoring active tasks when(stateUpdater.restoresActiveTasks()).thenReturn(true); @@ -4347,7 +4318,7 @@ public void shouldHaveRemainingPartitionsUncleared() { when(task00.prepareCommit(false)).thenReturn(offsets); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasks()).thenReturn(Set.of(task00)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) { @@ -4385,7 +4356,7 @@ public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() { .when(migratedTask02).suspend(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, migratedTask02)); @@ -4431,7 +4402,7 @@ public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTask .when(migratedTask02).suspend(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, migratedTask02)); @@ -4476,7 +4447,7 @@ public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() { .when(migratedTask02).suspend(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, migratedTask02)); @@ -4522,53 +4493,6 @@ public void shouldTransmitProducerMetrics() { assertThat(taskManager.producerMetrics(), is(dummyProducerMetrics)); } - private Map handleAssignment(final Map> runningActiveAssignment, - final Map> standbyAssignment, - final Map> restoringActiveAssignment) { - final Set runningTasks = runningActiveAssignment.entrySet().stream() - .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateManager)) - .collect(Collectors.toSet()); - final Set standbyTasks = standbyAssignment.entrySet().stream() - .map(t -> new StateMachineTask(t.getKey(), t.getValue(), false, stateManager)) - .collect(Collectors.toSet()); - final Set restoringTasks = restoringActiveAssignment.entrySet().stream() - .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateManager)) - .collect(Collectors.toSet()); - // give the restoring tasks some uncompleted changelog partitions so they'll stay in restoring - restoringTasks.forEach(t -> ((StateMachineTask) t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L))); - - // Initially assign only the active tasks we want to complete restoration - final Map> allActiveTasksAssignment = new HashMap<>(runningActiveAssignment); - allActiveTasksAssignment.putAll(restoringActiveAssignment); - final Set allActiveTasks = new HashSet<>(runningTasks); - allActiveTasks.addAll(restoringTasks); - - when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks); - when(activeTaskCreator.createTasks(any(), eq(allActiveTasksAssignment))).thenReturn(allActiveTasks); - - lenient().when(consumer.assignment()).thenReturn(assignment); - - taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment); - taskManager.tryToCompleteRestoration(time.milliseconds(), null); - - final Map allTasks = new HashMap<>(); - - // Just make sure all tasks ended up in the expected state - for (final Task task : runningTasks) { - assertThat(task.state(), is(Task.State.RUNNING)); - allTasks.put(task.id(), (StateMachineTask) task); - } - for (final Task task : restoringTasks) { - assertThat(task.state(), is(Task.State.RESTORING)); - allTasks.put(task.id(), (StateMachineTask) task); - } - for (final Task task : standbyTasks) { - assertThat(task.state(), is(Task.State.RUNNING)); - allTasks.put(task.id(), (StateMachineTask) task); - } - return allTasks; - } - private void expectLockObtainedFor(final TaskId... tasks) { for (final TaskId task : tasks) { when(stateDirectory.lock(task)).thenReturn(true); @@ -4602,7 +4526,7 @@ public void shouldThrowTaskMigratedExceptionOnCommitFailed() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new CommitFailedException()).when(consumer).commitSync(offsets); @@ -4640,7 +4564,7 @@ public void shouldNotFailForTimeoutExceptionOnConsumerCommit() { when(task01.commitNeeded()).thenReturn(false); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new TimeoutException("KABOOM!")).doNothing().when(consumer).commitSync(any(Map.class)); @@ -4688,7 +4612,7 @@ public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks); final TaskCorruptedException exception = assertThrows( TaskCorruptedException.class, @@ -4717,7 +4641,7 @@ public void shouldStreamsExceptionOnCommitError() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new KafkaException()).when(consumer).commitSync(offsets); @@ -4748,7 +4672,7 @@ public void shouldFailOnCommitFatal() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new RuntimeException("KABOOM")).when(consumer).commitSync(offsets); @@ -4780,7 +4704,7 @@ public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation( when(tasks.allTasks()).thenReturn(Set.of(task00, task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); final RuntimeException thrown = assertThrows( RuntimeException.class, @@ -4804,7 +4728,7 @@ public void shouldConvertActiveTaskToStandbyTask() { .inState(State.CREATED) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(activeTaskCreator.createTasks(consumer, taskId00Assignment)).thenReturn(singletonList(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId00Partitions)) @@ -4836,7 +4760,7 @@ public void shouldConvertStandbyTaskToActiveTask() { .inState(State.CREATED) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTaskToRecycle)); when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId00Partitions, consumer)) @@ -4874,7 +4798,7 @@ public void shouldListNotPausedTasks() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasks()).thenReturn(Set.of(task00, task01)); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Collections.emptySet()); @@ -4890,7 +4814,7 @@ public void shouldListNotPausedTasks() { @Test public void shouldRecycleStartupTasksFromStateDirectoryAsActive() { final Tasks taskRegistry = new Tasks(new LogContext()); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry); final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build(); final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build(); @@ -4928,7 +4852,7 @@ public void shouldRecycleStartupTasksFromStateDirectoryAsActive() { @Test public void shouldUseStartupTasksFromStateDirectoryAsStandby() { final Tasks taskRegistry = new Tasks(new LogContext()); - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry); final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build(); when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false); @@ -4960,7 +4884,7 @@ public void shouldUseStartupTasksFromStateDirectoryAsStandby() { @Test public void shouldStartStateUpdaterOnInit() { - final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, null); taskManager.init(); verify(stateUpdater).start(); } @@ -4996,249 +4920,4 @@ private void writeCheckpointFile(final TaskId task, final Map getConsumerRecord(final TopicPartition topicPartition, final long offset) { - return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, null, null); - } - - private static class StateMachineTask extends AbstractTask implements Task { - private final boolean active; - - // TODO: KAFKA-12569 clean up usage of these flags and use the new commitCompleted flag where appropriate - private boolean commitNeeded = false; - private boolean commitRequested = false; - private boolean commitPrepared = false; - private boolean commitCompleted = false; - private Map committableOffsets = Collections.emptyMap(); - private Map purgeableOffsets; - private Map changelogOffsets = Collections.emptyMap(); - private Set partitionsForOffsetReset = Collections.emptySet(); - private Long timeout = null; - - private final Map>> queue = new HashMap<>(); - - StateMachineTask(final TaskId id, - final Set partitions, - final boolean active, - final ProcessorStateManager processorStateManager) { - super(id, null, null, processorStateManager, partitions, (new TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", StateMachineTask.class); - this.active = active; - } - - @Override - public void initializeIfNeeded() { - if (state() == State.CREATED) { - transitionTo(State.RESTORING); - if (!active) { - transitionTo(State.RUNNING); - } - } - } - - @Override - public void addPartitionsForOffsetReset(final Set partitionsForOffsetReset) { - this.partitionsForOffsetReset = partitionsForOffsetReset; - } - - @Override - public void completeRestoration(final java.util.function.Consumer> offsetResetter) { - if (state() == State.RUNNING) { - return; - } - transitionTo(State.RUNNING); - } - - public void setCommitNeeded() { - commitNeeded = true; - } - - @Override - public boolean commitNeeded() { - return commitNeeded; - } - - public void setCommitRequested() { - commitRequested = true; - } - - @Override - public boolean commitRequested() { - return commitRequested; - } - - @Override - public Map prepareCommit(final boolean clean) { - commitPrepared = true; - - if (commitNeeded) { - if (!clean) { - return null; - } - return committableOffsets; - } else { - return Collections.emptyMap(); - } - } - - @Override - public void postCommit(final boolean enforceCheckpoint) { - commitNeeded = false; - commitCompleted = true; - } - - @Override - public void suspend() { - if (state() == State.CLOSED) { - throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); - } else if (state() == State.SUSPENDED) { - // do nothing - } else { - transitionTo(State.SUSPENDED); - } - } - - @Override - public void resume() { - if (state() == State.SUSPENDED) { - transitionTo(State.RUNNING); - } - } - - @Override - public void revive() { - //TODO: KAFKA-12569 move clearing of commit-required statuses to closeDirty/Clean/AndRecycle methods - commitNeeded = false; - commitRequested = false; - super.revive(); - } - - @Override - public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, - final Exception cause) { - timeout = currentWallClockMs; - } - - @Override - public void clearTaskTimeout() { - timeout = null; - } - - @Override - public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) { - // do nothing - } - - @Override - public void closeClean() { - transitionTo(State.CLOSED); - } - - @Override - public void closeDirty() { - transitionTo(State.CLOSED); - } - - @Override - public void prepareRecycle() { - transitionTo(State.CLOSED); - } - - @Override - public void resumePollingForPartitionsWithAvailableSpace() { - // noop - } - - @Override - public void updateLags() { - // noop - } - - @Override - public void updateInputPartitions(final Set topicPartitions, final Map> allTopologyNodesToSourceTopics) { - inputPartitions = topicPartitions; - } - - void setCommittableOffsetsAndMetadata(final Map committableOffsets) { - if (!active) { - throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks"); - } - this.committableOffsets = committableOffsets; - } - - @Override - public StateStore store(final String name) { - return null; - } - - @Override - public Set changelogPartitions() { - return changelogOffsets.keySet(); - } - - public boolean isActive() { - return active; - } - - void setPurgeableOffsets(final Map purgeableOffsets) { - this.purgeableOffsets = purgeableOffsets; - } - - @Override - public Map purgeableOffsets() { - return purgeableOffsets; - } - - void setChangelogOffsets(final Map changelogOffsets) { - this.changelogOffsets = changelogOffsets; - } - - @Override - public Map changelogOffsets() { - return changelogOffsets; - } - - @Override - public Map committedOffsets() { - return Collections.emptyMap(); - } - - @Override - public Map highWaterMark() { - return Collections.emptyMap(); - } - - @Override - public Optional timeCurrentIdlingStarted() { - return Optional.empty(); - } - - @Override - public void addRecords(final TopicPartition partition, final Iterable> records) { - if (isActive()) { - final Deque> partitionQueue = - queue.computeIfAbsent(partition, k -> new LinkedList<>()); - - for (final ConsumerRecord record : records) { - partitionQueue.add(record); - } - } else { - throw new IllegalStateException("Can't add records to an inactive task."); - } - } - - @Override - public boolean process(final long wallClockTime) { - if (isActive() && state() == State.RUNNING) { - for (final LinkedList> records : queue.values()) { - final ConsumerRecord record = records.poll(); - if (record != null) { - return true; - } - } - return false; - } else { - throw new IllegalStateException("Can't process an inactive or non-running task."); - } - } - } }