Skip to content

Commit

Permalink
Merge pull request #2 from Reef3rm4n/feature/postgres-message-broker/…
Browse files Browse the repository at this point in the history
…durable_messages_and_topics

Feature/postgres message broker/durable messages and topics
  • Loading branch information
Reef3rm4n authored Aug 21, 2023
2 parents 088963a + c29f14b commit a036bee
Show file tree
Hide file tree
Showing 92 changed files with 2,625 additions and 2,517 deletions.
19 changes: 9 additions & 10 deletions es4j-core/src/main/java/io/es4j/launcher/AggregateDeployer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class AggregateDeployer<T extends Aggregate> {
private final Class<T> aggregateClass;
private Infrastructure infrastructure;
private List<AggregateServices> aggregateServices;
private final Set<String> deployed = new HashSet<>();
private final Stack<String> deployed = new Stack<>();
private CronTaskDeployer cronTaskDeployer;
private TimerTaskDeployer timerTaskDeployer;

Expand Down Expand Up @@ -74,13 +74,12 @@ public void deploy(final Promise<Void> startPromise) {
addProjections();
final Supplier<Verticle> supplier = () -> new AggregateVerticle<>(es4jDeploymentConfiguration, aggregateClass, nodeDeploymentID);
return startChannel(vertx, es4jDeploymentConfiguration.aggregateClass(), nodeDeploymentID)
.flatMap(avoid -> vertx.deployVerticle(supplier, new DeploymentOptions()
.flatMap(avoid -> Multi.createBy().repeating().uni(() ->vertx.deployVerticle(supplier, new DeploymentOptions()
.setConfig(infrastructureConfiguration)
.setInstances(CpuCoreSensor.availableProcessors() * 2)
)
.map(deployed::add)
)
.call(avoid -> {
.map(deployed::push)
).atMost(CpuCoreSensor.availableProcessors() * 2L).collect().asList()
.call(__ -> {
this.aggregateServices = Es4jServiceLoader.loadAggregateServices();
return Es4jConfigurationHandler.fsConfigurations(vertx, es4jDeploymentConfiguration.fileBusinessRules())
.flatMap(av -> Multi.createFrom().iterable(aggregateServices)
Expand All @@ -91,6 +90,7 @@ public void deploy(final Promise<Void> startPromise) {
.replaceWithVoid()
);
}
)
);
}
)
Expand Down Expand Up @@ -174,18 +174,17 @@ public Uni<Void> close() {
if (Objects.nonNull(timerTaskDeployer)) {
timerTaskDeployer.close();
}
if (!aggregateServices.isEmpty()) {
if (Objects.nonNull(aggregateServices) && !aggregateServices.isEmpty()) {
closeUnis.addAll(aggregateServices.stream().map(AggregateServices::stop).toList());
}
if (!deployed.isEmpty()) {
closeUnis.add(Multi.createFrom().iterable(deployed)
closeUnis.add(Multi.createBy().repeating().supplier(deployed::pop).whilst(__ -> !deployed.isEmpty())
.onItem().transformToUniAndMerge(vertx::undeploy)
.collect().asList()
.invoke(avoid -> deployed.clear())
.replaceWithVoid()
);
}
if (infrastructure != null) {
if (Objects.nonNull(infrastructure)) {
closeUnis.add(infrastructure.stop());
}
if (!closeUnis.isEmpty()) {
Expand Down
2 changes: 1 addition & 1 deletion es4j-infrastructure/es4j-http-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
<version>2.13.4.1</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
@startuml
participant "Database" as D #LightBlue
participant "Es4j Postgres Message Broker" as EPMB

EPMB -> D : Claim partition
note over EPMB, D: The library claims a partition in the database

loop every 30 seconds
EPMB -> D : Heartbeat
note over EPMB, D: A heartbeat is sent every 30 seconds to maintain the partition session alive
end loop
@enduml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
@startuml
participant "Es4j Postgres Message Broker" as EPMB
participant "Database" as D #LightBlue

EPMB -> EPMB : Startup
note over EPMB: The broker starts up

EPMB -> D : Read available partitions
note over D, EPMB: The broker reads all available partitions from the database

D --> EPMB : Return partitions
note over EPMB: The database returns the list of available partitions

EPMB -> EPMB : Add partitions to hash ring
note over EPMB: The broker adds the partitions to its internal hash ring
@enduml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
@startuml
participant "Database" as D #LightBlue
participant "Es4j Postgres Message Broker 1" as EPMB1
participant "Es4j Postgres Message Broker 2" as EPMB2
participant "Business Logic" as C4
"EPMB1" -> "EPMB1": trigger message consumers timer task
C4 -> "EPMB1": sendMessage(partition-1)
EPMB1 -> D : Claim partition
note over EPMB1, D: The broker claims a partition in the database
note over EPMB1: TimerTask is trigger for checking the availability of messages
note over EPMB1: TimerTask is trigger heartbeat


loop every 30 seconds
alt Heartbeat Successful
EPMB1 -> D : Heartbeat
note over EPMB1, D: A heartbeat is sent every 30 seconds to maintain the connection
else Heartbeat Fails
EPMB1 -> D : Heartbeat
note over EPMB1, D: Heartbeat failed

EPMB2 -> D : Claim partition on channel signal
note over EPMB2, D: The second broker claims the partition
C4 -> EPMB2 : sendMessage(partition-1)
"EPMB1" -> "Database": insertMessage
"Database" -> EPMB2: message in partition
EPMB2 -> "Database": startSession(partition-1)
end
end loop
@enduml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@startuml
participant "Business Logic" as C4
participant "Es4j Postgres Broker 1" as C1
participant "Es4j Postgres Broker 2" as C2
participant "Es4j Postgres Broker 3" as C3
participant "Database" as D #LightBlue

C4 -> C1 : send(Message)

C1 -> C1 : calculate_partition(Message)

C1 -> D : Message (partition-1)

D -> C1 : message(partition-1)
note over C1: startSession(partition-1)
note over C1: pollOnce = true

D -> C2 : message(partition-1)
note over C2: startSession(partition-1)
note over C2: partition-1 already claimed

D -> C3 : message(partition-1)
note over C3: startSession(partition-1)
note over C3: partition-1 already claimed

@enduml
4 changes: 4 additions & 0 deletions es4j-infrastructure/es4j-postgres-message-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>io.es4j</groupId>
<artifactId>es4j-http-bridge</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import io.es4j.infrastructure.pgbroker.models.Message;
import io.es4j.infrastructure.pgbroker.models.ConsumerTransaction;
import io.es4j.infrastructure.pgbroker.models.RawMessage;
import io.es4j.sql.RepositoryHandler;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
Expand All @@ -14,5 +15,5 @@ public interface ConsumerTransactionProvider {

void start(RepositoryHandler repositoryHandler);

<M> Uni<Void> transaction(String processorClass, Message<M> message, BiFunction<Message<M>, ConsumerTransaction, Uni<Void>> function);
<T> Uni<T> transaction(String consumer, RawMessage message, BiFunction<RawMessage, ConsumerTransaction, Uni<T>> function);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.es4j.infrastructure.pgbroker;


import io.es4j.infrastructure.pgbroker.models.PgBrokerConfiguration;
import io.es4j.infrastructure.pgbroker.models.BrokerConfiguration;
import io.es4j.sql.LiquibaseHandler;
import io.es4j.sql.RepositoryHandler;
import io.es4j.sql.misc.EnvVars;
Expand All @@ -18,63 +18,56 @@

import java.util.*;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class PgBroker {
private static final Logger LOGGER = LoggerFactory.getLogger(PgBroker.class);
private static final List<String> deployments = new ArrayList<>();
public static final Stack<String> deployments = new Stack<>();

private PgBroker() {
}

public static Uni<Void> deploy(JsonObject configuration, Vertx vertx, Integer instances) {
try {
final var brokerConfiguration = PgBrokerConfiguration.defaultConfiguration();
final var brokerConfiguration = BrokerConfiguration.defaultConfiguration();
LOGGER.info("starting pg broker {}", brokerConfiguration);
LOGGER.info("broker infra configuration {}", configuration.encodePrettily());
final var repositoryHandler = RepositoryHandler.leasePool(configuration, vertx);
return LiquibaseHandler.liquibaseString(repositoryHandler, "queue.xml", Map.of("schema", repositoryHandler.configuration().getString("schema", EnvVars.SCHEMA)))
.flatMap(avoid -> {
Supplier<Verticle> supplier = () -> new PgBrokerVerticle(brokerConfiguration);
return repositoryHandler.vertx().deployVerticle(supplier, new DeploymentOptions().setInstances(instances).setConfig(configuration))
.map(deployments::add)
.replaceWithVoid();
}
);
return LiquibaseHandler.liquibaseString(repositoryHandler, "pg-broker.xml", Map.of("schema", repositoryHandler.configuration().getString("schema", EnvVars.SCHEMA)))
.flatMap(avoid -> deploy(configuration, vertx, instances, brokerConfiguration));
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}
public static Uni<Void> deploy(JsonObject configuration, Vertx vertx) {

private static Uni<Void> deploy(JsonObject configuration, Vertx vertx, Integer instances, BrokerConfiguration brokerConfiguration) {
return Multi.createBy().repeating().uni(
() -> vertx.deployVerticle(new PgBrokerVerticle(brokerConfiguration), new DeploymentOptions().setConfig(configuration))
.map(deployments::push)
).atMost(instances).collect().asList()
.replaceWithVoid();
}

public static Uni<Void> deploy(JsonObject configuration, Vertx vertx) {
try {
final var brokerConfiguration = PgBrokerConfiguration.defaultConfiguration();
final var brokerConfiguration = BrokerConfiguration.defaultConfiguration();
LOGGER.info("starting pg broker {}", brokerConfiguration);
LOGGER.info("broker infra configuration {}", configuration.encodePrettily());
final var repositoryHandler = RepositoryHandler.leasePool(configuration, vertx);
return LiquibaseHandler.liquibaseString(repositoryHandler, "queue.xml", Map.of("schema", repositoryHandler.configuration().getString("schema", EnvVars.SCHEMA)))
.flatMap(avoid -> {
Supplier<Verticle> supplier = () -> new PgBrokerVerticle(brokerConfiguration);
return repositoryHandler.vertx().deployVerticle(supplier, new DeploymentOptions().setInstances(1).setConfig(configuration))
.map(deployments::add)
.replaceWithVoid();
}
);
return LiquibaseHandler.liquibaseString(repositoryHandler, "pg-broker.xml", Map.of("schema", repositoryHandler.configuration().getString("schema", EnvVars.SCHEMA)))
.flatMap(avoid -> deploy(configuration,vertx,1,brokerConfiguration));
} catch (Exception e) {
return Uni.createFrom().failure(e);
}
}

public static Uni<Void> deploy(JsonObject configuration, Vertx vertx, PgBrokerConfiguration brokerConfiguration) {
public static Uni<Void> deploy(JsonObject configuration, Vertx vertx, BrokerConfiguration brokerConfiguration) {
try {
LOGGER.info("starting pg broker {}", brokerConfiguration);
LOGGER.info("broker infra configuration {}", configuration.encodePrettily());
final var repositoryHandler = RepositoryHandler.leasePool(configuration, vertx);
return LiquibaseHandler.liquibaseString(repositoryHandler, "queue.xml", Map.of("schema", repositoryHandler.configuration().getString("schema", EnvVars.SCHEMA)))
.flatMap(avoid -> {
Supplier<Verticle> supplier = () -> new PgBrokerVerticle(brokerConfiguration);
return repositoryHandler.vertx().deployVerticle(supplier, new DeploymentOptions().setInstances(CpuCoreSensor.availableProcessors()).setConfig(configuration))
.map(deployments::add)
.replaceWithVoid();
}
return LiquibaseHandler.liquibaseString(repositoryHandler, "pg-broker.xml", Map.of("schema", repositoryHandler.configuration().getString("schema", EnvVars.SCHEMA)))
.flatMap(avoid -> deploy(configuration,vertx,CpuCoreSensor.availableProcessors(),brokerConfiguration)
);
} catch (Exception e) {
return Uni.createFrom().failure(e);
Expand All @@ -83,8 +76,14 @@ public static Uni<Void> deploy(JsonObject configuration, Vertx vertx, PgBrokerCo

public static Uni<Void> undeploy(Vertx vertx) {
if (!deployments.isEmpty()) {
return Multi.createFrom().iterable(deployments)
.onItem().transformToUniAndMerge(vertx::undeploy)
return Multi.createBy().repeating().supplier(deployments::pop)
.whilst(__ -> !deployments.isEmpty())
.onItem().transformToUniAndMerge(deploymentID -> {
LOGGER.info("Dropping verticle -> {}", deploymentID);
return vertx.undeploy(deploymentID)
.onFailure().invoke(throwable -> LOGGER.error("Unable to drop {}",deploymentID, throwable));
})

.collect().asList()
.replaceWithVoid();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.es4j.infrastructure.pgbroker;

import io.es4j.http.HttpRoute;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.Router;

public class PgBrokerRoute implements HttpRoute {

@Override
public Uni<Void> start(Vertx vertx, JsonObject configuration) {
return null;
}

@Override
public void registerRoutes(Router router) {
// todo implement end-points
router.get("/postgres-message-broker/messages");
router.get("/postgres-message-broker/dlq");
router.get("/postgres-message-broker/tx");
router.post("/postgres-message-broker/message");
router.post("/postgres-message-broker/cancel");
router.post("/postgres-message-broker/retry");
}


}
Loading

0 comments on commit a036bee

Please sign in to comment.