Skip to content

Commit

Permalink
fix: integration flow is sync for messages from kafka (#127)
Browse files Browse the repository at this point in the history
* refactor: created sync gateway

* refactor: added send message exception

* refactor: moved send messages form kafka to sync integration flows

* refactor: added exceptions throwing for telegram api errors

* refactor: controller use sync integration flow

* chore: updated version to 1.3.6
  • Loading branch information
Dankoy authored Aug 23, 2024
1 parent a0d7222 commit 8ef1b04
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 78 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

ext {
set("PROJECT_VERSION", "1.3.5")
set("PROJECT_VERSION", "1.3.6")
}

// doesn't work in build.gradle in buildSrc project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import ru.dankoy.telegrambot.core.domain.message.ChannelSubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.message.CommunitySubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.message.CoubMessage;
import ru.dankoy.telegrambot.core.domain.message.TagSubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.subscription.channel.ChannelSubscription;
import ru.dankoy.telegrambot.core.domain.subscription.community.CommunitySubscription;
import ru.dankoy.telegrambot.core.domain.subscription.tag.TagSubscription;
Expand Down Expand Up @@ -56,11 +52,6 @@ public MessageChannel inputMessageChannel() {
return new PublishSubscribeChannel(executor());
}

@Bean
public MessageChannel subscriptionMessagesChannel() {
return new PublishSubscribeChannel(executor());
}

@Bean
public MessageChannel mySubscriptionsChannel() {
return new PublishSubscribeChannel(executor());
Expand Down Expand Up @@ -141,21 +132,6 @@ public MessageChannel channelSubscriptionSuccessChannel() {
return new PublishSubscribeChannel(executor());
}

@Bean
public MessageChannel communitySubscriptionSendChannel() {
return new PublishSubscribeChannel(executor());
}

@Bean
public MessageChannel tagSubscriptionSendChannel() {
return new PublishSubscribeChannel(executor());
}

@Bean
public MessageChannel channelSubscriptionSendChannel() {
return new PublishSubscribeChannel(executor());
}

@Bean
public ThreadPoolTaskExecutor executor() {
// Make pub-sub channels async. Otherwise all work happens in caller thread.
Expand Down Expand Up @@ -418,48 +394,4 @@ public IntegrationFlow ordersFlow(
.channel(sendMessageChannel())
.get();
}

@Bean
public IntegrationFlow subscriptionMessagesFlow() {

return IntegrationFlow.from(subscriptionMessagesChannel())
.<CoubMessage, Class<?>>route(
CoubMessage::getClass,
m ->
m.channelMapping(
CommunitySubscriptionMessage.class, "communitySubscriptionSendChannel")
.channelMapping(TagSubscriptionMessage.class, "tagSubscriptionSendChannel")
.channelMapping(
ChannelSubscriptionMessage.class, "channelSubscriptionSendChannel"))
.get();
}

@Bean
public IntegrationFlow communitySubscriptionSendMessageFlow(
ReplyCreatorService replyCreatorService) {

return IntegrationFlow.from(communitySubscriptionSendChannel())
.handle(replyCreatorService, "createCommunitySubscriptionMessage")
.channel(sendMessageChannel())
.get();
}

@Bean
public IntegrationFlow tagSubscriptionSendMessageFlow(ReplyCreatorService replyCreatorService) {

return IntegrationFlow.from(tagSubscriptionSendChannel())
.handle(replyCreatorService, "createTagSubscriptionMessage")
.channel(sendMessageChannel())
.get();
}

@Bean
public IntegrationFlow channelSubscriptionSendMessageFlow(
ReplyCreatorService replyCreatorService) {

return IntegrationFlow.from(channelSubscriptionSendChannel())
.handle(replyCreatorService, "createChannelSubscriptionMessage")
.channel(sendMessageChannel())
.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package ru.dankoy.telegrambot.config.integration;

import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.advice.RateLimiterRequestHandlerAdvice;
import org.springframework.messaging.MessageChannel;
import ru.dankoy.telegrambot.core.domain.message.ChannelSubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.message.CommunitySubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.message.CoubMessage;
import ru.dankoy.telegrambot.core.domain.message.TagSubscriptionMessage;
import ru.dankoy.telegrambot.core.service.bot.TelegramBot;
import ru.dankoy.telegrambot.core.service.reply.ReplyCreatorService;

/**
* @author dankoy
* <p>Should work in sync mode. It was made to fix bug <a
* href="https://github.com/Dankoy/jforwarder/issues/126">...</a>
*/
@Slf4j
@Configuration
public class FlowSyncConfig {

@Bean
public MessageChannel subscriptionMessagesChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel sendMessageDirectChannel() {
// this channel send messages as soon they created.
// With queue integration flow waited 5 seconds every time before handle message.
return new DirectChannel();
}

@Bean
public MessageChannel communitySubscriptionSendDirectChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel tagSubscriptionSendDirectChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel channelSubscriptionSendDirectChannel() {
return new DirectChannel();
}

// rate limiter for one message in 2 second
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestOneInTwoSecondsHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(2))
.limitForPeriod(1)
.build());
}

@Bean
public IntegrationFlow sendMessageSyncFlow(TelegramBot telegramBot) {
return IntegrationFlow.from(sendMessageDirectChannel())
.handle(
telegramBot,
"sendMessage",
c -> c.advice(rateLimiterRequestOneInTwoSecondsHandlerAdvice()))
.get();
}

@Bean
public IntegrationFlow subscriptionMessagesFlow() {

return IntegrationFlow.from(subscriptionMessagesChannel())
.<CoubMessage, Class<?>>route(
CoubMessage::getClass,
m ->
m.channelMapping(
CommunitySubscriptionMessage.class,
"communitySubscriptionSendDirectChannel")
.channelMapping(
TagSubscriptionMessage.class, "tagSubscriptionSendDirectChannel")
.channelMapping(
ChannelSubscriptionMessage.class, "channelSubscriptionSendDirectChannel"))
.get();
}

@Bean
public IntegrationFlow communitySubscriptionSendMessageFlow(
ReplyCreatorService replyCreatorService) {

return IntegrationFlow.from(communitySubscriptionSendDirectChannel())
.handle(replyCreatorService, "createCommunitySubscriptionMessage")
.channel(sendMessageDirectChannel())
.get();
}

@Bean
public IntegrationFlow tagSubscriptionSendMessageFlow(ReplyCreatorService replyCreatorService) {

return IntegrationFlow.from(tagSubscriptionSendDirectChannel())
.handle(replyCreatorService, "createTagSubscriptionMessage")
.channel(sendMessageDirectChannel())
.get();
}

@Bean
public IntegrationFlow channelSubscriptionSendMessageFlow(
ReplyCreatorService replyCreatorService) {

return IntegrationFlow.from(channelSubscriptionSendDirectChannel())
.handle(replyCreatorService, "createChannelSubscriptionMessage")
.channel(sendMessageDirectChannel())
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@
import ru.dankoy.telegrambot.core.domain.message.ChannelSubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.message.CommunitySubscriptionMessage;
import ru.dankoy.telegrambot.core.domain.message.TagSubscriptionMessage;
import ru.dankoy.telegrambot.core.gateway.BotMessageGateway;
import ru.dankoy.telegrambot.core.gateway.BotMessageSyncGateway;

@RequiredArgsConstructor
@RestController
public class SubscriptionController {

private final BotMessageGateway botMessageGateway;
private final BotMessageSyncGateway botMessageSyncGateway;

@PostMapping("/api/v1/community_message")
public void sendMessage(@RequestBody CommunitySubscriptionMessage message) {
botMessageGateway.process(message);
botMessageSyncGateway.process(message);
}

@PostMapping("/api/v1/tag_message")
public void sendMessage(@RequestBody TagSubscriptionMessage message) {
botMessageGateway.process(message);
botMessageSyncGateway.process(message);
}

@PostMapping("/api/v1/channel_message")
public void sendMessage(@RequestBody ChannelSubscriptionMessage message) {
botMessageGateway.process(message);
botMessageSyncGateway.process(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ru.dankoy.telegrambot.core.exceptions;

public class BotSendMessageException extends BotException {

public BotSendMessageException(String message) {
super(message);
}

public BotSendMessageException(String message, Exception e) {
super(message, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.telegram.telegrambots.meta.api.objects.Message;
import ru.dankoy.telegrambot.core.domain.message.CoubMessage;

@MessagingGateway(errorChannel = "errorChannel")
public interface BotMessageGateway {

@Gateway(requestChannel = "inputMessageChannel")
void process(Message telegramMessage);

@Gateway(requestChannel = "subscriptionMessagesChannel")
void process(CoubMessage communitySubscriptionMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ru.dankoy.telegrambot.core.gateway;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import ru.dankoy.telegrambot.core.domain.message.CoubMessage;

// To make sync gateway all their channels must be sync (direct channel), not executors, also don't
// include errorChannel name here

@MessagingGateway
public interface BotMessageSyncGateway {

@Gateway(requestChannel = "subscriptionMessagesChannel")
void process(CoubMessage communitySubscriptionMessage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.telegram.telegrambots.meta.exceptions.TelegramApiRequestException;
import ru.dankoy.telegrambot.config.bot.configuration.botflow.BotConfiguration;
import ru.dankoy.telegrambot.core.exceptions.BotException;
import ru.dankoy.telegrambot.core.exceptions.BotSendMessageException;
import ru.dankoy.telegrambot.core.gateway.BotMessageGateway;
import ru.dankoy.telegrambot.core.service.chat.TelegramChatService;

Expand Down Expand Up @@ -149,9 +150,11 @@ private void send(SendMessage sendMessage) {
} else {
// some other exception in api
log.error("Something went wrong: {}", e.getMessage());
throw new BotSendMessageException(e.getMessage(), e);
}
} catch (TelegramApiException e) {
log.error("Error sending message - {}", e.getMessage());
throw new BotSendMessageException(e.getMessage(), e);
}
}
}

0 comments on commit 8ef1b04

Please sign in to comment.