Skip to content

Commit

Permalink
Add logic from Edge side to handle rate limit violation from cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiLandiak committed Feb 21, 2024
1 parent f776f9f commit a8c4160
Showing 1 changed file with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class CloudManagerService {

private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private static final String QUEUE_SEQ_ID_OFFSET_ATTR_KEY = "queueSeqIdOffset";
private static final String RATE_LIMIT_REACHED = "Rate limit reached";

@Value("${cloud.routingKey}")
private String routingKey;
Expand Down Expand Up @@ -206,6 +207,7 @@ public class CloudManagerService {
private ScheduledExecutorService reconnectScheduler;
private ScheduledFuture<?> scheduledFuture;
private ScheduledExecutorService shutdownExecutor;
private volatile boolean isRateLimitViolated = false;
private volatile boolean initialized;
private volatile boolean syncInProgress = false;

Expand Down Expand Up @@ -378,6 +380,14 @@ private boolean sendUplinkMsgsPack(List<UplinkMsg> uplinkMsgsPack) throws Interr
log.error("Error during sleep between batches", e);
}
}
if (initialized && !success && isRateLimitViolated) {
isRateLimitViolated = false;
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
log.error("Error during sleep on rate limit violation", e);
}
}
attempt++;
if (attempt > MAX_UPLINK_ATTEMPTS) {
log.warn("Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
Expand Down Expand Up @@ -516,6 +526,9 @@ private void onUplinkResponse(UplinkResponseMsg msg) {
if (msg.getSuccess()) {
pendingMsgsMap.remove(msg.getUplinkMsgId());
log.debug("[{}] Msg has been processed successfully! {}", routingKey, msg);
} else if (msg.getErrorMsg().contains(RATE_LIMIT_REACHED)) {
log.warn("[{}] Msg processing failed! {}", routingKey, RATE_LIMIT_REACHED);
isRateLimitViolated = true;
} else {
log.error("[{}] Msg processing failed! Error msg: {}", routingKey, msg.getErrorMsg());
}
Expand Down

0 comments on commit a8c4160

Please sign in to comment.