diff --git a/parallel-consumer-mutiny/pom.xml b/parallel-consumer-mutiny/pom.xml new file mode 100644 index 000000000..08fc730e0 --- /dev/null +++ b/parallel-consumer-mutiny/pom.xml @@ -0,0 +1,57 @@ + + + + + parallel-consumer-parent + io.confluent.parallelconsumer + 0.5.3.4-SNAPSHOT + + 4.0.0 + + Confluent Parallel Consumer SmallRye Mutiny + parallel-consumer-mutiny + + + + io.confluent.parallelconsumer + parallel-consumer-core + ${project.version} + + + io.confluent.parallelconsumer + parallel-consumer-core + ${project.version} + tests + test + + + io.smallrye.reactive + mutiny + 2.9.4 + + + com.google.guava + guava + + + me.tongfei + progressbar + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.junit-pioneer + junit-pioneer + test + + + + diff --git a/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java new file mode 100644 index 000000000..776ba7b7e --- /dev/null +++ b/parallel-consumer-mutiny/src/main/java/io/confluent/parallelconsumer/mutiny/MutinyProcessor.java @@ -0,0 +1,151 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2025 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.PCRetriableException; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; +import io.confluent.parallelconsumer.PollContextInternal; +import io.confluent.parallelconsumer.internal.ExternalEngine; +import io.confluent.parallelconsumer.state.WorkContainer; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.subscription.Cancellable; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import pl.tlinkowski.unij.api.UniLists; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun; + +/** + * Adapter for using Mutiny as the asynchronous execution engine. + */ +@Slf4j +public class MutinyProcessor extends ExternalEngine { + + /** + * @see WorkContainer#getWorkType() + */ + private static final String MUTINY_TYPE = "mutiny.x-type"; + + private final Executor executor; + + public MutinyProcessor(ParallelConsumerOptions options, Supplier newExecutorSupplier) { + super(options); + this.executor = (newExecutorSupplier == null) ? Infrastructure.getDefaultWorkerPool() : newExecutorSupplier.get(); + } + + public MutinyProcessor(ParallelConsumerOptions options) { + this(options, null); + } + + @Override + protected boolean isAsyncFutureWork(List resultsFromUserFunction) { + for (Object object : resultsFromUserFunction) { + return (object instanceof io.smallrye.mutiny.subscription.Cancellable); + } + return false; + } + + @Override + public void close(Duration timeout, DrainingMode drainMode) { + super.close(timeout, drainMode); + } + + /** + * Register a function to be applied to polled messages. + *

+ * Make sure that you do any work immediately - do not block this thread. + *

+ * + * @param mutinyFunction user function that takes a PollContext and returns a Uni + * @see #onRecord(Function) + * @see ParallelConsumerOptions + * @see ParallelConsumerOptions#batchSize + * @see io.confluent.parallelconsumer.ParallelStreamProcessor#poll + */ + + /** + * Register a function to be applied to polled messages. + * This must return a Uni to signal async completion. + * + * @param mutinyFunction user function that takes a PollContext and returns a Uni + */ + public void onRecord(Function, Uni> mutinyFunction) { + + Function, List> wrappedUserFunc = pollContext -> { + + if (log.isTraceEnabled()) { + log.trace("Record list ({}), executing void function...", + pollContext.streamConsumerRecords() + .map(ConsumerRecord::offset) + .collect(Collectors.toList()) + ); + } + + pollContext.streamWorkContainers() + .forEach(x -> x.setWorkType(MUTINY_TYPE)); + + Cancellable uni = Uni.createFrom().deferred(() -> + carefullyRun(mutinyFunction, pollContext.getPollContext()) + ) + .onItem() + .transformToMulti(result -> { + if(result == null) { + return Multi.createFrom().empty(); + } + else if (result instanceof Multi multi) { + return multi; // unwrap Multi + } else { + return Multi.createFrom().item(result); // wrap single item as Multi + } + }) + .onItem() + .invoke(signal -> log.trace("onItem {}", signal)) + .runSubscriptionOn(this.executor) + .subscribe().with( + ignored -> {}, + throwable -> onError(pollContext, throwable), + () -> onComplete(pollContext) + ); + + log.trace("asyncPoll - user function finished ok."); + return UniLists.of(uni); + }; + + // + Consumer voidCallBack = ignored -> log.trace("Void callback applied."); + supervisorLoop(wrappedUserFunc, voidCallBack); + } + + private void onComplete(PollContextInternal pollContext) { + log.debug("Mutiny success"); + pollContext.streamWorkContainers().forEach(wc -> { + wc.onUserFunctionSuccess(); + addToMailbox(pollContext, wc); + }); + } + + private void onError(PollContextInternal pollContext, Throwable throwable) { + if (throwable instanceof PCRetriableException) { + log.debug("Mutiny fail signal", throwable); + } else { + log.error("Mutiny fail signal", throwable); + } + pollContext.streamWorkContainers().forEach(wc -> { + wc.onUserFunctionFailure(throwable); + addToMailbox(pollContext, wc); + }); + } +} diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java new file mode 100644 index 000000000..67a6b0e8d --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyBatchTest.java @@ -0,0 +1,103 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2025 Confluent, Inc. + */ + +import io.confluent.csid.utils.KafkaTestUtils; +import io.confluent.parallelconsumer.BatchTestBase; +import io.confluent.parallelconsumer.BatchTestMethods; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; +import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; +import io.confluent.parallelconsumer.internal.RateLimiter; +import io.smallrye.mutiny.Uni; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.confluent.csid.utils.StringUtils.msg; + +@Slf4j +public class MutinyBatchTest extends MutinyUnitTestBase implements BatchTestBase { + + BatchTestMethods> batchTestMethods; + + @BeforeEach + void setup() { + batchTestMethods = new BatchTestMethods<>(this) { + + @Override + protected KafkaTestUtils getKtu() { + return ktu; + } + + @SneakyThrows + @Override + protected Uni averageBatchSizeTestPollStep(PollContext recordList) { + return Uni.createFrom() + .item(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())) + .onItem().delayIt().by(Duration.ofMillis(30)); + } + + @Override + protected void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger) { + mutinyPC.onRecord(recordList -> + averageBatchSizeTestPollInner(numBatches, numRecords, statusLogger, recordList) + ); + } + + @Override + protected AbstractParallelEoSStreamProcessor getPC() { + return mutinyPC; + } + + @Override + public void simpleBatchTestPoll(List> batchesReceived) { + mutinyPC.onRecord(recordList -> { + String msg = msg("Saw batch or records: {}", recordList.getOffsetsFlattened()); + log.debug(msg); + batchesReceived.add(recordList); + return Uni.createFrom().item(msg); + }); + } + + @Override + protected void batchFailPoll(List> batchesReceived) { + mutinyPC.onRecord(recordList -> { + batchFailPollInner(recordList); + batchesReceived.add(recordList); + return Uni.createFrom().item(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())); + }); + } + }; + } + + @Test + public void averageBatchSizeTest() { + batchTestMethods.averageBatchSizeTest(10000); + } + + @ParameterizedTest + @EnumSource + @Override + public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) { + batchTestMethods.simpleBatchTest(order); + } + + @ParameterizedTest + @EnumSource + @Override + public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) { + batchTestMethods.batchFailureTest(order); + } + +} + diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java new file mode 100644 index 000000000..8c1723022 --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyPCTest.java @@ -0,0 +1,143 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2025 Confluent, Inc. + */ + +import io.confluent.csid.utils.LatchTestUtils; +import io.confluent.csid.utils.ProgressBarUtils; +import io.smallrye.mutiny.Uni; +import lombok.extern.slf4j.Slf4j; +import me.tongfei.progressbar.ProgressBar; +import org.assertj.core.data.Percentage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.truth.Truth.assertWithMessage; +import static io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject.assertThat; +import static org.awaitility.Awaitility.await; + +@Slf4j +class MutinyPCTest extends MutinyUnitTestBase { + + /** + * The percent of the max concurrency tolerance allowed + */ + public static final Percentage MAX_CONCURRENCY_OVERFLOW_ALLOWANCE = Percentage.withPercentage(1.2); + + @BeforeEach + public void setupData() { + super.primeFirstRecord(); + } + + @Test + void kickTires() { + primeFirstRecord(); + primeFirstRecord(); + primeFirstRecord(); + + ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue threads = new ConcurrentLinkedQueue<>(); + + mutinyPC.onRecord(ctx -> { + log.info("Mutiny user function: {}", ctx); + msgs.add(ctx); + threads.add(Thread.currentThread().getName()); + // return a Uni for async processing + return Uni.createFrom().item(String.format("result: %d:%s", ctx.getSingleConsumerRecord().offset(), ctx.getSingleConsumerRecord().value())); + }); + + await() + .atMost(defaultTimeout) + .untilAsserted(() -> { + assertWithMessage("Processed records collection so far") + .that(msgs.size()) + .isEqualTo(4); + + assertThat(consumerSpy) + .hasCommittedToPartition(topicPartition) + .atLeastOffset(4); + + assertWithMessage("The user-defined function should be executed by the scheduler") + .that(threads.stream().allMatch(thread -> thread.startsWith("pool"))) + .isTrue(); + }); + } + + @Test + void concurrencyTest() throws InterruptedException { + int quantity = 100_000; + var consumerRecords = ktu.generateRecords(quantity - 1); // -1 because 1 is already primed + ktu.send(consumerSpy, consumerRecords); + log.info("Finished priming records"); + + ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, quantity); + + ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + AtomicInteger finishedCount = new AtomicInteger(0); + AtomicInteger maxConcurrentRecordsSeen = new AtomicInteger(0); + CountDownLatch completeOrProblem = new CountDownLatch(1); + int maxConcurrency = MAX_CONCURRENCY; + + mutinyPC.onRecord(ctx -> { + var record = ctx.getSingleConsumerRecord(); + return Uni.createFrom().item(String.format("result: %d:%s", record.offset(), record.value())) + .onItem().invoke(ignore -> { + // add that our uni processing has started + log.trace("Mutiny user function executing: {}", ctx); + msgs.add(ctx); + if (msgs.size() > maxConcurrency) { + log.error("More records submitted for processing than max concurrency settings ({} vs {})", msgs.size(), maxConcurrency); + completeOrProblem.countDown(); + } + }) + // simulate async delay + .onItem().delayIt().by(Duration.ofMillis(Math.max(1, (int) (100 * Math.random())))) + .onItem().invoke(s -> { + log.trace("User function after delay. Records pending: {}, removing from processing: {}", msgs.size(), ctx); + int currentConcurrentRecords = msgs.size(); + int highestSoFar = Math.max(currentConcurrentRecords, maxConcurrentRecordsSeen.get()); + maxConcurrentRecordsSeen.set(highestSoFar); + + boolean removed = msgs.remove(ctx); + assertWithMessage("record was present and removed") + .that(removed).isTrue(); + + int numberOfFinishedRecords = finishedCount.incrementAndGet(); + if (numberOfFinishedRecords > quantity - 1) { + completeOrProblem.countDown(); + } + + bar.step(); + }); + }); + + // block until all messages processed + LatchTestUtils.awaitLatch(completeOrProblem, defaultTimeoutSeconds); + + int maxConcurrencyAllowedThreshold = (int) (maxConcurrency * MAX_CONCURRENCY_OVERFLOW_ALLOWANCE.value); + assertWithMessage("Max concurrency should never be exceeded") + .that(maxConcurrentRecordsSeen.get()).isLessThan(maxConcurrencyAllowedThreshold); + + await() + .atMost(defaultTimeout) + .failFast("Max concurrency exceeded", () -> msgs.size() > maxConcurrencyAllowedThreshold) + .untilAsserted(() -> { + assertWithMessage("Number of completed messages") + .that(finishedCount.get()).isEqualTo(quantity); + + assertThat(consumerSpy) + .hasCommittedToPartition(topicPartition) + .offset(quantity); + }); + + bar.close(); + log.info("Max concurrency was {}", maxConcurrentRecordsSeen.get()); + } +} + diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java new file mode 100644 index 000000000..142021070 --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyTest.java @@ -0,0 +1,38 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2025 Confluent, Inc. + */ + +import io.smallrye.mutiny.Multi; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +class MutinyTest { + + @Test + void emitOnExample() { + ExecutorService executor = Executors.newFixedThreadPool(4); + + Multi multi = Multi.createFrom().range(1, 3) // 1..2 inclusive + .map(i -> 10 + i) + .emitOn(executor) // similar to publishOn + .map(i -> "value " + i); + + multi.subscribe().with(System.out::println, Throwable::printStackTrace); + } + + @Test + void runSubscriptionOnExample() { + ExecutorService executor = Executors.newFixedThreadPool(4); + + Multi multi = Multi.createFrom().range(1, 3) + .map(i -> 10 + i) + .runSubscriptionOn(executor) // similar to subscribeOn + .map(i -> "value " + i); + + multi.subscribe().with(System.out::println, Throwable::printStackTrace); + } +} diff --git a/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java new file mode 100644 index 000000000..ce92b03f7 --- /dev/null +++ b/parallel-consumer-mutiny/src/test/java/io/confluent/parallelconsumer/mutiny/MutinyUnitTestBase.java @@ -0,0 +1,30 @@ +package io.confluent.parallelconsumer.mutiny; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase; +import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; + +import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC; + +public class MutinyUnitTestBase extends ParallelEoSStreamProcessorTestBase { + + protected MutinyProcessor mutinyPC; + + protected static final int MAX_CONCURRENCY = 1000; + + @Override + protected AbstractParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) { + var build = parallelConsumerOptions.toBuilder() + .commitMode(PERIODIC_CONSUMER_SYNC) + .maxConcurrency(MAX_CONCURRENCY) + .build(); + + mutinyPC = new MutinyProcessor<>(build); + + return mutinyPC; + } +} \ No newline at end of file diff --git a/parallel-consumer-mutiny/src/test/resources/logback-test.xml b/parallel-consumer-mutiny/src/test/resources/logback-test.xml new file mode 100644 index 000000000..3aec1d52e --- /dev/null +++ b/parallel-consumer-mutiny/src/test/resources/logback-test.xml @@ -0,0 +1,83 @@ + + + + + + + + + + + + + %d{mm:ss.SSS} %yellow(%X{pcId}) %highlight(%-5level) %yellow([%thread]) %X{offset} %cyan(\(%file:%line\)#%M) %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 472dbc8b9..e0f804dd9 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ parallel-consumer-core parallel-consumer-vertx parallel-consumer-reactor + parallel-consumer-mutiny parallel-consumer-examples