Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
AndriiLandiak committed Feb 16, 2024
2 parents f3cb9b6 + b934610 commit fd249dd
Show file tree
Hide file tree
Showing 95 changed files with 5,486 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"single_switch",
"command_button",
"power_button",
"slider",
"control_widgets.switch_control",
"control_widgets.slide_toggle_control",
"control_widgets.round_switch",
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions application/src/main/data/json/system/widget_types/slider.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
Expand Down Expand Up @@ -204,6 +205,10 @@ public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() {
@Getter
private DeviceCredentialsService deviceCredentialsService;

@Autowired(required = false)
@Getter
private RuleEngineDeviceStateManager deviceStateManager;

@Autowired
@Getter
private TbTenantProfileCache tenantProfileCache;
Expand Down Expand Up @@ -561,6 +566,10 @@ public void printStats() {
@Getter
private boolean externalNodeForceAck;

@Value("${state.rule.node.deviceState.rateLimit:1:1,30:60,60:3600}")
@Getter
private String deviceStateNodeRateLimitConfig;

@Getter
@Setter
private TbActorSystem actorSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;

Expand Down Expand Up @@ -214,7 +216,19 @@ private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer<Throwable> on
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
}
},
t -> {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue!", nodeCtx.getTenantId().getId(), t);
}
}));
}

@Override
Expand Down Expand Up @@ -300,7 +314,19 @@ private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg
relationTypes.forEach(relationType ->
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType, null, failureMessage));
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
}
},
t -> {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue!", nodeCtx.getTenantId().getId(), t);
}
}));
}

@Override
Expand Down Expand Up @@ -659,6 +685,16 @@ public DeviceCredentialsService getDeviceCredentialsService() {
return mainCtx.getDeviceCredentialsService();
}

@Override
public RuleEngineDeviceStateManager getDeviceStateManager() {
return mainCtx.getDeviceStateManager();
}

@Override
public String getDeviceStateNodeRateLimitConfig() {
return mainCtx.getDeviceStateNodeRateLimitConfig();
}

@Override
public TbClusterService getClusterService() {
return mainCtx.getClusterService();
Expand Down Expand Up @@ -958,29 +994,4 @@ private static String getFailureMessage(Throwable th) {
return failureMessage;
}

private class SimpleTbQueueCallback implements TbQueueCallback {
private final Runnable onSuccess;
private final Consumer<Throwable> onFailure;

public SimpleTbQueueCallback(Runnable onSuccess, Consumer<Throwable> onFailure) {
this.onSuccess = onSuccess;
this.onFailure = onFailure;
}

@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (onSuccess != null) {
onSuccess.run();
}
}

@Override
public void onFailure(Throwable t) {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue", nodeCtx.getTenantId(), t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public void handleEvent(DeleteEntityEvent<?> event) {
private EdgeEventActionType getEdgeEventActionTypeForEntityEvent(Object entity) {
if (entity instanceof AlarmComment) {
return EdgeEventActionType.DELETED_COMMENT;
} else if (entity instanceof Alarm) {
return EdgeEventActionType.ALARM_DELETE;
}
return EdgeEventActionType.DELETED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ private List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents)
case UNASSIGNED_FROM_EDGE:
case ALARM_ACK:
case ALARM_CLEAR:
case ALARM_DELETE:
case CREDENTIALS_UPDATED:
case RELATION_ADD_OR_UPDATE:
case RELATION_DELETED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) {
case UNASSIGNED_FROM_EDGE:
case RELATION_DELETED:
case DELETED_COMMENT:
case ALARM_DELETE:
return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
case ALARM_ACK:
return UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, Transp
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB());
if (EdgeEventActionType.DELETED.equals(actionType)) {
if (EdgeEventActionType.DELETED.equals(actionType) || EdgeEventActionType.ALARM_DELETE.equals(actionType)) {
Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class);
if (deletedAlarm == null) {
return Futures.immediateFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ protected AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID ent
.constructAlarmUpdatedMsg(msgType, alarm, findOriginatorEntityName(tenantId, alarm));
}
break;
case ALARM_DELETE:
case DELETED:
Alarm deletedAlarm = JacksonUtil.convertValue(body, Alarm.class);
if (deletedAlarm != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, User user, lon
public Boolean delete(Alarm alarm, User user) {
TenantId tenantId = alarm.getTenantId();
notificationEntityService.logEntityAction(tenantId, alarm.getOriginator(), alarm, alarm.getCustomerId(),
ActionType.DELETED, user);
ActionType.ALARM_DELETE, user);
return alarmSubscriptionService.deleteAlarm(tenantId, alarm.getId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package org.thingsboard.server.service.queue;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -150,6 +153,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
protected volatile ExecutorService consumersExecutor;
protected volatile ExecutorService usageStatsExecutor;
private volatile ExecutorService firmwareStatesExecutor;
private volatile ListeningExecutorService deviceActivityEventsExecutor;

public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
ActorSystemContext actorContext,
Expand Down Expand Up @@ -199,6 +203,7 @@ public void init() {
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-consumer"));
this.usageStatsExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
this.firmwareStatesExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));
}

@PreDestroy
Expand All @@ -213,6 +218,9 @@ public void destroy() {
if (firmwareStatesExecutor != null) {
firmwareStatesExecutor.shutdownNow();
}
if (deviceActivityEventsExecutor != null) {
deviceActivityEventsExecutor.shutdownNow();
}
}

@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
Expand Down Expand Up @@ -269,17 +277,26 @@ protected void launchMainConsumers() {
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasCloudNotificationMsg()) {
log.trace("[{}] Forwarding message to cloud service {}", id, toCoreMsg.getCloudNotificationMsg());
forwardToCloudNotificationService(toCoreMsg.getCloudNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceConnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (toCoreMsg.hasDeviceDisconnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
Expand Down Expand Up @@ -648,7 +665,7 @@ private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback call
}
}

private void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceStateServiceMsg);
}
Expand All @@ -662,18 +679,64 @@ private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, Tb
actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}

private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceConnectMsg);
}
var tenantId = toTenantId(deviceConnectMsg.getTenantIdMSB(), deviceConnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceConnectMsg.getDeviceIdMSB(), deviceConnectMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceConnect(tenantId, deviceId, deviceConnectMsg.getLastConnectTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device connect message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}

void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
TenantId tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
callback.onSuccess();
} catch (Exception e) {
callback.onFailure(new RuntimeException("Failed update device activity for device [" + deviceId.getId() + "]!", e));
var tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device activity message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(new RuntimeException("Failed to update device activity for device [" + deviceId.getId() + "]!", t));
});
}

void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceDisconnectMsg);
}
var tenantId = toTenantId(deviceDisconnectMsg.getTenantIdMSB(), deviceDisconnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceDisconnectMsg.getDeviceIdMSB(), deviceDisconnectMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceDisconnect(tenantId, deviceId, deviceDisconnectMsg.getLastDisconnectTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device disconnect message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}

void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceInactivityMsg);
}
var tenantId = toTenantId(deviceInactivityMsg.getTenantIdMSB(), deviceInactivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityMsg.getDeviceIdMSB(), deviceInactivityMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceInactivity(tenantId, deviceId, deviceInactivityMsg.getLastInactivityTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device inactivity message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}

private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
Expand Down
Loading

0 comments on commit fd249dd

Please sign in to comment.