From a8c4160e7d2571b97ebc991503fb8f5b5d54a0dd Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 21 Feb 2024 14:56:43 +0200 Subject: [PATCH] Add logic from Edge side to handle rate limit violation from cloud --- .../server/service/cloud/CloudManagerService.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java b/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java index 72e9301b5e..ba1ad83203 100644 --- a/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java @@ -105,6 +105,7 @@ public class CloudManagerService { private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; private static final String QUEUE_SEQ_ID_OFFSET_ATTR_KEY = "queueSeqIdOffset"; + private static final String RATE_LIMIT_REACHED = "Rate limit reached"; @Value("${cloud.routingKey}") private String routingKey; @@ -206,6 +207,7 @@ public class CloudManagerService { private ScheduledExecutorService reconnectScheduler; private ScheduledFuture scheduledFuture; private ScheduledExecutorService shutdownExecutor; + private volatile boolean isRateLimitViolated = false; private volatile boolean initialized; private volatile boolean syncInProgress = false; @@ -378,6 +380,14 @@ private boolean sendUplinkMsgsPack(List uplinkMsgsPack) throws Interr log.error("Error during sleep between batches", e); } } + if (initialized && !success && isRateLimitViolated) { + isRateLimitViolated = false; + try { + TimeUnit.SECONDS.sleep(60); + } catch (InterruptedException e) { + log.error("Error during sleep on rate limit violation", e); + } + } attempt++; if (attempt > MAX_UPLINK_ATTEMPTS) { log.warn("Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}", @@ -516,6 +526,9 @@ private void onUplinkResponse(UplinkResponseMsg msg) { if (msg.getSuccess()) { pendingMsgsMap.remove(msg.getUplinkMsgId()); log.debug("[{}] Msg has been processed successfully! {}", routingKey, msg); + } else if (msg.getErrorMsg().contains(RATE_LIMIT_REACHED)) { + log.warn("[{}] Msg processing failed! {}", routingKey, RATE_LIMIT_REACHED); + isRateLimitViolated = true; } else { log.error("[{}] Msg processing failed! Error msg: {}", routingKey, msg.getErrorMsg()); }