Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
volodymyr-babak committed Sep 5, 2024
2 parents 72e237c + f8e3d60 commit 7a79f21
Show file tree
Hide file tree
Showing 20 changed files with 535 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class OAuth2Controller extends BaseController {
@ApiOperation(value = "Get OAuth2 clients (getOAuth2Clients)", notes = "Get the list of OAuth2 clients " +
"to log in with, available for such domain scheme (HTTP or HTTPS) (if x-forwarded-proto request header is present - " +
"the scheme is known from it) and domain name and port (port may be known from x-forwarded-port header)")
@PostMapping(value = "/noauth/oauth2/client")
@PostMapping(value = "/noauth/oauth2Clients")
public List<OAuth2ClientLoginInfo> getOAuth2Clients(HttpServletRequest request,
@Parameter(description = "Mobile application package name, to find OAuth2 clients " +
"where there is configured mobile application with such package name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private final DbTypeInfoComponent dbTypeInfoComponent;
private final TbApiUsageReportClient apiUsageReportClient;
private final NotificationRuleProcessor notificationRuleProcessor;
@Autowired @Lazy
@Autowired
@Lazy
private TelemetrySubscriptionService tsSubService;

@Value("${state.defaultInactivityTimeoutInSec}")
Expand Down Expand Up @@ -362,14 +363,16 @@ public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallb
if (proto.getAdded()) {
Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable DeviceStateData state) {
public void onSuccess(DeviceStateData state) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
if (addDeviceUsingState(tpi, state)) {
save(deviceId, ACTIVITY_STATE, false);
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
boolean isMyPartition = deviceIds != null;
if (isMyPartition) {
deviceIds.add(state.getDeviceId());
initializeActivityState(deviceId, state);
callback.onSuccess();
} else {
log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
, tenantId, deviceId, tpi.getFullTopicName());
log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName());
callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"));
}
}
Expand Down Expand Up @@ -400,6 +403,21 @@ public void onFailure(Throwable t) {
}
}

private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
cleanupEntity(deviceId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
if (deviceIdSet != null) {
deviceIdSet.remove(deviceId);
}
}

private void initializeActivityState(DeviceId deviceId, DeviceStateData fetchedState) {
DeviceStateData cachedState = deviceStates.putIfAbsent(fetchedState.getDeviceId(), fetchedState);
boolean activityState = Objects.requireNonNullElse(cachedState, fetchedState).getState().isActive();
save(deviceId, ACTIVITY_STATE, activityState);
}

@Override
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
Expand Down Expand Up @@ -436,10 +454,16 @@ protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(S
}
if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) {
for (var state : states) {
if (!addDeviceUsingState(entry.getKey(), state)) {
return;
TopicPartitionInfo tpi = entry.getKey();
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
boolean isMyPartition = deviceIds != null;
if (isMyPartition) {
deviceIds.add(state.getDeviceId());
deviceStates.putIfAbsent(state.getDeviceId(), state);
checkAndUpdateState(state.getDeviceId(), state);
} else {
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
}
checkAndUpdateState(state.getDeviceId(), state);
}
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
}
Expand Down Expand Up @@ -475,18 +499,6 @@ void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData st
}
}

private boolean addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
if (deviceIds != null) {
deviceIds.add(state.getDeviceId());
deviceStates.putIfAbsent(state.getDeviceId(), state);
return true;
} else {
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
return false;
}
}

void checkStates() {
try {
final long ts = getCurrentTimeMillis();
Expand Down Expand Up @@ -619,15 +631,6 @@ boolean cleanDeviceStateIfBelongsToExternalPartition(TenantId tenantId, final De
return cleanup;
}

private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
cleanupEntity(deviceId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
if (deviceIdSet != null) {
deviceIdSet.remove(deviceId);
}
}

@Override
protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
cleanupEntity(deviceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.state;

import com.google.common.util.concurrent.Futures;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -28,8 +29,10 @@
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceIdInfo;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
Expand All @@ -41,11 +44,13 @@
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
Expand All @@ -66,6 +71,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -1070,4 +1076,106 @@ public void givenConcurrentAccess_whenGetOrFetchDeviceStateData_thenFetchDeviceS
then(service).should().fetchDeviceStateDataUsingSeparateRequests(deviceId);
}

@Test
public void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() throws InterruptedException {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));

TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();

// WHEN
service.onQueueMsg(proto, TbCallback.EMPTY);

// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false);
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(false), any());
});
}

