diff --git a/build.gradle b/build.gradle index 503a72fa..5175e625 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } ext { - set("PROJECT_VERSION", "1.3.5") + set("PROJECT_VERSION", "1.3.6") } // doesn't work in build.gradle in buildSrc project diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowConfig.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowConfig.java index 0f7d3eec..42747dfc 100644 --- a/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowConfig.java +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowConfig.java @@ -15,10 +15,6 @@ import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.MessagingException; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import ru.dankoy.telegrambot.core.domain.message.ChannelSubscriptionMessage; -import ru.dankoy.telegrambot.core.domain.message.CommunitySubscriptionMessage; -import ru.dankoy.telegrambot.core.domain.message.CoubMessage; -import ru.dankoy.telegrambot.core.domain.message.TagSubscriptionMessage; import ru.dankoy.telegrambot.core.domain.subscription.channel.ChannelSubscription; import ru.dankoy.telegrambot.core.domain.subscription.community.CommunitySubscription; import ru.dankoy.telegrambot.core.domain.subscription.tag.TagSubscription; @@ -56,11 +52,6 @@ public MessageChannel inputMessageChannel() { return new PublishSubscribeChannel(executor()); } - @Bean - public MessageChannel subscriptionMessagesChannel() { - return new PublishSubscribeChannel(executor()); - } - @Bean public MessageChannel mySubscriptionsChannel() { return new PublishSubscribeChannel(executor()); @@ -141,21 +132,6 @@ public MessageChannel channelSubscriptionSuccessChannel() { return new PublishSubscribeChannel(executor()); } - @Bean - public MessageChannel communitySubscriptionSendChannel() { - return new PublishSubscribeChannel(executor()); - } - - @Bean - public MessageChannel tagSubscriptionSendChannel() { - return new PublishSubscribeChannel(executor()); - } - - @Bean - public MessageChannel channelSubscriptionSendChannel() { - return new PublishSubscribeChannel(executor()); - } - @Bean public ThreadPoolTaskExecutor executor() { // Make pub-sub channels async. Otherwise all work happens in caller thread. @@ -418,48 +394,4 @@ public IntegrationFlow ordersFlow( .channel(sendMessageChannel()) .get(); } - - @Bean - public IntegrationFlow subscriptionMessagesFlow() { - - return IntegrationFlow.from(subscriptionMessagesChannel()) - .>route( - CoubMessage::getClass, - m -> - m.channelMapping( - CommunitySubscriptionMessage.class, "communitySubscriptionSendChannel") - .channelMapping(TagSubscriptionMessage.class, "tagSubscriptionSendChannel") - .channelMapping( - ChannelSubscriptionMessage.class, "channelSubscriptionSendChannel")) - .get(); - } - - @Bean - public IntegrationFlow communitySubscriptionSendMessageFlow( - ReplyCreatorService replyCreatorService) { - - return IntegrationFlow.from(communitySubscriptionSendChannel()) - .handle(replyCreatorService, "createCommunitySubscriptionMessage") - .channel(sendMessageChannel()) - .get(); - } - - @Bean - public IntegrationFlow tagSubscriptionSendMessageFlow(ReplyCreatorService replyCreatorService) { - - return IntegrationFlow.from(tagSubscriptionSendChannel()) - .handle(replyCreatorService, "createTagSubscriptionMessage") - .channel(sendMessageChannel()) - .get(); - } - - @Bean - public IntegrationFlow channelSubscriptionSendMessageFlow( - ReplyCreatorService replyCreatorService) { - - return IntegrationFlow.from(channelSubscriptionSendChannel()) - .handle(replyCreatorService, "createChannelSubscriptionMessage") - .channel(sendMessageChannel()) - .get(); - } } diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowSyncConfig.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowSyncConfig.java new file mode 100644 index 00000000..c95ce9d8 --- /dev/null +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/config/integration/FlowSyncConfig.java @@ -0,0 +1,120 @@ +package ru.dankoy.telegrambot.config.integration; + +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import java.time.Duration; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.handler.advice.RateLimiterRequestHandlerAdvice; +import org.springframework.messaging.MessageChannel; +import ru.dankoy.telegrambot.core.domain.message.ChannelSubscriptionMessage; +import ru.dankoy.telegrambot.core.domain.message.CommunitySubscriptionMessage; +import ru.dankoy.telegrambot.core.domain.message.CoubMessage; +import ru.dankoy.telegrambot.core.domain.message.TagSubscriptionMessage; +import ru.dankoy.telegrambot.core.service.bot.TelegramBot; +import ru.dankoy.telegrambot.core.service.reply.ReplyCreatorService; + +/** + * @author dankoy + *

