diff --git a/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java b/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java index ba1ad832032..2d2a7e4e44b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java @@ -90,6 +90,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -350,13 +351,16 @@ private boolean sendUplinkMsgsPack(List uplinkMsgsPack) throws Interr try { int attempt = 1; boolean success; + LinkedBlockingQueue orderedPendingMsgsQueue = new LinkedBlockingQueue<>(); pendingMsgsMap.clear(); - uplinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getUplinkMsgId(), msg)); + uplinkMsgsPack.forEach(msg -> { + pendingMsgsMap.put(msg.getUplinkMsgId(), msg); + orderedPendingMsgsQueue.add(msg); + }); do { log.trace("[{}] uplink msg(s) are going to be send.", pendingMsgsMap.values().size()); latch = new CountDownLatch(pendingMsgsMap.values().size()); - List copy = new ArrayList<>(pendingMsgsMap.values()); - for (UplinkMsg uplinkMsg : copy) { + for (UplinkMsg uplinkMsg : orderedPendingMsgsQueue) { if (edgeRpcClient.getServerMaxInboundMessageSize() != 0 && uplinkMsg.getSerializedSize() > edgeRpcClient.getServerMaxInboundMessageSize()) { log.error("Uplink msg size [{}] exceeds server max inbound message size [{}]. Skipping this message. " + "Please increase value of EDGES_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the server and restart it." + diff --git a/application/src/main/java/org/thingsboard/server/service/cloud/rpc/processor/TelemetryCloudProcessor.java b/application/src/main/java/org/thingsboard/server/service/cloud/rpc/processor/TelemetryCloudProcessor.java index a834c1bced3..f83d831a0b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cloud/rpc/processor/TelemetryCloudProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/cloud/rpc/processor/TelemetryCloudProcessor.java @@ -42,6 +42,7 @@ protected String getMsgSourceKey() { } public UplinkMsg convertTelemetryEventToUplink(TenantId tenantId, CloudEvent cloudEvent) { + log.trace("Executing convertTelemetryEventToUplink, cloudEvent [{}]", cloudEvent); EntityType entityType = EntityType.valueOf(cloudEvent.getType().name()); EntityDataProto entityDataProto = convertTelemetryEventToEntityDataProto( tenantId, entityType, cloudEvent.getEntityId(), diff --git a/application/src/main/resources/tb-edge.yml b/application/src/main/resources/tb-edge.yml index cc8bc8b4d5f..0e7e4da108e 100644 --- a/application/src/main/resources/tb-edge.yml +++ b/application/src/main/resources/tb-edge.yml @@ -219,6 +219,10 @@ ui: # Database telemetry parameters database: ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by a single API call to fetch telemetry records + ts: + type: "${DATABASE_TS_TYPE:sql}" # sql or timescale (for hybrid mode, DATABASE_TS_TYPE value should be timescale) + ts_latest: + type: "${DATABASE_TS_LATEST_TYPE:sql}" # sql or timescale (for hybrid mode, DATABASE_TS_TYPE value should be timescale) # SQL configuration parameters sql: