Build reactive pipelines with spring cloud stream
@Log4j2
@Configuration
class DeskAgent {
@Bean
Supplier<Flux<Passenger>> checkIn() {
Supplier<Passenger> randomPassenger = randomPassenger(names());
return () -> Flux.interval(Duration.ofMillis(345))
.onBackpressureDrop()
.map(aLong -> randomPassenger.get())
.doOnNext(passenger -> log.info("generated: {}", passenger));
}
}
spring.cloud.stream.bindings.checkIn-out-0.destination=processorTransformingApp
#spring.cloud.function.definition=checkIn
@Log4j2
@Configuration
class GateAgent {
@Bean
Function<Flux<Passenger>, Flux<FlyingPassenger>> transfer() {
ThreadLocalRandom random = ThreadLocalRandom.current();
Supplier<Status> randomStatus = () -> random.nextInt(5) == 0 ? Status.PREMIUM : Status.VALUED;
return flux -> flux.map(passenger -> new FlyingPassenger(passenger.getId(),
passenger.getName(),
randomStatus.get()))
.doOnNext(passenger -> log.info("transformed: {}", passenger));
}
}
spring.cloud.stream.bindings.transfer-in-0.destination=processorTransformingApp
spring.cloud.stream.bindings.transfer-in-0.group=processorTransformingApp
spring.cloud.stream.bindings.transfer-out-0.destination=sinkInputApp
#spring.cloud.function.definition=transfer
@Log4j2
@Configuration
class FlyAttendant {
@Bean
Consumer<Flux<FlyingPassenger>> attendee() {
return flux -> flux.subscribe(passenger -> log.info("received: {}", passenger));
}
}
spring.cloud.stream.bindings.attendee-in-0.destination=sinkInputApp
spring.cloud.stream.bindings.attendee-in-0.group=sinkInputApp
#spring.cloud.function.definition=attendee
./mvnw -f rabbitmq docker:build docker:start
./mvnw
java -jar ./source-output-app/target/*.jar &
java -jar ./processor-transforming-app/target/*.jar &
java -jar ./sink-input-app/target/*.jar &
sleep 15s
killall -9 java
./mvnw -f rabbitmq docker:stop docker:remove