Skip to content

Commit

Permalink
Merge pull request #9030 from dskarzh/feature/device-state-node
Browse files Browse the repository at this point in the history
Device state rule node; device state service improvements
  • Loading branch information
ashvayka authored Feb 16, 2024
2 parents 921159d + 73afd9a commit d5da538
Show file tree
Hide file tree
Showing 20 changed files with 2,281 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
Expand Down Expand Up @@ -203,6 +204,10 @@ public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() {
@Getter
private DeviceCredentialsService deviceCredentialsService;

@Autowired(required = false)
@Getter
private RuleEngineDeviceStateManager deviceStateManager;

@Autowired
@Getter
private TbTenantProfileCache tenantProfileCache;
Expand Down Expand Up @@ -556,6 +561,10 @@ public void printStats() {
@Getter
private boolean externalNodeForceAck;

@Value("${state.rule.node.deviceState.rateLimit:1:1,30:60,60:3600}")
@Getter
private String deviceStateNodeRateLimitConfig;

@Getter
@Setter
private TbActorSystem actorSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
Expand Down Expand Up @@ -108,6 +109,7 @@
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;

Expand Down Expand Up @@ -213,7 +215,19 @@ private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer<Throwable> on
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
}
},
t -> {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue!", nodeCtx.getTenantId().getId(), t);
}
}));
}

@Override
Expand Down Expand Up @@ -299,7 +313,19 @@ private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg
relationTypes.forEach(relationType ->
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType, null, failureMessage));
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
}
},
t -> {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue!", nodeCtx.getTenantId().getId(), t);
}
}));
}

@Override
Expand Down Expand Up @@ -658,6 +684,16 @@ public DeviceCredentialsService getDeviceCredentialsService() {
return mainCtx.getDeviceCredentialsService();
}

@Override
public RuleEngineDeviceStateManager getDeviceStateManager() {
return mainCtx.getDeviceStateManager();
}

@Override
public String getDeviceStateNodeRateLimitConfig() {
return mainCtx.getDeviceStateNodeRateLimitConfig();
}

@Override
public TbClusterService getClusterService() {
return mainCtx.getClusterService();
Expand Down Expand Up @@ -952,29 +988,4 @@ private static String getFailureMessage(Throwable th) {
return failureMessage;
}

private class SimpleTbQueueCallback implements TbQueueCallback {
private final Runnable onSuccess;
private final Consumer<Throwable> onFailure;

public SimpleTbQueueCallback(Runnable onSuccess, Consumer<Throwable> onFailure) {
this.onSuccess = onSuccess;
this.onFailure = onFailure;
}

@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (onSuccess != null) {
onSuccess.run();
}
}

@Override
public void onFailure(Throwable t) {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue", nodeCtx.getTenantId(), t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package org.thingsboard.server.service.queue;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -148,6 +151,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
protected volatile ExecutorService consumersExecutor;
protected volatile ExecutorService usageStatsExecutor;
private volatile ExecutorService firmwareStatesExecutor;
private volatile ListeningExecutorService deviceActivityEventsExecutor;

public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
ActorSystemContext actorContext,
Expand Down Expand Up @@ -195,6 +199,7 @@ public void init() {
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-consumer"));
this.usageStatsExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
this.firmwareStatesExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));
}

@PreDestroy
Expand All @@ -209,6 +214,9 @@ public void destroy() {
if (firmwareStatesExecutor != null) {
firmwareStatesExecutor.shutdownNow();
}
if (deviceActivityEventsExecutor != null) {
deviceActivityEventsExecutor.shutdownNow();
}
}

@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
Expand Down Expand Up @@ -265,14 +273,23 @@ protected void launchMainConsumers() {
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceConnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (toCoreMsg.hasDeviceDisconnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
Expand Down Expand Up @@ -641,25 +658,71 @@ private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback call
}
}

private void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
void forwardToStateService(DeviceStateServiceMsgProto deviceStateServiceMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceStateServiceMsg);
}
stateService.onQueueMsg(deviceStateServiceMsg, callback);
}

