Skip to content

Commit

Permalink
Merge remote-tracking branch 'ce/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
volodymyr-babak committed Sep 18, 2024
2 parents dbaee10 + fb54dbc commit fca4678
Show file tree
Hide file tree
Showing 59 changed files with 2,807 additions and 1,213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public final class EdgeGrpcSession implements Closeable {
private static final ReentrantLock downlinkMsgLock = new ReentrantLock();
private static final ConcurrentLinkedQueue<EdgeEvent> highPriorityQueue = new ConcurrentLinkedQueue<>();

private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attemps to send downlink message if edge connected
private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attempts to send downlink message if edge connected

private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@Service
@AllArgsConstructor
Expand Down Expand Up @@ -176,20 +173,20 @@ public AlarmInfo unassign(Alarm alarm, long unassignTs, User user) throws Things
}

@Override
public List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, long unassignTs) {
List<AlarmId> totalAlarmIds = new ArrayList<>();
PageLink pageLink = new PageLink(100, 0, null, new SortOrder("id", SortOrder.Direction.ASC));
while (true) {
PageData<AlarmId> pageData = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, pageLink);
List<AlarmId> alarmIds = pageData.getData();
if (alarmIds.isEmpty()) {
break;
public void unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, List<UUID> alarms, long unassignTs) {
for (UUID alarmId : alarms) {
log.trace("[{}] Unassigning alarm {} from user {}", tenantId, alarmId, userId);
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(tenantId, new AlarmId(alarmId), unassignTs);
if (!result.isSuccessful()) {
log.error("[{}] Cannot unassign alarm {} from user {}", tenantId, alarmId, userId);
continue;
}
if (result.isModified()) {
String comment = String.format("Alarm was unassigned because user %s - was deleted", userTitle);
addSystemAlarmComment(result.getAlarm(), null, "ASSIGN", comment);
logEntityActionService.logEntityAction(result.getAlarm().getTenantId(), result.getAlarm().getOriginator(), result.getAlarm(), result.getAlarm().getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
}
processAlarmsUnassignment(tenantId, userId, userTitle, alarmIds, unassignTs);
totalAlarmIds.addAll(alarmIds);
pageLink = pageLink.nextPageLink();
}
return totalAlarmIds;
}

@Override
Expand All @@ -204,22 +201,6 @@ private static long getOrDefault(long ts) {
return ts > 0 ? ts : System.currentTimeMillis();
}

private void processAlarmsUnassignment(TenantId tenantId, UserId userId, String userTitle, List<AlarmId> alarmIds, long unassignTs) {
for (AlarmId alarmId : alarmIds) {
log.trace("[{}] Unassigning alarm {} userId {}", tenantId, alarmId, userId);
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(tenantId, alarmId, unassignTs);
if (!result.isSuccessful()) {
log.error("[{}] Cannot unassign alarm {} userId {}", tenantId, alarmId, userId);
continue;
}
if (result.isModified()) {
String comment = String.format("Alarm was unassigned because user %s - was deleted", userTitle);
addSystemAlarmComment(result.getAlarm(), null, "ASSIGN", comment);
logEntityActionService.logEntityAction(result.getAlarm().getTenantId(), result.getAlarm().getOriginator(), result.getAlarm(), result.getAlarm().getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
}
}
}

private void addSystemAlarmComment(Alarm alarm, User user, String subType, String commentText) {
addSystemAlarmComment(alarm, user, subType, commentText, null);
}
Expand All @@ -245,4 +226,5 @@ private void addSystemAlarmComment(Alarm alarm, User user, String subType, Strin
log.error("Failed to save alarm comment", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;

import java.util.List;
import java.util.UUID;

public interface TbAlarmService {

Expand All @@ -41,7 +41,7 @@ public interface TbAlarmService {

AlarmInfo unassign(Alarm alarm, long unassignTs, User user) throws ThingsboardException;

List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, long unassignTs);
void unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, List<UUID> alarms, long unassignTs);

Boolean delete(Alarm alarm, User user);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,46 @@
import org.thingsboard.server.common.data.housekeeper.AlarmsUnassignHousekeeperTask;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.service.entitiy.alarm.TbAlarmService;

import java.util.List;
import java.util.UUID;

@Component
@RequiredArgsConstructor
@Slf4j
public class AlarmsUnassignTaskProcessor extends HousekeeperTaskProcessor<AlarmsUnassignHousekeeperTask> {

private final TbAlarmService alarmService;
private final TbAlarmService tbAlarmService;
private final AlarmService alarmService;

@Override
public void process(AlarmsUnassignHousekeeperTask task) throws Exception {
List<AlarmId> alarms = alarmService.unassignDeletedUserAlarms(task.getTenantId(), (UserId) task.getEntityId(), task.getUserTitle(), task.getTs());
log.debug("[{}][{}] Unassigned {} alarms", task.getTenantId(), task.getEntityId(), alarms.size());
TenantId tenantId = task.getTenantId();
UserId userId = (UserId) task.getEntityId();
if (task.getAlarms() == null) {
AlarmId lastId = null;
long lastCreatedTime = 0;
while (true) {
List<TbPair<UUID, Long>> alarms = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, lastCreatedTime, lastId, 64);
if (alarms.isEmpty()) {
break;
}
housekeeperClient.submitTask(new AlarmsUnassignHousekeeperTask(tenantId, userId, task.getUserTitle(), alarms.stream().map(TbPair::getFirst).toList()));

TbPair<UUID, Long> last = alarms.get(alarms.size() - 1);
lastId = new AlarmId(last.getFirst());
lastCreatedTime = last.getSecond();
log.debug("[{}][{}] Submitted task for unassigning {} alarms", tenantId, userId, alarms.size());
}
} else {
tbAlarmService.unassignDeletedUserAlarms(tenantId, userId, task.getUserTitle(), task.getAlarms(), task.getTs());
log.debug("[{}][{}] Unassigned {} alarms", tenantId, userId, task.getAlarms().size());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,21 @@ public void upgradeDatabase(String fromVersion) throws Exception {
updateSchema("3.6.4", 3006004, "3.7.0", 3007000, null);
break;
case "3.7.0":
updateSchema("3.7.0", 3007000, "3.7.1", 3007001, null);
updateSchema("3.7.0", 3007000, "3.7.1", 3007001, connection -> {
try {
connection.createStatement().execute("UPDATE rule_node SET " +
"configuration = CASE " +
" WHEN (configuration::jsonb ->> 'persistAlarmRulesState') = 'false'" +
" THEN (configuration::jsonb || '{\"fetchAlarmRulesStateOnStart\": \"false\"}'::jsonb)::varchar " +
" ELSE configuration " +
"END, " +
"configuration_version = 1 " +
"WHERE type = 'org.thingsboard.rule.engine.profile.TbDeviceProfileNode' " +
"AND configuration_version < 1;");
} catch (Exception e) {
log.warn("Failed to execute update script for device profile rule nodes due to: ", e);
}
});
break;
default:
throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void onTimeSeriesUpdate(EntityId entityId, List<TsKvEntry> data, TbCallba
}

private void onTimeSeriesUpdate(UUID entityId, List<TsKvEntry> data, TbCallback callback) {
entityUpdates.get(entityId).timeSeriesUpdateTs = System.currentTimeMillis();
getEntityUpdatesInfo(entityId).timeSeriesUpdateTs = System.currentTimeMillis();
processSubscriptionData(entityId,
sub -> TbSubscriptionType.TIMESERIES.equals(sub.getType()),
s -> {
Expand Down Expand Up @@ -371,7 +371,7 @@ public void onAttributesUpdate(EntityId entityId, String scope, List<TsKvEntry>
}

private void onAttributesUpdate(UUID entityId, String scope, List<TsKvEntry> data, TbCallback callback) {
entityUpdates.get(entityId).attributesUpdateTs = System.currentTimeMillis();
getEntityUpdatesInfo(entityId).attributesUpdateTs = System.currentTimeMillis();
processSubscriptionData(entityId,
sub -> TbSubscriptionType.ATTRIBUTES.equals(sub.getType()),
s -> {
Expand Down Expand Up @@ -639,4 +639,8 @@ private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessi
throw new TbRateLimitsException(message);
}

private TbEntityUpdatesInfo getEntityUpdatesInfo(UUID entityId) {
return entityUpdates.computeIfAbsent(entityId, id -> new TbEntityUpdatesInfo(0));
}

}
2 changes: 2 additions & 0 deletions application/src/main/resources/tb-edge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,8 @@ transport:
# MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message.
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism
# Interval of periodic report of the gateway metrics
gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:60}"
netty:
# Netty leak detector level
leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.edge.EdgeService;
Expand All @@ -46,7 +45,6 @@
import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -164,16 +162,13 @@ public void testUnassignDeletedUserAlarms() throws ThingsboardException {
AlarmInfo alarm = new AlarmInfo();
alarm.setId(new AlarmId(UUID.randomUUID()));

when(alarmService.findAlarmIdsByAssigneeId(any(), any(), any()))
.thenReturn(new PageData<>(List.of(alarm.getId()), 0, 1, false))
.thenReturn(new PageData<>(Collections.EMPTY_LIST, 0, 0, false));
when(alarmSubscriptionService.unassignAlarm(any(), any(), anyLong()))
.thenReturn(AlarmApiCallResult.builder().successful(true).modified(true).alarm(alarm).build());

User user = new User();
user.setEmail("testEmail@gmail.com");
user.setId(new UserId(UUID.randomUUID()));
service.unassignDeletedUserAlarms(new TenantId(UUID.randomUUID()), user.getId(), user.getTitle(), System.currentTimeMillis());
service.unassignDeletedUserAlarms(new TenantId(UUID.randomUUID()), user.getId(), user.getTitle(), List.of(alarm.getUuidId()), System.currentTimeMillis());

ObjectNode commentNode = JacksonUtil.newObjectNode();
commentNode.put("subtype", "ASSIGN");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.alarm.AlarmDao;
import org.thingsboard.server.dao.alarm.AlarmService;
Expand Down Expand Up @@ -184,23 +184,32 @@ public void whenUserIsDeleted_thenCleanUpRelatedData() throws Exception {
Device device = createDevice("test", "test");
UserId userId = customerUserId;
createRelatedData(userId);
Alarm alarm = Alarm.builder()
.type("test")
.tenantId(tenantId)
.originator(device.getId())
.severity(AlarmSeverity.MAJOR)
.build();
alarm = doPost("/api/alarm", alarm, Alarm.class);
AlarmId alarmId = alarm.getId();
alarm = doPost("/api/alarm/" + alarmId + "/assign/" + userId, "", Alarm.class);
assertThat(alarm.getAssigneeId()).isEqualTo(userId);
assertThat(alarmService.findAlarmIdsByAssigneeId(tenantId, userId, new PageLink(100)).getData()).isNotEmpty();

List<AlarmId> alarms = new ArrayList<>();
int count = 112;
for (int i = 0; i < count; i++) {
Alarm alarm = Alarm.builder()
.type("test" + i)
.tenantId(tenantId)
.originator(device.getId())
.severity(AlarmSeverity.MAJOR)
.build();
alarm = doPost("/api/alarm", alarm, Alarm.class);
AlarmId alarmId = alarm.getId();
alarm = doPost("/api/alarm/" + alarmId + "/assign/" + userId, "", Alarm.class);
assertThat(alarm.getAssigneeId()).isEqualTo(userId);
alarms.add(alarmId);
}
List<AlarmId> assignedAlarms = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, 0, null, 5000).stream()
.map(TbPair::getFirst).map(AlarmId::new).toList();
assertThat(assignedAlarms).size().isEqualTo(count);
assertThat(assignedAlarms).containsAll(alarms);

doDelete("/api/user/" + userId).andExpect(status().isOk());

await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
verifyNoRelatedData(userId);
assertThat(alarmService.findAlarmById(tenantId, alarmId).getAssigneeId()).isNull();
assertThat(alarmService.findAlarmIdsByAssigneeId(tenantId, userId, 0, null, 5000)).size().isZero();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,27 +411,36 @@ private static void awaitClientDestroy(LeshanClient leshanClient) {
}

protected void awaitObserveReadAll(int cntObserve, String deviceIdStr) throws Exception {
await("ObserveReadAll after start client/test: countObserve " + cntObserve)
await("ObserveReadAll: countObserve " + cntObserve)
.atMost(40, TimeUnit.SECONDS)
.until(() -> cntObserve == getCntObserveAll(deviceIdStr));
}

protected Integer getCntObserveAll(String deviceIdStr) throws Exception {
String actualResultBefore = sendObserve("ObserveReadAll", null, deviceIdStr);
ObjectNode rpcActualResultBefore = JacksonUtil.fromString(actualResultBefore, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResultBefore.get("result").asText());
JsonElement element = JsonUtils.parse(rpcActualResultBefore.get("value").asText());
String actualResult = sendObserveOK("ObserveReadAll", null, deviceIdStr);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
JsonElement element = JsonUtils.parse(rpcActualResult.get("value").asText());
return element.isJsonArray() ? ((JsonArray)element).size() : null;
}

protected void sendCancelObserveAllWithAwait(String deviceIdStr) throws Exception {
String actualResultCancelAll = sendObserve("ObserveCancelAll", null, deviceIdStr);
protected void sendObserveCancelAllWithAwait(String deviceIdStr) throws Exception {
String actualResultCancelAll = sendObserveOK("ObserveCancelAll", null, deviceIdStr);
ObjectNode rpcActualResultCancelAll = JacksonUtil.fromString(actualResultCancelAll, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResultCancelAll.get("result").asText());
awaitObserveReadAll(0, deviceId);
}

protected String sendObserve(String method, String params, String deviceIdStr) throws Exception {
protected String sendRpcObserveOkWithResultValue(String method, String params) throws Exception {
String actualResultReadAll = sendRpcObserveOk(method, params);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResultReadAll, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
return rpcActualResult.get("value").asText();
}
protected String sendRpcObserveOk(String method, String params) throws Exception {
return sendObserveOK(method, params, deviceId);
}
protected String sendObserveOK(String method, String params, String deviceIdStr) throws Exception {
String sendRpcRequest;
if (params == null) {
sendRpcRequest = "{\"method\": \"" + method + "\"}";
Expand All @@ -442,4 +451,9 @@ protected String sendObserve(String method, String params, String deviceIdStr) t
return doPostAsync("/api/plugins/rpc/twoway/" + deviceIdStr, sendRpcRequest, String.class, status().isOk());
}

protected ObjectNode sendRpcObserveWithResult(String method, String params) throws Exception {
String actualResultReadAll = sendRpcObserveOk(method, params);
return JacksonUtil.fromString(actualResultReadAll, ObjectNode.class);
}

}
Loading

0 comments on commit fca4678

Please sign in to comment.