diff --git a/client/src/main/java/me/retrodaredevil/solarthing/actions/chatbot/SlackChatBotAction.java b/client/src/main/java/me/retrodaredevil/solarthing/actions/chatbot/SlackChatBotAction.java index 0181bfaa..6ec48a1d 100644 --- a/client/src/main/java/me/retrodaredevil/solarthing/actions/chatbot/SlackChatBotAction.java +++ b/client/src/main/java/me/retrodaredevil/solarthing/actions/chatbot/SlackChatBotAction.java @@ -7,6 +7,7 @@ import com.slack.api.socket_mode.response.AckResponse; import com.slack.api.socket_mode.response.SocketModeResponse; import me.retrodaredevil.action.SimpleAction; +import me.retrodaredevil.solarthing.SolarThingConstants; import me.retrodaredevil.solarthing.chatbot.ChatBotHandler; import me.retrodaredevil.solarthing.chatbot.Message; import me.retrodaredevil.solarthing.message.MessageSender; @@ -16,6 +17,10 @@ import java.io.IOException; import java.math.BigDecimal; import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -34,6 +39,9 @@ public class SlackChatBotAction extends SimpleAction { private volatile SocketModeClient client = null; + /** This set is synchronized so that in case Slack decides to run on multiple threads we still get thread safety. (I don't know if this is necessary, so this is to be safe)*/ + private final Set processedMessages = Collections.synchronizedSet(new HashSet<>()); + public SlackChatBotAction(String appToken, MessageSender messageSender, Slack slack, ChatBotHandler handler) { super(false); requireNonNull(this.appToken = appToken); @@ -80,7 +88,8 @@ private void initClient() { if (this.client != null) { return; } - LOGGER.debug("Initializing client!"); + // This is a summary log because there's currently a bug where a long-running Slack chatbot will process messages multiple times + LOGGER.info(SolarThingConstants.SUMMARY_MARKER, "Initializing slack client!"); final SocketModeClient client; try { client = slack.socketMode(appToken, SocketModeClient.Backend.JavaWebSocket); @@ -94,7 +103,17 @@ private void initClient() { client.addEventsApiEnvelopeListener(eventsApiEnvelope -> { SocketModeResponse ack = AckResponse.builder().envelopeId(eventsApiEnvelope.getEnvelopeId()).build(); client.sendSocketModeResponse(ack); - handle(eventsApiEnvelope); + LOGGER.debug("Got an event. envelopeId: " + eventsApiEnvelope.getEnvelopeId() + " with retry: " + eventsApiEnvelope.getRetryAttempt() + " and reason: " + eventsApiEnvelope.getRetryReason()); + EnvelopeIdAndTimestamp id = new EnvelopeIdAndTimestamp(eventsApiEnvelope.getEnvelopeId(), Instant.now()); + if (processedMessages.contains(id)) { + // Making this log message a summary message for now to confirm it works. Consider removing the summary marker in the future once this is confirmed to fix the bug. + LOGGER.info(SolarThingConstants.SUMMARY_MARKER, "Envelope ID: " + eventsApiEnvelope.getEnvelopeId() + " has already been processed!"); + } else { + Instant removeIfBefore = Instant.now().minusSeconds(30 * 60); + processedMessages.removeIf(envelopeIdAndTimestamp -> envelopeIdAndTimestamp.firstReceiveInstant.isBefore(removeIfBefore)); + processedMessages.add(id); + handle(eventsApiEnvelope); + } }); client.addWebSocketErrorListener(throwable -> { LOGGER.error("Got slack connection error", throwable); @@ -134,6 +153,17 @@ private void handle(EventsApiEnvelope eventsApiEnvelope) { if ("message".equals(message.get("type").getAsString()) && message.get("subtype") == null) { String text = message.get("text").getAsString(); Instant timestamp = epochSecondsToInstant(message.get("ts").getAsBigDecimal()); + if (timestamp.isBefore(Instant.now().minusSeconds(15 * 60))) { + LOGGER.debug("We got a message that was from over 15 minutes ago!"); + // Messages that we receive now that were sent more than 15 minutes ago should be ignored to prevent spammy messages. + // For instance, the handler that SolarThing uses at the time of writing will send a "Sorry! Try sending 'command' again. It took too long to process". + // Before 2024-07-30, many times multiple messages would be sent. As of 2024-07-30, we are keeping track of the envelope IDs + // so that we don't process messages twice. However, we delete envelope IDs that we already processed after some time, so + // we want to make sure that if a message is from a while ago, we don't process it because we don't know if we already processed it. + // Additionally, there is not really any reason to believe that this code should ever be hit, + // but Slack sure does seem set on making for sure for sure for sure that a message was delivered to us, so this is a safeguard against the Slack gods. + return; + } String userId = message.get("user").getAsString(); LOGGER.debug("Message raw: " + message); @@ -148,4 +178,28 @@ private static Instant epochSecondsToInstant(BigDecimal timestampBigDecimal) { return Instant.ofEpochMilli(timestampBigDecimal.multiply(new BigDecimal(1000)).longValue()) .plusNanos(nanos); } + + private static final class EnvelopeIdAndTimestamp { + private final String envelopeId; + /** The timestamp of the time an EventsApiEnvelope was received with the id {@link #envelopeId}. This is mere metadata and is not*/ + private final Instant firstReceiveInstant; + + private EnvelopeIdAndTimestamp(String envelopeId, Instant firstReceiveInstant) { + this.envelopeId = requireNonNull(envelopeId); + this.firstReceiveInstant = requireNonNull(firstReceiveInstant); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EnvelopeIdAndTimestamp that = (EnvelopeIdAndTimestamp) o; + return Objects.equals(envelopeId, that.envelopeId); + } + + @Override + public int hashCode() { + return Objects.hashCode(envelopeId); + } + } }