private void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
void forwardToStateService(TransportProtos.DeviceConnectProto deviceConnectMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceConnectMsg);
}
var tenantId = toTenantId(deviceConnectMsg.getTenantIdMSB(), deviceConnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceConnectMsg.getDeviceIdMSB(), deviceConnectMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceConnect(tenantId, deviceId, deviceConnectMsg.getLastConnectTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device connect message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}

void forwardToStateService(TransportProtos.DeviceActivityProto deviceActivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
TenantId tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
callback.onSuccess();
} catch (Exception e) {
callback.onFailure(new RuntimeException("Failed update device activity for device [" + deviceId.getId() + "]!", e));
var tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device activity message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(new RuntimeException("Failed to update device activity for device [" + deviceId.getId() + "]!", t));
});
}

void forwardToStateService(TransportProtos.DeviceDisconnectProto deviceDisconnectMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceDisconnectMsg);
}
var tenantId = toTenantId(deviceDisconnectMsg.getTenantIdMSB(), deviceDisconnectMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceDisconnectMsg.getDeviceIdMSB(), deviceDisconnectMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceDisconnect(tenantId, deviceId, deviceDisconnectMsg.getLastDisconnectTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device disconnect message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}

void forwardToStateService(TransportProtos.DeviceInactivityProto deviceInactivityMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceInactivityMsg);
}
var tenantId = toTenantId(deviceInactivityMsg.getTenantIdMSB(), deviceInactivityMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityMsg.getDeviceIdMSB(), deviceInactivityMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceInactivity(tenantId, deviceId, deviceInactivityMsg.getLastInactivityTime()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device inactivity message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}

private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public class TbCoreConsumerStats {
public static final String DEVICE_STATES = "deviceState";
public static final String SUBSCRIPTION_MSGS = "subMsgs";
public static final String EDGE_NOTIFICATIONS = "edgeNfs";
public static final String DEVICE_CONNECTS = "deviceConnect";
public static final String DEVICE_ACTIVITIES = "deviceActivity";
public static final String DEVICE_DISCONNECTS = "deviceDisconnect";
public static final String DEVICE_INACTIVITIES = "deviceInactivity";

public static final String TO_CORE_NF_OTHER = "coreNfOther"; // normally, there is no messages when codebase is fine
public static final String TO_CORE_NF_COMPONENT_LIFECYCLE = "coreNfCompLfcl";
Expand All @@ -63,7 +66,10 @@ public class TbCoreConsumerStats {
private final StatsCounter deviceStateCounter;
private final StatsCounter subscriptionMsgCounter;
private final StatsCounter edgeNotificationsCounter;
private final StatsCounter deviceConnectsCounter;
private final StatsCounter deviceActivitiesCounter;
private final StatsCounter deviceDisconnectsCounter;
private final StatsCounter deviceInactivitiesCounter;

private final StatsCounter toCoreNfOtherCounter;
private final StatsCounter toCoreNfComponentLifecycleCounter;
Expand Down Expand Up @@ -94,7 +100,10 @@ public TbCoreConsumerStats(StatsFactory statsFactory) {
this.deviceStateCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_STATES));
this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS));
this.edgeNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, EDGE_NOTIFICATIONS));
this.deviceConnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_CONNECTS));
this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES));
this.deviceDisconnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_DISCONNECTS));
this.deviceInactivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITIES));

// Core notification counters
this.toCoreNfOtherCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_OTHER));
Expand Down Expand Up @@ -152,11 +161,26 @@ public void log(TransportProtos.EdgeNotificationMsgProto msg) {
edgeNotificationsCounter.increment();
}

public void log(TransportProtos.DeviceConnectProto msg) {
totalCounter.increment();
deviceConnectsCounter.increment();
}

public void log(TransportProtos.DeviceActivityProto msg) {
totalCounter.increment();
deviceActivitiesCounter.increment();
}

public void log(TransportProtos.DeviceDisconnectProto msg) {
totalCounter.increment();
deviceDisconnectsCounter.increment();
}

public void log(TransportProtos.DeviceInactivityProto msg) {
totalCounter.increment();
deviceInactivitiesCounter.increment();
}

public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment();
subscriptionMsgCounter.increment();
Expand Down
Loading

0 comments on commit d5da538

Please sign in to comment.