Skip to content

Commit

Permalink
Connector contribution guide
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 7, 2023
1 parent 0c7b0eb commit 946f99f
Show file tree
Hide file tree
Showing 30 changed files with 1,754 additions and 2 deletions.
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nav:
- 'Development Model': concepts/model.md
- 'Emitters and Channel' : concepts/emitter.md
- 'Connectors' : concepts/connectors.md
- 'Contributing Connectors' : concepts/contributing-connectors.md
- 'Acknowledgement': concepts/acknowledgement.md
- 'Blocking Processing': concepts/blocking.md
- 'Method Signatures': concepts/signatures.md
Expand Down
53 changes: 53 additions & 0 deletions documentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@
</properties>

<dependencies>
<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
Expand Down Expand Up @@ -87,6 +96,34 @@
<version>${camel.version}</version>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>test-common</artifactId>
<version>${project.version}</version>
<!-- used in snippets -->
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se-shaded</artifactId>
<version>${weld.version}</version>
<!-- used in snippets -->
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld</groupId>
<artifactId>weld-core-impl</artifactId>
<version>${weld.version}</version>
<!-- used in snippets -->
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<!-- used in snippets -->
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down Expand Up @@ -165,6 +202,22 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<generatedSourcesDirectory>${project.build.directory}/generated-sources/</generatedSourcesDirectory>
<annotationProcessors>
<annotationProcessor>
io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor
</annotationProcessor>
<annotationProcessor>
org.jboss.logging.processor.apt.LoggingToolsProcessor
</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions documentation/src/main/docs/concepts/concepts.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Concepts

When dealing with event-driven or data streaming applications, there are
a few concepts and vocabulary to introduce.

Expand Down
3 changes: 1 addition & 2 deletions documentation/src/main/docs/concepts/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ mp.messaging.outgoing.data.acks=1
the connector’s name and set the `connector` attribute for each channel
managed by this connector.


# Connector attribute table
## Connector attribute table

In the connector documentation, you will find a table listing the
attribute supported by the connector. Be aware that attributes for
Expand Down
391 changes: 391 additions & 0 deletions documentation/src/main/docs/concepts/contributing-connectors.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions documentation/src/main/java/connectors/MyAckHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package connectors;

import java.util.concurrent.CompletionStage;

import connectors.api.BrokerClient;
import io.smallrye.mutiny.Uni;

public class MyAckHandler {

private final BrokerClient client;

static MyAckHandler create(BrokerClient client) {
return new MyAckHandler(client);
}

public MyAckHandler(BrokerClient client) {
this.client = client;
}

public CompletionStage<Void> handle(MyMessage<?> msg) {
return Uni.createFrom().completionStage(client.ack(msg.getConsumedMessage()))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
75 changes: 75 additions & 0 deletions documentation/src/main/java/connectors/MyConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package connectors;

import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import connectors.api.BrokerClient;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Vertx;

@ApplicationScoped
@Connector(MyConnector.CONNECTOR_NAME)
@ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "The client id ", mandatory = true)
@ConnectorAttribute(name = "buffer-size", type = "int", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", defaultValue = "128")
@ConnectorAttribute(name = "topic", type = "string", direction = OUTGOING, description = "The default topic to send the messages, defaults to channel name if not set")
@ConnectorAttribute(name = "maxPendingMessages", type = "int", direction = OUTGOING, description = "The maximum size of a queue holding pending messages", defaultValue = "1000")
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = OUTGOING, description = "Whether the outgoing channel waits for the write completion", defaultValue = "true")
public class MyConnector implements InboundConnector, OutboundConnector {

public static final String CONNECTOR_NAME = "smallrye-my-connector";

@Inject
ExecutionHolder executionHolder;

Vertx vertx;

List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

@PostConstruct
void init() {
this.vertx = executionHolder.vertx();
}

@Override
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config);
String channelName = ic.getChannel();
String clientId = ic.getClientId();
int bufferSize = ic.getBufferSize();
// ...
BrokerClient client = BrokerClient.create(clientId);
MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client);
incomingChannels.add(channel);
return channel.getStream();
}

