Skip to content

Commit 2702c01

Browse files
committed
final cleanup
1 parent a8ff09a commit 2702c01

File tree

1 file changed

+0
-299
lines changed

1 file changed

+0
-299
lines changed

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 0 additions & 299 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.kafka.clients.consumer.CommitFailedException;
2424
import org.apache.kafka.clients.consumer.Consumer;
2525
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
26-
import org.apache.kafka.clients.consumer.ConsumerRecord;
2726
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2827
import org.apache.kafka.common.KafkaException;
2928
import org.apache.kafka.common.KafkaFuture;
@@ -38,14 +37,12 @@
3837
import org.apache.kafka.common.utils.LogContext;
3938
import org.apache.kafka.common.utils.MockTime;
4039
import org.apache.kafka.common.utils.Time;
41-
import org.apache.kafka.streams.TopologyConfig;
4240
import org.apache.kafka.streams.errors.LockException;
4341
import org.apache.kafka.streams.errors.StreamsException;
4442
import org.apache.kafka.streams.errors.TaskCorruptedException;
4543
import org.apache.kafka.streams.errors.TaskMigratedException;
4644
import org.apache.kafka.streams.internals.StreamsConfigUtils;
4745
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
48-
import org.apache.kafka.streams.processor.StateStore;
4946
import org.apache.kafka.streams.processor.TaskId;
5047
import org.apache.kafka.streams.processor.assignment.ProcessId;
5148
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
@@ -76,13 +73,10 @@
7673
import java.util.ArrayList;
7774
import java.util.Arrays;
7875
import java.util.Collections;
79-
import java.util.Deque;
8076
import java.util.HashMap;
8177
import java.util.HashSet;
82-
import java.util.LinkedList;
8378
import java.util.List;
8479
import java.util.Map;
85-
import java.util.Optional;
8680
import java.util.Set;
8781
import java.util.concurrent.CompletableFuture;
8882
import java.util.stream.Collectors;
@@ -123,7 +117,6 @@
123117
import static org.mockito.Mockito.lenient;
124118
import static org.mockito.Mockito.mock;
125119
import static org.mockito.Mockito.never;
126-
import static org.mockito.Mockito.spy;
127120
import static org.mockito.Mockito.times;
128121
import static org.mockito.Mockito.verify;
129122
import static org.mockito.Mockito.verifyNoInteractions;
@@ -4500,53 +4493,6 @@ public void shouldTransmitProducerMetrics() {
45004493
assertThat(taskManager.producerMetrics(), is(dummyProducerMetrics));
45014494
}
45024495

