Skip to content

Commit

Permalink
Refactor SenderProcessor and allow it's reuse in connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 7, 2023
1 parent 430ebcf commit b1ba9f1
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 125 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.smallrye.reactive.messaging.kafka.tracing.KafkaOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.SenderProcessor;

@SuppressWarnings("jol")
public class KafkaSink {
Expand All @@ -62,7 +63,7 @@ public class KafkaSink {
private final int deliveryTimeoutMs;

private final List<Throwable> failures = new ArrayList<>();
private final KafkaSenderProcessor processor;
private final SenderProcessor processor;
private final boolean writeAsBinaryCloudEvent;
private final boolean writeCloudEvents;
private final boolean mandatoryCloudEventAttributeSet;
Expand Down Expand Up @@ -124,7 +125,7 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk
if (requests <= 0) {
requests = Long.MAX_VALUE;
}
this.processor = new KafkaSenderProcessor(requests, waitForWriteCompletion,
this.processor = new SenderProcessor(requests, waitForWriteCompletion,
writeMessageToKafka());
this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> {
log.unableToDispatch(f);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.smallrye.reactive.messaging.pulsar;
package io.smallrye.reactive.messaging.providers.helpers;

import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarExceptions.ex;
import static io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions.ex;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
Expand All @@ -13,15 +13,15 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;

class PulsarSenderProcessor implements Processor<Message<?>, Message<?>>, Subscription {
public class SenderProcessor implements Processor<Message<?>, Message<?>>, Subscription {

private final long inflights;
private final boolean waitForCompletion;
private final Function<Message<?>, Uni<Void>> send;
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference<>();

public PulsarSenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send) {
public SenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send) {
this.inflights = inflights;
this.waitForCompletion = waitForCompletion;
this.send = send;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,10 @@ IllegalArgumentException illegalArgumentForWorkerConfigKey(String annotation, St

@Message(id = 1004, value = "Multiple Outgoings count does not match the number of split branches in %s. %d, %d")
IllegalStateException outgoingsDoesNotMatchMultiSplitterTarget(String method, int outgoings, int splitTarget);

@Message(id = 1005, value = "Expecting downstream to consume without back-pressure")
IllegalStateException illegalStateConsumeWithoutBackPressure();

@Message(id = 1006, value = "Only one subscriber allowed")
IllegalStateException illegalStateOnlyOneSubscriber();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.smallrye.reactive.messaging.providers.extension.*;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;

@ApplicationScoped
public class Wiring {
Expand Down Expand Up @@ -517,7 +516,7 @@ private <T extends MessagePublisherProvider<?>> void registerEmitter(ChannelRegi
T emitter = (T) emitterFactory.createEmitter(configuration, def);
Multi<? extends Message<?>> publisher = Multi.createFrom().publisher(emitter.getPublisher());
for (PublisherDecorator decorator : getSortedInstances(decorators)) {
publisher = decorator.decorate(publisher, configuration.name(), false);
publisher = decorator.decorate(publisher, configuration.name(), false);
}
Class<T> type = (Class<T>) configuration.emitterType().value();
registry.register(configuration.name(), type, emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.SenderProcessor;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapSetter;
Expand All @@ -40,7 +41,7 @@
public class PulsarOutgoingChannel<T> {

private final Producer<T> producer;
private final PulsarSenderProcessor processor;
private final SenderProcessor processor;
private final Flow.Subscriber<? extends Message<?>> subscriber;
private final String channel;
private final boolean healthEnabled;
Expand Down Expand Up @@ -79,7 +80,7 @@ public PulsarOutgoingChannel(PulsarClient client, Schema<T> schema, PulsarConnec
requests = Long.MAX_VALUE;
}

processor = new PulsarSenderProcessor(requests, oc.getWaitForWriteCompletion(), this::sendMessage);
processor = new SenderProcessor(requests, oc.getWaitForWriteCompletion(), this::sendMessage);
subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> {
log.unableToDispatch(f);
reportFailure(f);
Expand Down

0 comments on commit b1ba9f1

Please sign in to comment.