Skip to content

Commit

Permalink
Merge branch 'master' of github.com:thingsboard/thingsboard
Browse files Browse the repository at this point in the history
  • Loading branch information
ikulikov committed Sep 13, 2024
2 parents 1ff48fb + 6e76509 commit f80a62a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 438 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
*/
package org.thingsboard.rule.engine.delay;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
Expand All @@ -27,27 +26,18 @@
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;

import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.thingsboard.server.dao.service.ConstraintValidator.validateFields;

@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "delay (deprecated)",
version = 1,
configClazz = TbMsgDelayNodeConfiguration.class,
nodeDescription = "Delays incoming message (deprecated)",
nodeDetails = "Delays messages for a configurable period. " +
Expand All @@ -60,22 +50,13 @@
)
public class TbMsgDelayNode implements TbNode {

private static final Set<TimeUnit> supportedTimeUnits = EnumSet.of(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
private static final String supportedTimeUnitsStr = supportedTimeUnits.stream().map(TimeUnit::name).collect(Collectors.joining(", "));

private TbMsgDelayNodeConfiguration config;
private ConcurrentMap<UUID, TbMsg> pendingMsgs;
private Map<UUID, TbMsg> pendingMsgs;

@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgDelayNodeConfiguration.class);
String errorPrefix = "'" + ctx.getSelf().getName() + "' node configuration is invalid: ";
try {
validateFields(config, errorPrefix);
} catch (DataValidationException e) {
throw new TbNodeException(e, true);
}
this.pendingMsgs = new ConcurrentHashMap<>();
this.pendingMsgs = new HashMap<>();
}

@Override
Expand All @@ -86,7 +67,7 @@ public void onMsg(TbContext ctx, TbMsg msg) {
ctx.enqueueForTellNext(
TbMsg.newMsg(
pendingMsg.getQueueName(),
pendingMsg.getInternalType(),
pendingMsg.getType(),
pendingMsg.getOriginator(),
pendingMsg.getCustomerId(),
pendingMsg.getMetaData(),
Expand All @@ -108,69 +89,25 @@ public void onMsg(TbContext ctx, TbMsg msg) {
}

private long getDelay(TbMsg msg) {
String timeUnitPattern = TbNodeUtils.processPattern(config.getTimeUnit(), msg);
String periodPattern = TbNodeUtils.processPattern(config.getPeriod(), msg);
try {
TimeUnit timeUnit = TimeUnit.valueOf(timeUnitPattern.toUpperCase());
if (!supportedTimeUnits.contains(timeUnit)) {
throw new RuntimeException("Time unit '" + timeUnit + "' is not supported! " +
"Only " + supportedTimeUnitsStr + " are supported.");
int periodInSeconds;
if (config.isUseMetadataPeriodInSecondsPatterns()) {
if (isParsable(msg, config.getPeriodInSecondsPattern())) {
periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg));
} else {
throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern());
}
int period = Integer.parseInt(periodPattern);
return timeUnit.toMillis(period);
} catch (NumberFormatException e) {
throw new NumberFormatException("Can't parse period value : " + periodPattern);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid value for period time unit : " + timeUnitPattern);
} else {
periodInSeconds = config.getPeriodInSeconds();
}
return TimeUnit.SECONDS.toMillis(periodInSeconds);
}

@Override
public void destroy() {
pendingMsgs.clear();
private boolean isParsable(TbMsg msg, String pattern) {
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg));
}

@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
var periodInSeconds = "periodInSeconds";
var periodInSecondsPattern = "periodInSecondsPattern";
var useMetadataPeriodInSecondsPatterns = "useMetadataPeriodInSecondsPatterns";
var period = "period";
if (oldConfiguration.has(useMetadataPeriodInSecondsPatterns)) {
var isUsedPattern = oldConfiguration.get(useMetadataPeriodInSecondsPatterns).booleanValue();
if (isUsedPattern) {
if (!oldConfiguration.has(periodInSecondsPattern)) {
throw new TbNodeException("Property to update: '" + periodInSecondsPattern + "' does not exist in configuration.");
}
((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSecondsPattern));
} else {
if (!oldConfiguration.has(periodInSeconds)) {
throw new TbNodeException("Property to update: '" + periodInSeconds + "' does not exist in configuration.");
}
((ObjectNode) oldConfiguration).put(period, oldConfiguration.get(periodInSeconds).asText());
}
hasChanges = true;
} else if (oldConfiguration.has(periodInSeconds)) {
((ObjectNode) oldConfiguration).put(period, oldConfiguration.get(periodInSeconds).asText());
hasChanges = true;
}
if (!oldConfiguration.has(period)) {
((ObjectNode) oldConfiguration).put(period, "60");
hasChanges = true;
}
var timeUnit = "timeUnit";
if (!oldConfiguration.has(timeUnit)) {
((ObjectNode) oldConfiguration).put(timeUnit, TimeUnit.SECONDS.name());
hasChanges = true;
}
((ObjectNode) oldConfiguration).remove(List.of(periodInSeconds, periodInSecondsPattern, useMetadataPeriodInSecondsPatterns));
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
public void destroy() {
pendingMsgs.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,23 @@
*/
package org.thingsboard.rule.engine.delay;

import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;

import java.util.concurrent.TimeUnit;

@Data
public class TbMsgDelayNodeConfiguration implements NodeConfiguration<TbMsgDelayNodeConfiguration> {

@NotNull
private String period;
@NotNull
private String timeUnit;
@Min(1)
@Max(100000)
private int periodInSeconds;
private int maxPendingMsgs;
private String periodInSecondsPattern;
private boolean useMetadataPeriodInSecondsPatterns;

@Override
public TbMsgDelayNodeConfiguration defaultConfiguration() {
TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration();
configuration.setPeriod("60");
configuration.setTimeUnit(TimeUnit.SECONDS.name());
configuration.setPeriodInSeconds(60);
configuration.setMaxPendingMsgs(1000);
configuration.setUseMetadataPeriodInSecondsPatterns(false);
return configuration;
}
}
Loading

0 comments on commit f80a62a

Please sign in to comment.