Skip to content

Commit

Permalink
slack messages no longer processed multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
retrodaredevil committed Jul 31, 2024
1 parent d36408c commit 0abd828
Showing 1 changed file with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.slack.api.socket_mode.response.AckResponse;
import com.slack.api.socket_mode.response.SocketModeResponse;
import me.retrodaredevil.action.SimpleAction;
import me.retrodaredevil.solarthing.SolarThingConstants;
import me.retrodaredevil.solarthing.chatbot.ChatBotHandler;
import me.retrodaredevil.solarthing.chatbot.Message;
import me.retrodaredevil.solarthing.message.MessageSender;
Expand All @@ -16,6 +17,10 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,6 +39,9 @@ public class SlackChatBotAction extends SimpleAction {

private volatile SocketModeClient client = null;

/** This set is synchronized so that in case Slack decides to run on multiple threads we still get thread safety. (I don't know if this is necessary, so this is to be safe)*/
private final Set<EnvelopeIdAndTimestamp> processedMessages = Collections.synchronizedSet(new HashSet<>());

public SlackChatBotAction(String appToken, MessageSender messageSender, Slack slack, ChatBotHandler handler) {
super(false);
requireNonNull(this.appToken = appToken);
Expand Down Expand Up @@ -80,7 +88,8 @@ private void initClient() {
if (this.client != null) {
return;
}
LOGGER.debug("Initializing client!");
// This is a summary log because there's currently a bug where a long-running Slack chatbot will process messages multiple times
LOGGER.info(SolarThingConstants.SUMMARY_MARKER, "Initializing slack client!");
final SocketModeClient client;
try {
client = slack.socketMode(appToken, SocketModeClient.Backend.JavaWebSocket);
Expand All @@ -94,7 +103,17 @@ private void initClient() {
client.addEventsApiEnvelopeListener(eventsApiEnvelope -> {
SocketModeResponse ack = AckResponse.builder().envelopeId(eventsApiEnvelope.getEnvelopeId()).build();
client.sendSocketModeResponse(ack);
handle(eventsApiEnvelope);
LOGGER.debug("Got an event. envelopeId: " + eventsApiEnvelope.getEnvelopeId() + " with retry: " + eventsApiEnvelope.getRetryAttempt() + " and reason: " + eventsApiEnvelope.getRetryReason());
EnvelopeIdAndTimestamp id = new EnvelopeIdAndTimestamp(eventsApiEnvelope.getEnvelopeId(), Instant.now());
if (processedMessages.contains(id)) {
// Making this log message a summary message for now to confirm it works. Consider removing the summary marker in the future once this is confirmed to fix the bug.
LOGGER.info(SolarThingConstants.SUMMARY_MARKER, "Envelope ID: " + eventsApiEnvelope.getEnvelopeId() + " has already been processed!");
} else {
Instant removeIfBefore = Instant.now().minusSeconds(30 * 60);
processedMessages.removeIf(envelopeIdAndTimestamp -> envelopeIdAndTimestamp.firstReceiveInstant.isBefore(removeIfBefore));
processedMessages.add(id);
handle(eventsApiEnvelope);
}
});
client.addWebSocketErrorListener(throwable -> {
LOGGER.error("Got slack connection error", throwable);
Expand Down Expand Up @@ -134,6 +153,17 @@ private void handle(EventsApiEnvelope eventsApiEnvelope) {
if ("message".equals(message.get("type").getAsString()) && message.get("subtype") == null) {
String text = message.get("text").getAsString();
Instant timestamp = epochSecondsToInstant(message.get("ts").getAsBigDecimal());
if (timestamp.isBefore(Instant.now().minusSeconds(15 * 60))) {
LOGGER.debug("We got a message that was from over 15 minutes ago!");
// Messages that we receive now that were sent more than 15 minutes ago should be ignored to prevent spammy messages.
// For instance, the handler that SolarThing uses at the time of writing will send a "Sorry! Try sending 'command' again. It took too long to process".
// Before 2024-07-30, many times multiple messages would be sent. As of 2024-07-30, we are keeping track of the envelope IDs
// so that we don't process messages twice. However, we delete envelope IDs that we already processed after some time, so
// we want to make sure that if a message is from a while ago, we don't process it because we don't know if we already processed it.
// Additionally, there is not really any reason to believe that this code should ever be hit,
// but Slack sure does seem set on making for sure for sure for sure that a message was delivered to us, so this is a safeguard against the Slack gods.
return;
}

String userId = message.get("user").getAsString();
LOGGER.debug("Message raw: " + message);
Expand All @@ -148,4 +178,28 @@ private static Instant epochSecondsToInstant(BigDecimal timestampBigDecimal) {
return Instant.ofEpochMilli(timestampBigDecimal.multiply(new BigDecimal(1000)).longValue())
.plusNanos(nanos);
}

private static final class EnvelopeIdAndTimestamp {
private final String envelopeId;
/** The timestamp of the time an EventsApiEnvelope was received with the id {@link #envelopeId}. This is mere metadata and is not*/
private final Instant firstReceiveInstant;

private EnvelopeIdAndTimestamp(String envelopeId, Instant firstReceiveInstant) {
this.envelopeId = requireNonNull(envelopeId);
this.firstReceiveInstant = requireNonNull(firstReceiveInstant);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EnvelopeIdAndTimestamp that = (EnvelopeIdAndTimestamp) o;
return Objects.equals(envelopeId, that.envelopeId);
}

@Override
public int hashCode() {
return Objects.hashCode(envelopeId);
}
}
}

0 comments on commit 0abd828

Please sign in to comment.