Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into release-3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
volodymyr-babak committed Jul 4, 2023
2 parents 5d96245 + db947ec commit cc8aaa9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ private void processHandleMessages() {
0, null, new SortOrder("seqId"), queueStartTs, System.currentTimeMillis());
if (newCloudEventsAvailable(seqIdOffset, pageLink)) {
PageData<CloudEvent> pageData;
UUID idOffset = null;
boolean success = true;
CloudEvent latestCloudEvent = null;
do {
pageData = cloudEventService.findCloudEvents(tenantId, seqIdOffset, null, pageLink);
if (initialized) {
Expand All @@ -271,21 +271,21 @@ private void processHandleMessages() {
} else {
success = true;
}
CloudEvent latestCloudEvent = pageData.getData().get(pageData.getData().size() - 1);
idOffset = latestCloudEvent.getUuidId();
seqIdOffset = latestCloudEvent.getSeqId();
if (!pageData.getData().isEmpty()) {
latestCloudEvent = pageData.getData().get(pageData.getData().size() - 1);
}
if (success) {
pageLink = pageLink.nextPageLink();
}
}
} while (initialized && (!success || pageData.hasNext()));
if (idOffset != null) {
if (latestCloudEvent != null) {
try {
Long newStartTs = Uuids.unixTimestamp(idOffset);
updateQueueStartTsSeqIdOffset(newStartTs, seqIdOffset);
log.debug("Queue offset was updated [{}][{}][{}]", idOffset, newStartTs, seqIdOffset);
Long newStartTs = Uuids.unixTimestamp(latestCloudEvent.getUuidId());
updateQueueStartTsSeqIdOffset(newStartTs, latestCloudEvent.getSeqId());
log.debug("Queue offset was updated [{}][{}][{}]", latestCloudEvent.getUuidId(), newStartTs, latestCloudEvent.getSeqId());
} catch (Exception e) {
log.error("[{}] Failed to update queue offset [{}]", idOffset, e);
log.error("Failed to update queue offset [{}]", latestCloudEvent);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private ListenableFuture<Void> addToQueue(CloudEventEntity entity) {

@Override
public PageData<CloudEvent> findCloudEvents(UUID tenantId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) {
log.trace("Executing findCloudEvents [{}], [{}], [{}], [{}]", tenantId, seqIdStart, seqIdEnd, pageLink);
return DaoUtil.toPageData(
cloudEventRepository
.findEventsByTenantId(
Expand Down
2 changes: 2 additions & 0 deletions docker-edge/tb-edge/conf/tb-edge.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ export LOADER_PATH=/usr/share/tb-edge/conf,/usr/share/tb-edge/extensions
# export SPRING_DATASOURCE_URL=jdbc:postgresql://localhost:5432/tb_edge
# export SPRING_DATASOURCE_USERNAME=postgres
# export SPRING_DATASOURCE_PASSWORD=postgres

export CLOUD_RPC_STORAGE_MAX_READ_RECORDS_COUNT=10
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,63 @@

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Slf4j
public class TelemetryClientTest extends AbstractContainerTest {

@Test
public void testSendPostTelemetryRequestToCloud_performanceTest() throws Exception {
Device device = saveAndAssignDeviceToEdge();

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> {
Optional<DeviceCredentials> edgeDeviceCredentials = edgeRestClient.getDeviceCredentialsByDeviceId(device.getId());
Optional<DeviceCredentials> cloudDeviceCredentials = cloudRestClient.getDeviceCredentialsByDeviceId(device.getId());
return edgeDeviceCredentials.isPresent() &&
cloudDeviceCredentials.isPresent() &&
edgeDeviceCredentials.get().getCredentialsId().equals(cloudDeviceCredentials.get().getCredentialsId());
});

DeviceCredentials deviceCredentials = edgeRestClient.getDeviceCredentialsByDeviceId(device.getId()).get();
final String accessToken = deviceCredentials.getCredentialsId();
final String telemetryKey = "index";
final long numberOfTimeseriesToSend = 1000L;
for (int idx = 1; idx <= numberOfTimeseriesToSend; idx++) {
JsonObject timeseriesPayload = new JsonObject();
timeseriesPayload.addProperty(telemetryKey, idx);
ResponseEntity deviceTelemetryResponse = edgeRestClient.getRestTemplate()
.postForEntity(edgeUrl + "/api/v1/{credentialsId}/telemetry",
JacksonUtil.OBJECT_MAPPER.readTree(timeseriesPayload.toString()),
ResponseEntity.class,
accessToken);
Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());
}

verifyDeviceIsActive(cloudRestClient, device.getId());

Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(() -> {
List<TsKvEntry> latestTimeseries;
try {
latestTimeseries = cloudRestClient.getLatestTimeseries(device.getId(), List.of(telemetryKey));
} catch (Exception e) {
return false;
}
return latestTimeseries.size() == 1
&& latestTimeseries.get(0).getLongValue().isPresent()
&& latestTimeseries.get(0).getLongValue().get() == numberOfTimeseriesToSend;
});

// cleanup
cloudRestClient.deleteDevice(device.getId());
}

@Test
public void testSendPostTelemetryRequestToCloud() throws Exception {
List<String> keys = Arrays.asList("strTelemetryToCloud", "boolTelemetryToCloud", "doubleTelemetryToCloud", "longTelemetryToCloud");
Expand Down

0 comments on commit cc8aaa9

Please sign in to comment.