From c393d1d377348c3fd3da325113a5cacecb0578d2 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Thu, 12 Sep 2024 18:15:35 +0300 Subject: [PATCH] Revert "Delay node: improvements" --- .../rule/engine/delay/TbMsgDelayNode.java | 101 +----- .../delay/TbMsgDelayNodeConfiguration.java | 18 +- .../rule/engine/delay/TbMsgDelayNodeTest.java | 343 ------------------ 3 files changed, 24 insertions(+), 438 deletions(-) delete mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java index 267d3fae735..b60a46ee0a8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -15,9 +15,8 @@ */ package org.thingsboard.rule.engine.delay; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.math.NumberUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -27,27 +26,18 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentType; -import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.dao.exception.DataValidationException; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.thingsboard.server.dao.service.ConstraintValidator.validateFields; @Slf4j @RuleNode( type = ComponentType.ACTION, name = "delay (deprecated)", - version = 1, configClazz = TbMsgDelayNodeConfiguration.class, nodeDescription = "Delays incoming message (deprecated)", nodeDetails = "Delays messages for a configurable period. " + @@ -60,22 +50,13 @@ ) public class TbMsgDelayNode implements TbNode { - private static final Set supportedTimeUnits = EnumSet.of(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS); - private static final String supportedTimeUnitsStr = supportedTimeUnits.stream().map(TimeUnit::name).collect(Collectors.joining(", ")); - private TbMsgDelayNodeConfiguration config; - private ConcurrentMap pendingMsgs; + private Map pendingMsgs; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgDelayNodeConfiguration.class); - String errorPrefix = "'" + ctx.getSelf().getName() + "' node configuration is invalid: "; - try { - validateFields(config, errorPrefix); - } catch (DataValidationException e) { - throw new TbNodeException(e, true); - } - this.pendingMsgs = new ConcurrentHashMap<>(); + this.pendingMsgs = new HashMap<>(); } @Override @@ -86,7 +67,7 @@ public void onMsg(TbContext ctx, TbMsg msg) { ctx.enqueueForTellNext( TbMsg.newMsg( pendingMsg.getQueueName(), - pendingMsg.getInternalType(), + pendingMsg.getType(), pendingMsg.getOriginator(), pendingMsg.getCustomerId(), pendingMsg.getMetaData(), @@ -108,69 +89,25 @@ public void onMsg(TbContext ctx, TbMsg msg) { } private long getDelay(TbMsg msg) { - String timeUnitPattern = TbNodeUtils.processPattern(config.getTimeUnit(), msg); - String periodPattern = TbNodeUtils.processPattern(config.getPeriod(), msg); - try { - TimeUnit timeUnit = TimeUnit.valueOf(timeUnitPattern.toUpperCase()); - if (!supportedTimeUnits.contains(timeUnit)) { - throw new RuntimeException("Time unit '" + timeUnit + "' is not supported! " + - "Only " + supportedTimeUnitsStr + " are supported."); + int periodInSeconds; + if (config.isUseMetadataPeriodInSecondsPatterns()) { + if (isParsable(msg, config.getPeriodInSecondsPattern())) { + periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg)); + } else { + throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern()); } - int period = Integer.parseInt(periodPattern); - return timeUnit.toMillis(period); - } catch (NumberFormatException e) { - throw new NumberFormatException("Can't parse period value : " + periodPattern); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid value for period time unit : " + timeUnitPattern); + } else { + periodInSeconds = config.getPeriodInSeconds(); } + return TimeUnit.SECONDS.toMillis(periodInSeconds); } - @Override - public void destroy() { - pendingMsgs.clear(); + private boolean isParsable(TbMsg msg, String pattern) { + return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg)); } @Override - public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { - boolean hasChanges = false; - switch (fromVersion) { - case 0: - var periodInSeconds = "periodInSeconds"; - var periodInSecondsPattern = "periodInSecondsPattern"; - var useMetadataPeriodInSecondsPatterns = "useMetadataPeriodInSecondsPatterns"; - var period = "period"; - if (oldConfiguration.has(useMetadataPeriodInSecondsPatterns)) { - var isUsedPattern = oldConfiguration.get(useMetadataPeriodInSecondsPatterns).booleanValue(); - if (isUsedPattern) { - if (!oldConfiguration.has(periodInSecondsPattern)) { - throw new TbNodeException("Property to update: '" + periodInSecondsPattern + "' does not exist in configuration."); - } - ((ObjectNode) oldConfiguration).set(period, oldConfiguration.get(periodInSecondsPattern)); - } else { - if (!oldConfiguration.has(periodInSeconds)) { - throw new TbNodeException("Property to update: '" + periodInSeconds + "' does not exist in configuration."); - } - ((ObjectNode) oldConfiguration).put(period, oldConfiguration.get(periodInSeconds).asText()); - } - hasChanges = true; - } else if (oldConfiguration.has(periodInSeconds)) { - ((ObjectNode) oldConfiguration).put(period, oldConfiguration.get(periodInSeconds).asText()); - hasChanges = true; - } - if (!oldConfiguration.has(period)) { - ((ObjectNode) oldConfiguration).put(period, "60"); - hasChanges = true; - } - var timeUnit = "timeUnit"; - if (!oldConfiguration.has(timeUnit)) { - ((ObjectNode) oldConfiguration).put(timeUnit, TimeUnit.SECONDS.name()); - hasChanges = true; - } - ((ObjectNode) oldConfiguration).remove(List.of(periodInSeconds, periodInSecondsPattern, useMetadataPeriodInSecondsPatterns)); - break; - default: - break; - } - return new TbPair<>(hasChanges, oldConfiguration); + public void destroy() { + pendingMsgs.clear(); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java index 56df2c8aea8..f35552de188 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java @@ -15,31 +15,23 @@ */ package org.thingsboard.rule.engine.delay; -import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotNull; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; -import java.util.concurrent.TimeUnit; - @Data public class TbMsgDelayNodeConfiguration implements NodeConfiguration { - @NotNull - private String period; - @NotNull - private String timeUnit; - @Min(1) - @Max(100000) + private int periodInSeconds; private int maxPendingMsgs; + private String periodInSecondsPattern; + private boolean useMetadataPeriodInSecondsPatterns; @Override public TbMsgDelayNodeConfiguration defaultConfiguration() { TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration(); - configuration.setPeriod("60"); - configuration.setTimeUnit(TimeUnit.SECONDS.name()); + configuration.setPeriodInSeconds(60); configuration.setMaxPendingMsgs(1000); + configuration.setUseMetadataPeriodInSecondsPatterns(false); return configuration; } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java deleted file mode 100644 index 23fb69d8b70..00000000000 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeTest.java +++ /dev/null @@ -1,343 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.delay; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.test.util.ReflectionTestUtils; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; -import org.thingsboard.rule.engine.api.TbNodeConfiguration; -import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.RuleNodeId; -import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.msg.TbNodeConnectionType; -import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgMetaData; - -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNoException; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.BDDMockito.given; -import static org.mockito.BDDMockito.spy; -import static org.mockito.BDDMockito.then; -import static org.mockito.BDDMockito.willAnswer; - -@ExtendWith(MockitoExtension.class) -public class TbMsgDelayNodeTest extends AbstractRuleNodeUpgradeTest { - - private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("20107cf0-1c5e-4ac4-8131-7c466c955a7c")); - private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1be24225-b669-4b26-ab7e-083aaa82d0a0")); - - private final Set supportedTimeUnits = EnumSet.of(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS); - private final String supportedTimeUnitsStr = supportedTimeUnits.stream().map(TimeUnit::name).collect(Collectors.joining(", ")); - - private TbMsgDelayNode node; - private TbMsgDelayNodeConfiguration config; - - @Mock - private TbContext ctxMock; - @Mock - private RuleNode ruleNodeMock; - - @BeforeEach - public void setUp() { - node = spy(new TbMsgDelayNode()); - config = new TbMsgDelayNodeConfiguration().defaultConfiguration(); - } - - @Test - public void verifyDefaultConfig() { - assertThat(config.getPeriod()).isEqualTo("60"); - assertThat(config.getMaxPendingMsgs()).isEqualTo(1000); - assertThat(config.getTimeUnit()).isEqualTo(TimeUnit.SECONDS.name()); - } - - @Test - public void givenDefaultConfig_whenInit_thenOk() { - given(ctxMock.getSelf()).willReturn(ruleNodeMock); - assertThatNoException().isThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))); - } - - @ParameterizedTest - @ValueSource(ints = {-1, 0, 5000000}) - public void givenInvalidMaxPendingMsgsValue_whenInit_thenThrowsException(int maxPendingMsgs) { - config.setMaxPendingMsgs(maxPendingMsgs); - verifyValidationExceptionOnInit(); - } - - @Test - public void givenPeriodIsNull_whenInit_thenThrowsException() { - config.setPeriod(null); - verifyValidationExceptionOnInit(); - } - - @Test - public void givenTimeUnitIsNull_whenInit_thenThrowsException() { - config.setTimeUnit(null); - verifyValidationExceptionOnInit(); - } - - @ParameterizedTest - @MethodSource - public void givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext( - String periodPattern, String timeUnitPattern, TbMsgMetaData metaData, String data, long expectedDelay) throws TbNodeException { - config.setPeriod(periodPattern); - config.setTimeUnit(timeUnitPattern); - given(ctxMock.getSelf()).willReturn(ruleNodeMock); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, data); - var tickMsg = TbMsg.newMsg(TbMsgType.DELAY_TIMEOUT_SELF_MSG, RULE_NODE_ID, TbMsgMetaData.EMPTY, msg.getId().toString()); - - given(ctxMock.newMsg(any(), any(TbMsgType.class), any(), any(), any(), any())).willReturn(tickMsg); - given(ctxMock.getSelfId()).willReturn(RULE_NODE_ID); - willAnswer(invocation -> { - node.onMsg(ctxMock, invocation.getArgument(0)); - return null; - }).given(ctxMock).tellSelf(any(TbMsg.class), any(Long.class)); - - node.onMsg(ctxMock, msg); - - then(ctxMock).should().tellSelf(tickMsg, expectedDelay); - then(ctxMock).should().ack(msg); - ArgumentCaptor actualMsg = ArgumentCaptor.forClass(TbMsg.class); - then(ctxMock).should().enqueueForTellNext(actualMsg.capture(), eq(TbNodeConnectionType.SUCCESS)); - assertThat(actualMsg.getValue()).usingRecursiveComparison().ignoringFields("id", "ts").isEqualTo(msg); - } - - private static Stream givenPeriodValueAndPeriodTimeUnitPatterns_whenOnMsg_thenTellSelfTickMsgAndEnqueueForTellNext() { - return Stream.of( - Arguments.of("1", "HOURS", TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT, TimeUnit.HOURS.toMillis(1L)), - Arguments.of("${md-period}", "${md-time-unit}", - new TbMsgMetaData(Map.of( - "md-period", "5", - "md-time-unit", "MINUTES" - )), TbMsg.EMPTY_JSON_OBJECT, TimeUnit.MINUTES.toMillis(5L)), - Arguments.of("$[msg-period]", "$[msg-time-unit]", TbMsgMetaData.EMPTY, - "{\"msg-period\":10,\"msg-time-unit\":\"SECONDS\"}", TimeUnit.SECONDS.toMillis(10L)) - ); - } - - @ParameterizedTest - @EnumSource(TimeUnit.class) - public void givenTimeUnit_whenOnMsg_thenVerify(TimeUnit timeUnit) throws TbNodeException { - config.setTimeUnit(timeUnit.name()); - given(ctxMock.getSelf()).willReturn(ruleNodeMock); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - if (supportedTimeUnits.contains(timeUnit)) { - assertThatNoException().isThrownBy(() -> node.onMsg(ctxMock, msg)); - } else { - assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) - .isInstanceOf(RuntimeException.class) - .hasMessage("Time unit '" + timeUnit + "' is not supported! Only " + supportedTimeUnitsStr + " are supported."); - } - } - - @Test - public void givenPeriodIsUnparsable_whenOnMsg_thenThrowsException() throws TbNodeException { - config.setPeriod("five"); - given(ctxMock.getSelf()).willReturn(ruleNodeMock); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) - .isInstanceOf(NumberFormatException.class) - .hasMessage("Can't parse period value : five"); - } - - @Test - public void givenInvalidTimeUnit_whenOnMsg_thenThrowsException() throws TbNodeException { - config.setTimeUnit("sec"); - given(ctxMock.getSelf()).willReturn(ruleNodeMock); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid value for period time unit : sec"); - } - - @Test - public void givenMaxLimitOfPendingMsgsReached_whenOnMsg_thenTellFailure() throws TbNodeException { - config.setMaxPendingMsgs(1); - given(ctxMock.getSelf()).willReturn(ruleNodeMock); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - for (int i = 0; i < 2; i++) { - node.onMsg(ctxMock, msg); - } - - ArgumentCaptor throwable = ArgumentCaptor.forClass(Throwable.class); - then(ctxMock).should().tellFailure(eq(msg), throwable.capture()); - assertThat(throwable.getValue()).isInstanceOf(RuntimeException.class).hasMessage("Max limit of pending messages reached!"); - } - - @Test - public void verifyDestroyMethod() { - var msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - var pendingMsgs = new ConcurrentHashMap<>(); - pendingMsgs.put(UUID.fromString("321f0301-9bed-4e7d-b92f-a978f53ec5d6"), msg); - ReflectionTestUtils.setField(node, "pendingMsgs", pendingMsgs); - var actualPendingMsgs = (Map) ReflectionTestUtils.getField(node, "pendingMsgs"); - assertThat(actualPendingMsgs).isEqualTo(pendingMsgs); - - node.destroy(); - - assertThat(actualPendingMsgs).isEmpty(); - } - - private void verifyValidationExceptionOnInit() { - RuleNode ruleNode = new RuleNode(); - ruleNode.setName("test"); - given(ctxMock.getSelf()).willReturn(ruleNode); - String errorPrefix = "'test' node configuration is invalid: "; - assertThatThrownBy(() -> node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)))) - .isInstanceOf(TbNodeException.class) - .hasMessageContaining(errorPrefix) - .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(true); - } - - private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { - return Stream.of( - // config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPatterns does not exist and periodInSeconds exists) - Arguments.of(0, - """ - { - "periodInSeconds": 13, - "maxPendingMsgs": 1000, - "periodInSecondsPattern": "17" - } - """, - true, - """ - { - "period": "13", - "timeUnit": "SECONDS", - "maxPendingMsgs": 1000 - } - """ - ), - // config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPatterns and periodInSeconds do not exist) - Arguments.of(0, - """ - { - "maxPendingMsgs": 1000, - "periodInSecondsPattern": "17" - } - """, - true, - """ - { - "period": "60", - "timeUnit": "SECONDS", - "maxPendingMsgs": 1000 - } - """ - ), - // config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPatterns is false) - Arguments.of(0, - """ - { - "periodInSeconds": 60, - "maxPendingMsgs": 1000, - "periodInSecondsPattern": null, - "useMetadataPeriodInSecondsPatterns": false - } - """, - true, - """ - { - "period": "60", - "timeUnit": "SECONDS", - "maxPendingMsgs": 1000 - } - """ - ), - // config for version 1 with upgrade from version 0 (useMetadataPeriodInSecondsPattern is true) - Arguments.of(0, - """ - { - "periodInSeconds": 60, - "maxPendingMsgs": 1000, - "periodInSecondsPattern": "${period-pattern}", - "useMetadataPeriodInSecondsPatterns": true - } - """, - true, - """ - { - "period": "${period-pattern}", - "timeUnit": "SECONDS", - "maxPendingMsgs": 1000 - } - """ - ), - // config for version 1 with upgrade from version 0 (hasChanges is false) - Arguments.of(0, - """ - { - "period": "${period-pattern}", - "timeUnit": "SECONDS", - "maxPendingMsgs": 1000 - } - """, - false, - """ - { - "period": "${period-pattern}", - "timeUnit": "SECONDS", - "maxPendingMsgs": 1000 - } - """ - ) - ); - } - - @Override - protected TbNode getTestNode() { - return node; - } -}