@Test
public void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() throws InterruptedException {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
long currentTime = System.currentTimeMillis();
DeviceState deviceState = DeviceState.builder()
.active(false)
.inactivityTimeout(service.getDefaultInactivityTimeoutInSec())
.build();
DeviceStateData stateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.deviceCreationTime(currentTime - 10000)
.state(deviceState)
.metaData(TbMsgMetaData.EMPTY)
.build();
service.deviceStates.put(deviceId, stateData);

// WHEN
service.onDeviceActivity(tenantId, deviceId, currentTime);

// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(LAST_ACTIVITY_TIME), eq(currentTime), any());
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any());
});
}

@Test
public void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() throws InterruptedException {
// GIVEN
final long defaultTimeout = 1000;
initStateService(defaultTimeout);
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));

long currentTime = System.currentTimeMillis();
DeviceState deviceState = DeviceState.builder()
.active(true)
.lastConnectTime(currentTime - 8000)
.lastActivityTime(currentTime - 4000)
.lastDisconnectTime(0)
.lastInactivityAlarmTime(0)
.inactivityTimeout(3000)
.build();
DeviceStateData stateData = DeviceStateData.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.deviceCreationTime(currentTime - 10000)
.state(deviceState)
.build();
service.deviceStates.put(deviceId, stateData);

// WHEN
TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setAdded(true)
.setUpdated(false)
.setDeleted(false)
.build();
service.onQueueMsg(proto, TbCallback.EMPTY);

// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any());
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import static org.thingsboard.server.dao.service.ConstraintValidator.validateFields;

@Slf4j
@RuleNode(
type = ComponentType.EXTERNAL,
Expand All @@ -62,10 +65,9 @@ public class TbAwsLambdaNode extends TbAbstractExternalNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
config = TbNodeUtils.convert(configuration, TbAwsLambdaNodeConfiguration.class);
if (StringUtils.isBlank(config.getFunctionName())) {
throw new TbNodeException("Function name must be set!", true);
}
String errorPrefix = "'" + ctx.getSelf().getName() + "' node configuration is invalid: ";
try {
validateFields(config, errorPrefix);
AWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
client = AWSLambdaAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
Expand All @@ -74,6 +76,8 @@ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNode
.withConnectionTimeout((int) TimeUnit.SECONDS.toMillis(config.getConnectionTimeout()))
.withRequestTimeout((int) TimeUnit.SECONDS.toMillis(config.getRequestTimeout())))
.build();
} catch (DataValidationException e) {
throw new TbNodeException(e, true);
} catch (Exception e) {
throw new TbNodeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.thingsboard.rule.engine.aws.lambda;

import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;

Expand All @@ -23,12 +25,18 @@ public class TbAwsLambdaNodeConfiguration implements NodeConfiguration<TbAwsLamb

public static final String DEFAULT_QUALIFIER = "$LATEST";

@NotBlank
private String accessKey;
@NotBlank
private String secretKey;
@NotBlank
private String region;
@NotBlank
private String functionName;
private String qualifier;
@Min(0)
private int connectionTimeout;
@Min(0)
private int requestTimeout;
private boolean tellFailureIfFuncThrowsExc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNode
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
try {
this.mqttClient = initClient(ctx);
} catch (TbNodeException e) {
throw e;
} catch (Exception e) {
throw new TbNodeException(e);
}
Expand Down Expand Up @@ -119,8 +121,7 @@ protected MqttClient initClient(TbContext ctx) throws Exception {
MqttClientConfig config = new MqttClientConfig(getSslContext());
config.setOwnerId(getOwnerId(ctx));
if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
config.setClientId(this.mqttNodeConfiguration.isAppendClientIdSuffix() ?
this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() : this.mqttNodeConfiguration.getClientId());
config.setClientId(getClientId(ctx));
}
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());

Expand All @@ -146,6 +147,17 @@ protected MqttClient initClient(TbContext ctx) throws Exception {
return client;
}

private String getClientId(TbContext ctx) throws TbNodeException {
String clientId = this.mqttNodeConfiguration.isAppendClientIdSuffix() ?
this.mqttNodeConfiguration.getClientId() + "_" + ctx.getServiceId() :
this.mqttNodeConfiguration.getClientId();
if (clientId.length() > 23) {
throw new TbNodeException("Client ID is too long '" + clientId + "'. " +
"The length of Client ID cannot be longer than 23, but current length is " + clientId.length() + ".", true);
}
return clientId;
}

MqttClient getMqttClient(TbContext ctx, MqttClientConfig config) {
return MqttClient.create(config, null, ctx.getExternalCallExecutor());
}
Expand Down
Loading

0 comments on commit 7a79f21

Please sign in to comment.