Skip to content

Commit

Permalink
Update Stream Ack Manager unit test and code refactor (#4383)
Browse files Browse the repository at this point in the history
* Update Stream Ack Manager unit test

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Move CheckpointStatus to stream sub package and make it package protected

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

---------

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
  • Loading branch information
dinujoh authored Apr 2, 2024
1 parent b7c63bc commit 8cb5a84
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void run() {
int lastRecordNumberProcessed = 0;
final List<String> records = new ArrayList<>();
try (MongoCursor<Document> cursor = col.find(query).iterator()) {
while (cursor.hasNext()) {
while (cursor.hasNext() && !Thread.currentThread().isInterrupted()) {
if (shouldStop) {
partitionCheckpoint.checkpoint(lastRecordNumberProcessed);
LOG.warn("Loading data query {} was interrupted by a shutdown signal, giving up ownership of " +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.opensearch.dataprepper.plugins.mongo.model;
package org.opensearch.dataprepper.plugins.mongo.stream;

public class CheckpointStatus {
class CheckpointStatus {
private final String resumeToken;
private final long recordCount;
private boolean acknowledged;
private final long createTimestamp;
private Long acknowledgedTimestamp;

public CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
CheckpointStatus(final String resumeToken, final long recordCount, final long createTimestamp) {
this.resumeToken = resumeToken;
this.recordCount = recordCount;
this.acknowledged = false;
Expand Down Expand Up @@ -40,6 +40,4 @@ public long getCreateTimestamp() {
public long getAcknowledgedTimestamp() {
return acknowledgedTimestamp;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.mongo.model.CheckpointStatus;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -83,7 +82,7 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() {

@Test
public void createAcknowledgementSet_enabled_multipleAckSetWithAck() {
when(timeout.getSeconds()).thenReturn(10_000L);
lenient().when(timeout.getSeconds()).thenReturn(10_000L);
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
streamAckManager.init(stopWorkerConsumer);
final String resumeToken1 = UUID.randomUUID().toString();
Expand Down Expand Up @@ -174,5 +173,5 @@ public void createAcknowledgementSet_enabled_ackSetWithNoAck() {
await()
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verify(stopWorkerConsumer).accept(null));
}
}
}

0 comments on commit 8cb5a84

Please sign in to comment.