@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config);
String channelName = oc.getChannel();
String clientId = oc.getClientId();
int pendingMessages = oc.getMaxPendingMessages();
// ...
BrokerClient client = BrokerClient.create(clientId);
MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client);
outgoingChannels.add(channel);
return channel.getSubscriber();
}
}
106 changes: 106 additions & 0 deletions documentation/src/main/java/connectors/MyConnectorWithPartials.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package connectors;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import connectors.api.BrokerClient;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Vertx;

// <health-report>
@ApplicationScoped
@Connector(MyConnectorWithPartials.CONNECTOR_NAME)
public class MyConnectorWithPartials implements InboundConnector, OutboundConnector, HealthReporter {

public static final String CONNECTOR_NAME = "smallrye-my-connector";

List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

@Override
public HealthReport getReadiness() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MyIncomingChannel channel : incomingChannels) {
builder.add(channel.getChannel(), true);
}
for (MyOutgoingChannel channel : outgoingChannels) {
builder.add(channel.getChannel(), true);
}
return builder.build();
}

@Override
public HealthReport getLiveness() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MyIncomingChannel channel : incomingChannels) {
builder.add(channel.getChannel(), true);
}
for (MyOutgoingChannel channel : outgoingChannels) {
builder.add(channel.getChannel(), true);
}
return builder.build();
}

@Override
public HealthReport getStartup() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MyIncomingChannel channel : incomingChannels) {
builder.add(channel.getChannel(), true);
}
for (MyOutgoingChannel channel : outgoingChannels) {
builder.add(channel.getChannel(), true);
}
return builder.build();
}

// </health-report>

@Inject
ExecutionHolder executionHolder;

Vertx vertx;

@PostConstruct
void init() {
this.vertx = executionHolder.vertx();
}

@Override
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config);
String channelName = ic.getChannel();
String clientId = ic.getClientId();
int bufferSize = ic.getBufferSize();
// ...
BrokerClient client = BrokerClient.create(clientId);
MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client);
incomingChannels.add(channel);
return channel.getStream();
}

@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config);
String channelName = oc.getChannel();
String clientId = oc.getClientId();
int pendingMessages = oc.getMaxPendingMessages();
// ...
BrokerClient client = BrokerClient.create(clientId);
MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client);
outgoingChannels.add(channel);
return channel.getSubscriber();
}
}
27 changes: 27 additions & 0 deletions documentation/src/main/java/connectors/MyFailureHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package connectors;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import connectors.api.BrokerClient;
import io.smallrye.mutiny.Uni;

public class MyFailureHandler {

private final BrokerClient client;

static MyFailureHandler create(BrokerClient client) {
return new MyFailureHandler(client);
}

public MyFailureHandler(BrokerClient client) {
this.client = client;
}

public CompletionStage<Void> handle(MyMessage<?> msg, Throwable reason, Metadata metadata) {
return Uni.createFrom().completionStage(() -> client.reject(msg.getConsumedMessage(), reason.getMessage()))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
57 changes: 57 additions & 0 deletions documentation/src/main/java/connectors/MyIncomingChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package connectors;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.microprofile.reactive.messaging.Message;

import connectors.api.BrokerClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

public class MyIncomingChannel {

private final String channel;
private final BrokerClient client;
private final Context context;
private final MyAckHandler ackHandler;
private final MyFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
private Flow.Publisher<? extends Message<?>> stream;

public MyIncomingChannel(Vertx vertx, MyConnectorIncomingConfiguration cfg, BrokerClient client) {
// create and configure the client with MyConnectorIncomingConfiguration
this.channel = cfg.getChannel();
this.client = client;
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.ackHandler = MyAckHandler.create(this.client);
this.failureHandler = MyFailureHandler.create(this.client);
this.stream = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().completionStage(this.client.poll()))
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new MyMessage<>(consumed, ackHandler, failureHandler));
}

public String getChannel() {
return channel;
}

public Flow.Publisher<? extends Message<?>> getStream() {
return this.stream;
}

public void close() {
closed.compareAndSet(false, true);
client.close();
}

void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {

}

}
Loading

0 comments on commit 946f99f

Please sign in to comment.