Should work in sync mode. It was made to fix bug ... + */ +@Slf4j +@Configuration +public class FlowSyncConfig { + + @Bean + public MessageChannel subscriptionMessagesChannel() { + return new DirectChannel(); + } + + @Bean + public MessageChannel sendMessageDirectChannel() { + // this channel send messages as soon they created. + // With queue integration flow waited 5 seconds every time before handle message. + return new DirectChannel(); + } + + @Bean + public MessageChannel communitySubscriptionSendDirectChannel() { + return new DirectChannel(); + } + + @Bean + public MessageChannel tagSubscriptionSendDirectChannel() { + return new DirectChannel(); + } + + @Bean + public MessageChannel channelSubscriptionSendDirectChannel() { + return new DirectChannel(); + } + + // rate limiter for one message in 2 second + @Bean + public RateLimiterRequestHandlerAdvice rateLimiterRequestOneInTwoSecondsHandlerAdvice() { + return new RateLimiterRequestHandlerAdvice( + RateLimiterConfig.custom() + .limitRefreshPeriod(Duration.ofSeconds(2)) + .limitForPeriod(1) + .build()); + } + + @Bean + public IntegrationFlow sendMessageSyncFlow(TelegramBot telegramBot) { + return IntegrationFlow.from(sendMessageDirectChannel()) + .handle( + telegramBot, + "sendMessage", + c -> c.advice(rateLimiterRequestOneInTwoSecondsHandlerAdvice())) + .get(); + } + + @Bean + public IntegrationFlow subscriptionMessagesFlow() { + + return IntegrationFlow.from(subscriptionMessagesChannel()) + .>route( + CoubMessage::getClass, + m -> + m.channelMapping( + CommunitySubscriptionMessage.class, + "communitySubscriptionSendDirectChannel") + .channelMapping( + TagSubscriptionMessage.class, "tagSubscriptionSendDirectChannel") + .channelMapping( + ChannelSubscriptionMessage.class, "channelSubscriptionSendDirectChannel")) + .get(); + } + + @Bean + public IntegrationFlow communitySubscriptionSendMessageFlow( + ReplyCreatorService replyCreatorService) { + + return IntegrationFlow.from(communitySubscriptionSendDirectChannel()) + .handle(replyCreatorService, "createCommunitySubscriptionMessage") + .channel(sendMessageDirectChannel()) + .get(); + } + + @Bean + public IntegrationFlow tagSubscriptionSendMessageFlow(ReplyCreatorService replyCreatorService) { + + return IntegrationFlow.from(tagSubscriptionSendDirectChannel()) + .handle(replyCreatorService, "createTagSubscriptionMessage") + .channel(sendMessageDirectChannel()) + .get(); + } + + @Bean + public IntegrationFlow channelSubscriptionSendMessageFlow( + ReplyCreatorService replyCreatorService) { + + return IntegrationFlow.from(channelSubscriptionSendDirectChannel()) + .handle(replyCreatorService, "createChannelSubscriptionMessage") + .channel(sendMessageDirectChannel()) + .get(); + } +} diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/controller/SubscriptionController.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/controller/SubscriptionController.java index 81dae79f..9c7d0ef6 100644 --- a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/controller/SubscriptionController.java +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/controller/SubscriptionController.java @@ -7,26 +7,26 @@ import ru.dankoy.telegrambot.core.domain.message.ChannelSubscriptionMessage; import ru.dankoy.telegrambot.core.domain.message.CommunitySubscriptionMessage; import ru.dankoy.telegrambot.core.domain.message.TagSubscriptionMessage; -import ru.dankoy.telegrambot.core.gateway.BotMessageGateway; +import ru.dankoy.telegrambot.core.gateway.BotMessageSyncGateway; @RequiredArgsConstructor @RestController public class SubscriptionController { - private final BotMessageGateway botMessageGateway; + private final BotMessageSyncGateway botMessageSyncGateway; @PostMapping("/api/v1/community_message") public void sendMessage(@RequestBody CommunitySubscriptionMessage message) { - botMessageGateway.process(message); + botMessageSyncGateway.process(message); } @PostMapping("/api/v1/tag_message") public void sendMessage(@RequestBody TagSubscriptionMessage message) { - botMessageGateway.process(message); + botMessageSyncGateway.process(message); } @PostMapping("/api/v1/channel_message") public void sendMessage(@RequestBody ChannelSubscriptionMessage message) { - botMessageGateway.process(message); + botMessageSyncGateway.process(message); } } diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/exceptions/BotSendMessageException.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/exceptions/BotSendMessageException.java new file mode 100644 index 00000000..d65d3ae3 --- /dev/null +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/exceptions/BotSendMessageException.java @@ -0,0 +1,12 @@ +package ru.dankoy.telegrambot.core.exceptions; + +public class BotSendMessageException extends BotException { + + public BotSendMessageException(String message) { + super(message); + } + + public BotSendMessageException(String message, Exception e) { + super(message, e); + } +} diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageGateway.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageGateway.java index 77f0fecc..b67e01b4 100644 --- a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageGateway.java +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageGateway.java @@ -3,14 +3,10 @@ import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.telegram.telegrambots.meta.api.objects.Message; -import ru.dankoy.telegrambot.core.domain.message.CoubMessage; @MessagingGateway(errorChannel = "errorChannel") public interface BotMessageGateway { @Gateway(requestChannel = "inputMessageChannel") void process(Message telegramMessage); - - @Gateway(requestChannel = "subscriptionMessagesChannel") - void process(CoubMessage communitySubscriptionMessage); } diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageSyncGateway.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageSyncGateway.java new file mode 100644 index 00000000..ff213812 --- /dev/null +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/gateway/BotMessageSyncGateway.java @@ -0,0 +1,15 @@ +package ru.dankoy.telegrambot.core.gateway; + +import org.springframework.integration.annotation.Gateway; +import org.springframework.integration.annotation.MessagingGateway; +import ru.dankoy.telegrambot.core.domain.message.CoubMessage; + +// To make sync gateway all their channels must be sync (direct channel), not executors, also don't +// include errorChannel name here + +@MessagingGateway +public interface BotMessageSyncGateway { + + @Gateway(requestChannel = "subscriptionMessagesChannel") + void process(CoubMessage communitySubscriptionMessage); +} diff --git a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/service/bot/TelegramBotIntegrationFlowImpl.java b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/service/bot/TelegramBotIntegrationFlowImpl.java index 0de935ca..38f85e15 100644 --- a/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/service/bot/TelegramBotIntegrationFlowImpl.java +++ b/telegram_bot/src/main/java/ru/dankoy/telegrambot/core/service/bot/TelegramBotIntegrationFlowImpl.java @@ -18,6 +18,7 @@ import org.telegram.telegrambots.meta.exceptions.TelegramApiRequestException; import ru.dankoy.telegrambot.config.bot.configuration.botflow.BotConfiguration; import ru.dankoy.telegrambot.core.exceptions.BotException; +import ru.dankoy.telegrambot.core.exceptions.BotSendMessageException; import ru.dankoy.telegrambot.core.gateway.BotMessageGateway; import ru.dankoy.telegrambot.core.service.chat.TelegramChatService; @@ -149,9 +150,11 @@ private void send(SendMessage sendMessage) { } else { // some other exception in api log.error("Something went wrong: {}", e.getMessage()); + throw new BotSendMessageException(e.getMessage(), e); } } catch (TelegramApiException e) { log.error("Error sending message - {}", e.getMessage()); + throw new BotSendMessageException(e.getMessage(), e); } } }