Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft - AWS - SQS support #2402

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
<module>smallrye-reactive-messaging-rabbitmq</module>
<module>smallrye-reactive-messaging-gcp-pubsub</module>
<module>smallrye-reactive-messaging-pulsar</module>
<module>smallrye-reactive-messaging-aws</module>
<module>examples/quickstart</module>
<module>examples/kafka-quickstart</module>
<module>examples/kafka-quickstart-kotlin</module>
Expand Down
163 changes: 163 additions & 0 deletions smallrye-reactive-messaging-aws/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging</artifactId>
<version>4.14.0-SNAPSHOT</version>
</parent>

<artifactId>smallrye-reactive-messaging-aws</artifactId>

<name>SmallRye Reactive Messaging : Connector :: AWS</name>

<packaging>pom</packaging>

<properties>
<awssdk.version>2.21.37</awssdk.version>
</properties>

<modules>
<module>smallrye-reactive-messaging-aws-core</module>
<module>smallrye-reactive-messaging-aws-test</module>
<module>smallrye-reactive-messaging-aws-sqs</module>
</modules>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${awssdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test config -->
<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>test-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld.se</groupId>
<artifactId>weld-se-shaded</artifactId>
<version>${weld.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.weld</groupId>
<artifactId>weld-core-impl</artifactId>
<version>${weld.version}</version>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>software.amazon.awssdk</groupId>-->
<!-- <artifactId>aws-crt-client</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.json.bind</groupId>
<artifactId>jakarta.json.bind-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse</groupId>
<artifactId>yasson</artifactId>
<version>${yasson.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<generatedSourcesDirectory>${project.build.directory}/generated-sources/</generatedSourcesDirectory>
<annotationProcessors>
<annotationProcessor>
io.smallrye.reactive.messaging.connector.ConnectorAttributeProcessor
</annotationProcessor>
<annotationProcessor>
org.jboss.logging.processor.apt.LoggingToolsProcessor
</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>coverage</id>
<properties>
<argLine>@{jacocoArgLine}</argLine>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-aws</artifactId>
<version>4.14.0-SNAPSHOT</version>
</parent>

<artifactId>smallrye-reactive-messaging-aws-core</artifactId>

<name>SmallRye Reactive Messaging : Connector :: AWS Core</name>

<dependencies>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.smallrye.reactive.messaging.aws.client;

import io.smallrye.reactive.messaging.aws.serialization.Deserializer;
import io.smallrye.reactive.messaging.aws.serialization.Serializer;
import io.vertx.mutiny.core.Vertx;

public class ClientHolder<CLIENT, CONFIG> {

private final CLIENT client;
private final Vertx vertx;
private final CONFIG config;
private final Serializer serializer;
private final Deserializer deserializer;

public ClientHolder(CLIENT client, Vertx vertx, CONFIG config, Serializer serializer, Deserializer deserializer) {
this.client = client;
this.vertx = vertx;
this.config = config;
this.serializer = serializer;
this.deserializer = deserializer;
}

public CLIENT getClient() {
return client;
}

public Vertx getVertx() {
return vertx;
}

public CONFIG getConfig() {
return config;
}

public Serializer getSerializer() {
return serializer;
}

public Deserializer getDeserializer() {
return deserializer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.smallrye.reactive.messaging.aws.config;

import java.util.*;

public class ConfigHelper {

public static List<String> parseToList(String valueToParse) {
List<String> result = new ArrayList<>();

Arrays.stream(valueToParse.split(","))
.map(String::trim)
.forEach(result::add);

return result;
}

public static Map<String, String> parseToMap(String valueToParse) {
Map<String, String> result = new HashMap<>();

Arrays.stream(valueToParse.split(","))
.map(String::trim)
.forEach(keyValue -> {
String[] keyValueSplit = keyValue.split(":");
if (keyValueSplit.length == 2) {
String key = keyValueSplit[0];
String value = keyValueSplit[1];
result.put(key, value);
}
});

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.smallrye.reactive.messaging.aws.i18n;

import jakarta.enterprise.inject.AmbiguousResolutionException;

import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageBundle;

@MessageBundle(projectCode = "SRMSG", length = 5)
public interface AwsExceptions {

AwsExceptions ex = Messages.getBundle(AwsExceptions.class);

@Message(id = 18012, value = "Unable to select the Deserializer named `%s` for channel `%s` - too many matches (%d)")
AmbiguousResolutionException unableToFindDeserializer(String name, String channel, int count);

@Message(id = 18014, value = "Unable to select the Serializer named `%s` for channel `%s` - too many matches (%d)")
AmbiguousResolutionException unableToFindSerializer(String name, String channel, int count);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.smallrye.reactive.messaging.aws.serialization;

/**
* This interface describes how a {@link String} is deserialized to an object
*/
public interface Deserializer {

/**
* Deserialize a message to an object.
*
* @param payload message to deserialize
* @return the deserialized message
* TODO: ex
* @throws RuntimeException in case the deserialization fails
*/
Object deserialize(String payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.smallrye.reactive.messaging.aws.serialization;

import static io.smallrye.reactive.messaging.aws.i18n.AwsExceptions.ex;

import java.util.Map;
import java.util.function.Function;

import jakarta.enterprise.inject.AmbiguousResolutionException;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;

public class SerializationResolver {

public static Serializer resolveSerializer(Instance<Serializer> messageSerializer,
String name, String channel,
JsonMapping jsonMapping) {
return resolve(messageSerializer, name, defaultSerializer(jsonMapping),
count -> ex.unableToFindSerializer(name, channel, count));
}

private static Serializer defaultSerializer(JsonMapping jsonMapping) {
return payload -> {
if (jsonMapping != null) {
if (payload instanceof String) {
return (String) payload;
}
return jsonMapping.toJson(payload);
} else {
return String.valueOf(payload);
}
};
}

public static Deserializer resolveDeserializer(Instance<Deserializer> messageDeserializer,
String name, String channel,
JsonMapping jsonMapping) {
return resolve(messageDeserializer, name, defaultDeserializer(jsonMapping),
count -> ex.unableToFindDeserializer(name, channel, count));
}

private static Deserializer defaultDeserializer(JsonMapping jsonMapping) {
return payload -> {
if (jsonMapping != null) {
return jsonMapping.fromJson(payload, Map.class);
} else {
return String.valueOf(payload);
}
};
}

private static <T> T resolve(Instance<T> instances, String name,
T defaultValue,
Function<Integer, AmbiguousResolutionException> ambiguous) {
Instance<T> instance = instances.select(Identifier.Literal.of(name));

if (instance.isUnsatisfied()) {
// this `if` block should be removed when support for the `@Named` annotation is removed
instance = instances.select(NamedLiteral.of(name));
if (!instance.isUnsatisfied()) {
ProviderLogging.log.deprecatedNamed();
}
}

if (instance.isUnsatisfied()) {
return defaultValue;
} else if (instance.stream().count() > 1) {
throw ambiguous.apply((int) instance.stream().count());
} else {
return instance.get();
}
}
}
Loading