Skip to content

Commit

Permalink
Process correct order of sending uplink msgs to Cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiLandiak committed Feb 23, 2024
1 parent a8c4160 commit 2e3b8af
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -350,13 +351,16 @@ private boolean sendUplinkMsgsPack(List<UplinkMsg> uplinkMsgsPack) throws Interr
try {
int attempt = 1;
boolean success;
LinkedBlockingQueue<UplinkMsg> orderedPendingMsgsQueue = new LinkedBlockingQueue<>();
pendingMsgsMap.clear();
uplinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getUplinkMsgId(), msg));
uplinkMsgsPack.forEach(msg -> {
pendingMsgsMap.put(msg.getUplinkMsgId(), msg);
orderedPendingMsgsQueue.add(msg);
});
do {
log.trace("[{}] uplink msg(s) are going to be send.", pendingMsgsMap.values().size());
latch = new CountDownLatch(pendingMsgsMap.values().size());
List<UplinkMsg> copy = new ArrayList<>(pendingMsgsMap.values());
for (UplinkMsg uplinkMsg : copy) {
for (UplinkMsg uplinkMsg : orderedPendingMsgsQueue) {
if (edgeRpcClient.getServerMaxInboundMessageSize() != 0 && uplinkMsg.getSerializedSize() > edgeRpcClient.getServerMaxInboundMessageSize()) {
log.error("Uplink msg size [{}] exceeds server max inbound message size [{}]. Skipping this message. " +
"Please increase value of EDGES_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the server and restart it." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected String getMsgSourceKey() {
}

public UplinkMsg convertTelemetryEventToUplink(TenantId tenantId, CloudEvent cloudEvent) {
log.trace("Executing convertTelemetryEventToUplink, cloudEvent [{}]", cloudEvent);
EntityType entityType = EntityType.valueOf(cloudEvent.getType().name());
EntityDataProto entityDataProto = convertTelemetryEventToEntityDataProto(
tenantId, entityType, cloudEvent.getEntityId(),
Expand Down
4 changes: 4 additions & 0 deletions application/src/main/resources/tb-edge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ ui:
# Database telemetry parameters
database:
ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by a single API call to fetch telemetry records
ts:
type: "${DATABASE_TS_TYPE:sql}" # sql or timescale (for hybrid mode, DATABASE_TS_TYPE value should be timescale)
ts_latest:
type: "${DATABASE_TS_LATEST_TYPE:sql}" # sql or timescale (for hybrid mode, DATABASE_TS_TYPE value should be timescale)

# SQL configuration parameters
sql:
Expand Down

0 comments on commit 2e3b8af

Please sign in to comment.