4503-
private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, Set<TopicPartition>> runningActiveAssignment,
4504-
final Map<TaskId, Set<TopicPartition>> standbyAssignment,
4505-
final Map<TaskId, Set<TopicPartition>> restoringActiveAssignment) {
4506-
final Set<Task> runningTasks = runningActiveAssignment.entrySet().stream()
4507-
.map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateManager))
4508-
.collect(Collectors.toSet());
4509-
final Set<Task> standbyTasks = standbyAssignment.entrySet().stream()
4510-
.map(t -> new StateMachineTask(t.getKey(), t.getValue(), false, stateManager))
4511-
.collect(Collectors.toSet());
4512-
final Set<Task> restoringTasks = restoringActiveAssignment.entrySet().stream()
4513-
.map(t -> new StateMachineTask(t.getKey(), t.getValue(), true, stateManager))
4514-
.collect(Collectors.toSet());
4515-
// give the restoring tasks some uncompleted changelog partitions so they'll stay in restoring
4516-
restoringTasks.forEach(t -> ((StateMachineTask) t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L)));
4517-
4518-
// Initially assign only the active tasks we want to complete restoration
4519-
final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new HashMap<>(runningActiveAssignment);
4520-
allActiveTasksAssignment.putAll(restoringActiveAssignment);
4521-
final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
4522-
allActiveTasks.addAll(restoringTasks);
4523-
4524-
when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
4525-
when(activeTaskCreator.createTasks(any(), eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
4526-
4527-
lenient().when(consumer.assignment()).thenReturn(assignment);
4528-
4529-
taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
4530-
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
4531-
4532-
final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
4533-
4534-
// Just make sure all tasks ended up in the expected state
4535-
for (final Task task : runningTasks) {
4536-
assertThat(task.state(), is(Task.State.RUNNING));
4537-
allTasks.put(task.id(), (StateMachineTask) task);
4538-
}
4539-
for (final Task task : restoringTasks) {
4540-
assertThat(task.state(), is(Task.State.RESTORING));
4541-
allTasks.put(task.id(), (StateMachineTask) task);
4542-
}
4543-
for (final Task task : standbyTasks) {
4544-
assertThat(task.state(), is(Task.State.RUNNING));
4545-
allTasks.put(task.id(), (StateMachineTask) task);
4546-
}
4547-
return allTasks;
4548-
}
4549-
45504496
private void expectLockObtainedFor(final TaskId... tasks) {
45514497
for (final TaskId task : tasks) {
45524498
when(stateDirectory.lock(task)).thenReturn(true);
@@ -4974,249 +4920,4 @@ private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Lo
49744920
private File getCheckpointFile(final TaskId task) {
49754921
return new File(new File(testFolder.toAbsolutePath().toString(), task.toString()), StateManagerUtil.CHECKPOINT_FILE_NAME);
49764922
}
4977-
4978-
private static ConsumerRecord<byte[], byte[]> getConsumerRecord(final TopicPartition topicPartition, final long offset) {
4979-
return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, null, null);
4980-
}
4981-
4982-
private static class StateMachineTask extends AbstractTask implements Task {
4983-
private final boolean active;
4984-
4985-
// TODO: KAFKA-12569 clean up usage of these flags and use the new commitCompleted flag where appropriate
4986-
private boolean commitNeeded = false;
4987-
private boolean commitRequested = false;
4988-
private boolean commitPrepared = false;
4989-
private boolean commitCompleted = false;
4990-
private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
4991-
private Map<TopicPartition, Long> purgeableOffsets;
4992-
private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
4993-
private Set<TopicPartition> partitionsForOffsetReset = Collections.emptySet();
4994-
private Long timeout = null;
4995-
4996-
private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<>();
4997-
4998-
StateMachineTask(final TaskId id,
4999-
final Set<TopicPartition> partitions,
5000-
final boolean active,
5001-
final ProcessorStateManager processorStateManager) {
5002-
super(id, null, null, processorStateManager, partitions, (new TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", StateMachineTask.class);
5003-
this.active = active;
5004-
}
5005-
5006-
@Override
5007-
public void initializeIfNeeded() {
5008-
if (state() == State.CREATED) {
5009-
transitionTo(State.RESTORING);
5010-
if (!active) {
5011-
transitionTo(State.RUNNING);
5012-
}
5013-
}
5014-
}
5015-
5016-
@Override
5017-
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
5018-
this.partitionsForOffsetReset = partitionsForOffsetReset;
5019-
}
5020-
5021-
@Override
5022-
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
5023-
if (state() == State.RUNNING) {
5024-
return;
5025-
}
5026-
transitionTo(State.RUNNING);
5027-
}
5028-
5029-
public void setCommitNeeded() {
5030-
commitNeeded = true;
5031-
}
5032-
5033-
@Override
5034-
public boolean commitNeeded() {
5035-
return commitNeeded;
5036-
}
5037-
5038-
public void setCommitRequested() {
5039-
commitRequested = true;
5040-
}
5041-
5042-
@Override
5043-
public boolean commitRequested() {
5044-
return commitRequested;
5045-
}
5046-
5047-
@Override
5048-
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
5049-
commitPrepared = true;
5050-
5051-
if (commitNeeded) {
5052-
if (!clean) {
5053-
return null;
5054-
}
5055-
return committableOffsets;
5056-
} else {
5057-
return Collections.emptyMap();
5058-
}
5059-
}
5060-
5061-
@Override
5062-
public void postCommit(final boolean enforceCheckpoint) {
5063-
commitNeeded = false;
5064-
commitCompleted = true;
5065-
}
5066-
5067-
@Override
5068-
public void suspend() {
5069-
if (state() == State.CLOSED) {
5070-
throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
5071-
} else if (state() == State.SUSPENDED) {
5072-
// do nothing
5073-
} else {
5074-
transitionTo(State.SUSPENDED);
5075-
}
5076-
}
5077-
5078-
@Override
5079-
public void resume() {
5080-
if (state() == State.SUSPENDED) {
5081-
transitionTo(State.RUNNING);
5082-
}
5083-
}
5084-
5085-
@Override
5086-
public void revive() {
5087-
//TODO: KAFKA-12569 move clearing of commit-required statuses to closeDirty/Clean/AndRecycle methods
5088-
commitNeeded = false;
5089-
commitRequested = false;
5090-
super.revive();
5091-
}
5092-
5093-
@Override
5094-
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
5095-
final Exception cause) {
5096-
timeout = currentWallClockMs;
5097-
}
5098-
5099-
@Override
5100-
public void clearTaskTimeout() {
5101-
timeout = null;
5102-
}
5103-
5104-
@Override
5105-
public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) {
5106-
// do nothing
5107-
}
5108-
5109-
@Override
5110-
public void closeClean() {
5111-
transitionTo(State.CLOSED);
5112-
}
5113-
5114-
@Override
5115-
public void closeDirty() {
5116-
transitionTo(State.CLOSED);
5117-
}
5118-
5119-
@Override
5120-
public void prepareRecycle() {
5121-
transitionTo(State.CLOSED);
5122-
}
5123-
5124-
@Override
5125-
public void resumePollingForPartitionsWithAvailableSpace() {
5126-
// noop
5127-
}
5128-
5129-
@Override
5130-
public void updateLags() {
5131-
// noop
5132-
}
5133-
5134-
@Override
5135-
public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
5136-
inputPartitions = topicPartitions;
5137-
}
5138-
5139-
void setCommittableOffsetsAndMetadata(final Map<TopicPartition, OffsetAndMetadata> committableOffsets) {
5140-
if (!active) {
5141-
throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks");
5142-
}
5143-
this.committableOffsets = committableOffsets;
5144-
}
5145-
5146-
@Override
5147-
public StateStore store(final String name) {
5148-
return null;
5149-
}
5150-
5151-
@Override
5152-
public Set<TopicPartition> changelogPartitions() {
5153-
return changelogOffsets.keySet();
5154-
}
5155-
5156-
public boolean isActive() {
5157-
return active;
5158-
}
5159-
5160-
void setPurgeableOffsets(final Map<TopicPartition, Long> purgeableOffsets) {
5161-
this.purgeableOffsets = purgeableOffsets;
5162-
}
5163-
5164-
@Override
5165-
public Map<TopicPartition, Long> purgeableOffsets() {
5166-
return purgeableOffsets;
5167-
}
5168-
5169-
void setChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
5170-
this.changelogOffsets = changelogOffsets;
5171-
}
5172-
5173-
@Override
5174-
public Map<TopicPartition, Long> changelogOffsets() {
5175-
return changelogOffsets;
5176-
}
5177-
5178-
@Override
5179-
public Map<TopicPartition, Long> committedOffsets() {
5180-
return Collections.emptyMap();
5181-
}
5182-
5183-
@Override
5184-
public Map<TopicPartition, Long> highWaterMark() {
5185-
return Collections.emptyMap();
5186-
}
5187-
5188-
@Override
5189-
public Optional<Long> timeCurrentIdlingStarted() {
5190-
return Optional.empty();
5191-
}
5192-
5193-
@Override
5194-
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
5195-
if (isActive()) {
5196-
final Deque<ConsumerRecord<byte[], byte[]>> partitionQueue =
5197-
queue.computeIfAbsent(partition, k -> new LinkedList<>());
5198-
5199-
for (final ConsumerRecord<byte[], byte[]> record : records) {
5200-
partitionQueue.add(record);
5201-
}
5202-
} else {
5203-
throw new IllegalStateException("Can't add records to an inactive task.");
5204-
}
5205-
}
5206-
5207-
@Override
5208-
public boolean process(final long wallClockTime) {
5209-
if (isActive() && state() == State.RUNNING) {
5210-
for (final LinkedList<ConsumerRecord<byte[], byte[]>> records : queue.values()) {
5211-
final ConsumerRecord<byte[], byte[]> record = records.poll();
5212-
if (record != null) {
5213-
return true;
5214-
}
5215-
}
5216-
return false;
5217-
} else {
5218-
throw new IllegalStateException("Can't process an inactive or non-running task.");
5219-
}
5220-
}
5221-
}
52224923
}

0 commit comments

Comments
 (0)