Skip to content

Commit

Permalink
#27 split commands into producer/consumer sub-projects
Browse files Browse the repository at this point in the history
  • Loading branch information
cer committed Feb 22, 2024
1 parent 2d8d6fe commit 7113edf
Show file tree
Hide file tree
Showing 37 changed files with 708 additions and 205 deletions.
2 changes: 1 addition & 1 deletion docker-compose-mysql-binlog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ services:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate
SPRING_DATASOURCE_USERNAME: mysqluser
SPRING_DATASOURCE_PASSWORD: mysqlpw
SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.jdbc.Driver
SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.cj.jdbc.Driver
EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092
EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
EVENTUATELOCAL_CDC_DB_USER_NAME: root
Expand Down
4 changes: 4 additions & 0 deletions eventuate-tram-examples-basic-command-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
implementation "org.springframework.boot:spring-boot-starter"
implementation "io.eventuate.tram.core:eventuate-tram-spring-commands-starter"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.eventuate.tram.examples.basic.commands.common;


import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "command")
public class CommandConfigurationProperties {

private String commandChannel = "commandChannel";
private String replyChannel = "replyChannel";

public String getCommandChannel() {
return commandChannel;
}

public void setCommandChannel(String commandChannel) {
this.commandChannel = commandChannel;
}

public String getReplyChannel() {
return replyChannel;
}

public void setReplyChannel(String replyChannel) {
this.replyChannel = replyChannel;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.eventuate.tram.examples.basic.commands;
package io.eventuate.tram.examples.basic.commands.common;

import io.eventuate.tram.commands.common.Command;

Expand Down
35 changes: 35 additions & 0 deletions eventuate-tram-examples-basic-command-consumer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
dependencies {
implementation project(":eventuate-tram-examples-basic-command-common")

implementation "io.eventuate.tram.core:eventuate-tram-spring-commands-starter"
implementation "io.eventuate.tram.core:eventuate-tram-spring-messaging-starter"

if (messageBroker == 'kafka') {
implementation "io.eventuate.tram.core:eventuate-tram-spring-jdbc-$messageBroker"
} else {
implementation "io.eventuate.tram.core:eventuate-tram-jdbc-$messageBroker"
}


implementation "org.springframework.boot:spring-boot-starter-web"

implementation "junit:junit:4.12"

testImplementation project(":eventuate-tram-examples-basic-command-producer")

testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "io.eventuate.tram.core:eventuate-tram-spring-testing-support-outbox-commands:$eventuateTramVersion" // FIXME
testImplementation "io.eventuate.tram.core:eventuate-tram-spring-testing-support-messaging:$eventuateTramVersion" // FIXME


testImplementation "io.eventuate.util:eventuate-util-test"
testImplementation "io.eventuate.tram.core:eventuate-tram-spring-in-memory"
testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "io.rest-assured:spring-mock-mvc"

}

test {
useJUnitPlatform()
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.eventuate.tram.examples.basic.commands.consumer;

import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.commands.consumer.CommandDispatcherFactory;
import io.eventuate.tram.examples.basic.commands.common.CommandConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(CommandConfigurationProperties.class)
public class CommandConsumerConfiguration {

@Bean
public CreditManagementCommandHandlers creditManagementCommandHandlers(CommandConfigurationProperties commandConfigurationProperties) {
return new CreditManagementCommandHandlers(commandConfigurationProperties.getCommandChannel());
}

@Bean
public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory, CreditManagementCommandHandlers creditManagementCommandHandlers) {
return commandDispatcherFactory.make("command-dispatcher-" + System.currentTimeMillis(), creditManagementCommandHandlers.getCommandHandlers());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.eventuate.tram.examples.basic.commands.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(CommandConsumerConfiguration.class)
public class CommandConsumerMain {

public static void main(String[] args) {
SpringApplication.run(CommandConsumerMain.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.eventuate.tram.examples.basic.commands.consumer;

import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.commands.consumer.CommandHandlersBuilder;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.examples.basic.commands.common.ReserveCreditCommand;
import io.eventuate.tram.messaging.common.Message;

import static io.eventuate.tram.commands.consumer.CommandHandlerReplyBuilder.withSuccess;

public class CreditManagementCommandHandlers {

private final String commandChannel;

public CreditManagementCommandHandlers(String commandChannel) {
this.commandChannel = commandChannel;
}


public CommandHandlers getCommandHandlers() {
return CommandHandlersBuilder
.fromChannel(commandChannel)
.onMessage(ReserveCreditCommand.class, this::reserveCredit)
.build();

}
public Message reserveCredit(CommandMessage<ReserveCreditCommand> cm) {

System.out.println("customerId=" + cm.getCommand().getCustomerId());
System.out.println("cm=" + cm);
return withSuccess();

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
logging:
level:
io:
eventuate: DEBUG

server.port: 8081

spring:
datasource:
url: jdbc:mysql://${DOCKER_HOST_IP:localhost}/eventuate
username: mysqluser
password: mysqlpw
driver-class-name: com.mysql.cj.jdbc.Driver

eventuatelocal:
kafka:
bootstrap:
servers: ${DOCKER_HOST_IP:localhost}:9092
zookeeper:
connection:
string: ${DOCKER_HOST_IP:localhost}:2181

activemq:
url: tcp://${DOCKER_HOST_IP:localhost}:61616

rabbitmq:
broker:
addresses: ${DOCKER_HOST_IP:localhost}

---
spring:
profiles: postgres
datasource:
url: jdbc:postgresql://${DOCKER_HOST_IP:localhost}/eventuate
username: eventuate
password: eventuate
driver-class-name: org.postgresql.Driver

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.eventuate.tram.examples.basic.commands.broker;

import io.eventuate.tram.commands.common.CommandReplyOutcome;
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.examples.basic.commands.consumer.CommandConsumerConfiguration;
import io.eventuate.tram.examples.basic.commands.producer.CommandProducerConfiguration;
import io.eventuate.tram.examples.basic.commands.producer.CommandProducingService;
import io.eventuate.tram.examples.basic.commands.producer.ProduceRequest;
import io.eventuate.tram.spring.testing.messaging.consumer.AssertableMessageConsumer;
import io.eventuate.tram.spring.testing.messaging.consumer.AssertableMessageConsumerConfiguration;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@SpringBootTest(classes = CommandConsumerBrokerTest.Config.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {"command.commandChannel=command-${random.value}",
"command.replyChannel=reply-${random.value}"})
public class CommandConsumerBrokerTest {

@Configuration
@EnableAutoConfiguration
@Import({CommandProducerConfiguration.class, CommandConsumerConfiguration.class,
AssertableMessageConsumerConfiguration.class
})
public static class Config {

}

@LocalServerPort
private int port;

@BeforeEach
public void setup() {
RestAssured.port = port;
}

@Autowired
private CommandProducingService commandProducingService;

@Autowired
private AssertableMessageConsumer assertableMessageConsumer;

@Test
public void shouldHandleCommand() throws InterruptedException {

assertableMessageConsumer.subscribe(commandProducingService.replyChannel());

String customerId = Long.toString(System.currentTimeMillis());

String messageId = RestAssured.given().when()
.log().all()
.body(new ProduceRequest(customerId))
.contentType(ContentType.JSON)
.post("/send")
.then()
.statusCode(200)
.extract().path("messageId")
;

assertableMessageConsumer
.assertMessageReceived(commandProducingService.replyChannel())
.header(ReplyMessageHeaders.IN_REPLY_TO, messageId)
.header(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.eventuate.tram.examples.basic.commands.inmemory;

import io.eventuate.tram.commands.common.CommandReplyOutcome;
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.examples.basic.commands.consumer.CommandConsumerConfiguration;
import io.eventuate.tram.examples.basic.commands.producer.CommandProducerConfiguration;
import io.eventuate.tram.examples.basic.commands.producer.CommandProducingService;
import io.eventuate.tram.examples.basic.commands.producer.ProduceRequest;
import io.eventuate.tram.spring.inmemory.TramInMemoryConfiguration;
import io.eventuate.tram.spring.testing.messaging.consumer.AssertableMessageConsumer;
import io.eventuate.tram.spring.testing.messaging.consumer.AssertableMessageConsumerConfiguration;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;


@SpringBootTest(classes = CommandConsumerInMemoryTest.Config.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {"command.commandChannel=command-${random.value}",
"command.replyChannel=reply-${random.value}"})
public class CommandConsumerInMemoryTest {

@Configuration
@EnableAutoConfiguration
@Import({
CommandProducerConfiguration.class,
CommandConsumerConfiguration.class,
TramInMemoryConfiguration.class,
AssertableMessageConsumerConfiguration.class
})
static class Config {
}

@LocalServerPort
private int port;

@BeforeEach
public void setup() {
RestAssured.port = port;
}


@Autowired
private AssertableMessageConsumer assertableMessageConsumer;

@Autowired
private CommandProducingService commandProducingService;

@Test
public void shouldHandleCommand() throws InterruptedException {
assertableMessageConsumer.subscribe(commandProducingService.replyChannel());

String customerId = Long.toString(System.currentTimeMillis());

String messageId = RestAssured.given().when()
.log().all()
.body(new ProduceRequest(customerId))
.contentType(ContentType.JSON)
.post("/send")
.then()
.statusCode(200)
.extract().path("messageId")
;

assertableMessageConsumer
.assertMessageReceived(commandProducingService.replyChannel())
.header(ReplyMessageHeaders.IN_REPLY_TO, messageId)
.header(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name());

}
}
29 changes: 29 additions & 0 deletions eventuate-tram-examples-basic-command-producer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
dependencies {
implementation project(":eventuate-tram-examples-basic-command-common")
implementation "io.eventuate.tram.core:eventuate-tram-spring-commands-starter"
implementation "io.eventuate.tram.core:eventuate-tram-spring-messaging-starter"
if (messageBroker == 'kafka') {
implementation "io.eventuate.tram.core:eventuate-tram-spring-jdbc-$messageBroker"
} else {
implementation "io.eventuate.tram.core:eventuate-tram-jdbc-$messageBroker"
}

implementation "org.springframework.boot:spring-boot-starter-web"

implementation "junit:junit:4.12"

testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "io.eventuate.tram.core:eventuate-tram-spring-testing-support-outbox-commands:$eventuateTramVersion" // FIXME
testImplementation "io.eventuate.tram.core:eventuate-tram-spring-testing-support-messaging:$eventuateTramVersion" // FIXME


testImplementation "io.eventuate.util:eventuate-util-test"
testImplementation "io.eventuate.tram.core:eventuate-tram-spring-in-memory"
testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "io.rest-assured:spring-mock-mvc"

}

test {
useJUnitPlatform()
}
Loading

0 comments on commit 7113edf

Please sign in to comment.