From db947eccd63e4b9d498d37c213c95d9f73a2124c Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 4 Jul 2023 15:21:19 +0300 Subject: [PATCH] Fixed incorrect update of seqId paramater - as a result incorrect fetch of data from DB --- .../service/cloud/CloudManagerService.java | 18 +++---- .../dao/sql/cloud/JpaBaseCloudEventDao.java | 1 + docker-edge/tb-edge/conf/tb-edge.conf | 2 + .../server/msa/edge/TelemetryClientTest.java | 52 +++++++++++++++++++ 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java b/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java index 004ab1bfca..1374350481 100644 --- a/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java @@ -254,8 +254,8 @@ private void processHandleMessages() { 0, null, new SortOrder("seqId"), queueStartTs, System.currentTimeMillis()); if (newCloudEventsAvailable(seqIdOffset, pageLink)) { PageData pageData; - UUID idOffset = null; boolean success = true; + CloudEvent latestCloudEvent = null; do { pageData = cloudEventService.findCloudEvents(tenantId, seqIdOffset, null, pageLink); if (initialized) { @@ -271,21 +271,21 @@ private void processHandleMessages() { } else { success = true; } - CloudEvent latestCloudEvent = pageData.getData().get(pageData.getData().size() - 1); - idOffset = latestCloudEvent.getUuidId(); - seqIdOffset = latestCloudEvent.getSeqId(); + if (!pageData.getData().isEmpty()) { + latestCloudEvent = pageData.getData().get(pageData.getData().size() - 1); + } if (success) { pageLink = pageLink.nextPageLink(); } } } while (initialized && (!success || pageData.hasNext())); - if (idOffset != null) { + if (latestCloudEvent != null) { try { - Long newStartTs = Uuids.unixTimestamp(idOffset); - updateQueueStartTsSeqIdOffset(newStartTs, seqIdOffset); - log.debug("Queue offset was updated [{}][{}][{}]", idOffset, newStartTs, seqIdOffset); + Long newStartTs = Uuids.unixTimestamp(latestCloudEvent.getUuidId()); + updateQueueStartTsSeqIdOffset(newStartTs, latestCloudEvent.getSeqId()); + log.debug("Queue offset was updated [{}][{}][{}]", latestCloudEvent.getUuidId(), newStartTs, latestCloudEvent.getSeqId()); } catch (Exception e) { - log.error("[{}] Failed to update queue offset [{}]", idOffset, e); + log.error("Failed to update queue offset [{}]", latestCloudEvent); } } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cloud/JpaBaseCloudEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cloud/JpaBaseCloudEventDao.java index 75da5d69cb..97279028ec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cloud/JpaBaseCloudEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cloud/JpaBaseCloudEventDao.java @@ -162,6 +162,7 @@ private ListenableFuture addToQueue(CloudEventEntity entity) { @Override public PageData findCloudEvents(UUID tenantId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) { + log.trace("Executing findCloudEvents [{}], [{}], [{}], [{}]", tenantId, seqIdStart, seqIdEnd, pageLink); return DaoUtil.toPageData( cloudEventRepository .findEventsByTenantId( diff --git a/docker-edge/tb-edge/conf/tb-edge.conf b/docker-edge/tb-edge/conf/tb-edge.conf index f1acbba941..d926839949 100644 --- a/docker-edge/tb-edge/conf/tb-edge.conf +++ b/docker-edge/tb-edge/conf/tb-edge.conf @@ -40,3 +40,5 @@ export LOADER_PATH=/usr/share/tb-edge/conf,/usr/share/tb-edge/extensions # export SPRING_DATASOURCE_URL=jdbc:postgresql://localhost:5432/tb_edge # export SPRING_DATASOURCE_USERNAME=postgres # export SPRING_DATASOURCE_PASSWORD=postgres + +export CLOUD_RPC_STORAGE_MAX_READ_RECORDS_COUNT=10 \ No newline at end of file diff --git a/msa/edge-black-box-tests/src/test/java/org/thingsboard/server/msa/edge/TelemetryClientTest.java b/msa/edge-black-box-tests/src/test/java/org/thingsboard/server/msa/edge/TelemetryClientTest.java index 17aebd16a7..4f8cbfd150 100644 --- a/msa/edge-black-box-tests/src/test/java/org/thingsboard/server/msa/edge/TelemetryClientTest.java +++ b/msa/edge-black-box-tests/src/test/java/org/thingsboard/server/msa/edge/TelemetryClientTest.java @@ -32,11 +32,63 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; @Slf4j public class TelemetryClientTest extends AbstractContainerTest { + @Test + public void testSendPostTelemetryRequestToCloud_performanceTest() throws Exception { + Device device = saveAndAssignDeviceToEdge(); + + Awaitility.await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .until(() -> { + Optional edgeDeviceCredentials = edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()); + Optional cloudDeviceCredentials = cloudRestClient.getDeviceCredentialsByDeviceId(device.getId()); + return edgeDeviceCredentials.isPresent() && + cloudDeviceCredentials.isPresent() && + edgeDeviceCredentials.get().getCredentialsId().equals(cloudDeviceCredentials.get().getCredentialsId()); + }); + + DeviceCredentials deviceCredentials = edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()).get(); + final String accessToken = deviceCredentials.getCredentialsId(); + final String telemetryKey = "index"; + final long numberOfTimeseriesToSend = 1000L; + for (int idx = 1; idx <= numberOfTimeseriesToSend; idx++) { + JsonObject timeseriesPayload = new JsonObject(); + timeseriesPayload.addProperty(telemetryKey, idx); + ResponseEntity deviceTelemetryResponse = edgeRestClient.getRestTemplate() + .postForEntity(edgeUrl + "/api/v1/{credentialsId}/telemetry", + JacksonUtil.OBJECT_MAPPER.readTree(timeseriesPayload.toString()), + ResponseEntity.class, + accessToken); + Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful()); + } + + verifyDeviceIsActive(cloudRestClient, device.getId()); + + Awaitility.await() + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .until(() -> { + List latestTimeseries; + try { + latestTimeseries = cloudRestClient.getLatestTimeseries(device.getId(), List.of(telemetryKey)); + } catch (Exception e) { + return false; + } + return latestTimeseries.size() == 1 + && latestTimeseries.get(0).getLongValue().isPresent() + && latestTimeseries.get(0).getLongValue().get() == numberOfTimeseriesToSend; + }); + + // cleanup + cloudRestClient.deleteDevice(device.getId()); + } + @Test public void testSendPostTelemetryRequestToCloud() throws Exception { List keys = Arrays.asList("strTelemetryToCloud", "boolTelemetryToCloud", "doubleTelemetryToCloud", "longTelemetryToCloud");