Skip to content

Commit

Permalink
Merge pull request #10977 from thingsboard/feature/attr_tskv_version
Browse files Browse the repository at this point in the history
Add versioning for attributes, latest timeseries and relations
  • Loading branch information
ashvayka authored Aug 2, 2024
2 parents 5443897 + dfd417b commit ea662bc
Show file tree
Hide file tree
Showing 104 changed files with 2,537 additions and 1,074 deletions.
16 changes: 16 additions & 0 deletions application/src/main/data/upgrade/3.7.0/schema_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@
-- limitations under the License.
--

-- KV VERSIONING UPDATE START

CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1;
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1;

ALTER TABLE attribute_kv ADD COLUMN version bigint default 0;
ALTER TABLE ts_kv_latest ADD COLUMN version bigint default 0;

-- KV VERSIONING UPDATE END

-- RELATION VERSIONING UPDATE START

CREATE SEQUENCE IF NOT EXISTS relation_version_seq cache 1;
ALTER TABLE relation ADD COLUMN version bigint default 0;

-- RELATION VERSIONING UPDATE END
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,31 @@ public class EntityRelationController extends BaseController {
@RequestMapping(value = "/relation", method = RequestMethod.POST)
@ResponseStatus(value = HttpStatus.OK)
public void saveRelation(@Parameter(description = "A JSON value representing the relation.", required = true)
@RequestBody EntityRelation relation) throws ThingsboardException {
@RequestBody EntityRelation relation) throws ThingsboardException {
doSave(relation);
}

@ApiOperation(value = "Create Relation (saveRelationV2)",
notes = "Creates or updates a relation between two entities in the platform. " +
"Relations unique key is a combination of from/to entity id and relation type group and relation type. " +
SECURITY_CHECKS_ENTITIES_DESCRIPTION)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/v2/relation", method = RequestMethod.POST)
@ResponseStatus(value = HttpStatus.OK)
public EntityRelation saveRelationV2(@Parameter(description = "A JSON value representing the relation.", required = true)
@RequestBody EntityRelation relation) throws ThingsboardException {
return doSave(relation);
}

private EntityRelation doSave(EntityRelation relation) throws ThingsboardException {
checkNotNull(relation);
checkCanCreateRelation(relation.getFrom());
checkCanCreateRelation(relation.getTo());
if (relation.getTypeGroup() == null) {
relation.setTypeGroup(RelationTypeGroup.COMMON);
}

tbEntityRelationService.save(getTenantId(), getCurrentUser().getCustomerId(), relation, getCurrentUser());
return tbEntityRelationService.save(getTenantId(), getCurrentUser().getCustomerId(), relation, getCurrentUser());
}

@ApiOperation(value = "Delete Relation (deleteRelation)",
Expand All @@ -101,6 +117,24 @@ public void deleteRelation(@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION,
@Parameter(description = RELATION_TYPE_GROUP_PARAM_DESCRIPTION) @RequestParam(value = "relationTypeGroup", required = false) String strRelationTypeGroup,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @RequestParam(TO_ID) String strToId,
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) @RequestParam(TO_TYPE) String strToType) throws ThingsboardException {
doDelete(strFromId, strFromType, strRelationType, strRelationTypeGroup, strToId, strToType);
}

@ApiOperation(value = "Delete Relation (deleteRelationV2)",
notes = "Deletes a relation between two entities in the platform. " + SECURITY_CHECKS_ENTITIES_DESCRIPTION)
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/v2/relation", method = RequestMethod.DELETE, params = {FROM_ID, FROM_TYPE, RELATION_TYPE, TO_ID, TO_TYPE})
@ResponseStatus(value = HttpStatus.OK)
public EntityRelation deleteRelationV2(@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @RequestParam(FROM_ID) String strFromId,
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) @RequestParam(FROM_TYPE) String strFromType,
@Parameter(description = RELATION_TYPE_PARAM_DESCRIPTION, required = true) @RequestParam(RELATION_TYPE) String strRelationType,
@Parameter(description = RELATION_TYPE_GROUP_PARAM_DESCRIPTION) @RequestParam(value = "relationTypeGroup", required = false) String strRelationTypeGroup,
@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @RequestParam(TO_ID) String strToId,
@Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true) @RequestParam(TO_TYPE) String strToType) throws ThingsboardException {
return doDelete(strFromId, strFromType, strRelationType, strRelationTypeGroup, strToId, strToType);
}

private EntityRelation doDelete(String strFromId, String strFromType, String strRelationType, String strRelationTypeGroup, String strToId, String strToType) throws ThingsboardException {
checkParameter(FROM_ID, strFromId);
checkParameter(FROM_TYPE, strFromType);
checkParameter(RELATION_TYPE, strRelationType);
Expand All @@ -113,7 +147,7 @@ public void deleteRelation(@Parameter(description = ENTITY_ID_PARAM_DESCRIPTION,

RelationTypeGroup relationTypeGroup = parseRelationTypeGroup(strRelationTypeGroup, RelationTypeGroup.COMMON);
EntityRelation relation = new EntityRelation(fromId, toId, strRelationType, relationTypeGroup);
tbEntityRelationService.delete(getTenantId(), getCurrentUser().getCustomerId(), relation, getCurrentUser());
return tbEntityRelationService.delete(getTenantId(), getCurrentUser().getCustomerId(), relation, getCurrentUser());
}

@ApiOperation(value = "Delete common relations (deleteCommonRelations)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private DeviceCredentials updateDeviceCredentials(TenantId tenantId, DeviceCrede
return deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
}

private ListenableFuture<List<String>> saveProvisionStateAttribute(Device device) {
private ListenableFuture<List<Long>> saveProvisionStateAttribute(Device device) {
return attributesService.save(device.getTenantId(), device.getId(), AttributeScope.SERVER_SCOPE,
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE),
System.currentTimeMillis())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ ListenableFuture<Boolean> processEdgeEvents() throws Exception {
@Override
public void onSuccess(@Nullable Pair<Long, Long> newStartTsAndSeqId) {
if (newStartTsAndSeqId != null) {
ListenableFuture<List<String>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
ListenableFuture<List<Long>> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<String> list) {
public void onSuccess(@Nullable List<Long> list) {
log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId);
if (fetcher.isSeqIdNewCycleStarted()) {
seqIdEnd = fetcher.getSeqIdEnd();
Expand Down Expand Up @@ -626,7 +626,7 @@ private long findStartSeqIdFromOldestEventIfAny() {
return startSeqId;
}

private ListenableFuture<List<String>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
private ListenableFuture<List<Long>> updateQueueStartTsAndSeqId(Pair<Long, Long> pair) {
this.newStartTs = pair.getFirst();
this.newStartSeqId = pair.getSecond();
log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public class DefaultTbEntityRelationService extends AbstractTbEntityService impl
private final RelationService relationService;

@Override
public void save(TenantId tenantId, CustomerId customerId, EntityRelation relation, User user) throws ThingsboardException {
public EntityRelation save(TenantId tenantId, CustomerId customerId, EntityRelation relation, User user) throws ThingsboardException {
ActionType actionType = ActionType.RELATION_ADD_OR_UPDATE;
try {
relationService.saveRelation(tenantId, relation);
var savedRelation = relationService.saveRelation(tenantId, relation);
logEntityActionService.logEntityRelationAction(tenantId, customerId,
relation, user, actionType, null, relation);
savedRelation, user, actionType, null, savedRelation);
return savedRelation;
} catch (Exception e) {
logEntityActionService.logEntityRelationAction(tenantId, customerId,
relation, user, actionType, e, relation);
Expand All @@ -53,14 +54,15 @@ public void save(TenantId tenantId, CustomerId customerId, EntityRelation relati
}

@Override
public void delete(TenantId tenantId, CustomerId customerId, EntityRelation relation, User user) throws ThingsboardException {
public EntityRelation delete(TenantId tenantId, CustomerId customerId, EntityRelation relation, User user) throws ThingsboardException {
ActionType actionType = ActionType.RELATION_DELETED;
try {
boolean found = relationService.deleteRelation(tenantId, relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup());
if (!found) {
var found = relationService.deleteRelation(tenantId, relation.getFrom(), relation.getTo(), relation.getType(), relation.getTypeGroup());
if (found == null) {
throw new ThingsboardException("Requested item wasn't found!", ThingsboardErrorCode.ITEM_NOT_FOUND);
}
logEntityActionService.logEntityRelationAction(tenantId, customerId, relation, user, actionType, null, relation);
logEntityActionService.logEntityRelationAction(tenantId, customerId, found, user, actionType, null, found);
return found;
} catch (Exception e) {
logEntityActionService.logEntityRelationAction(tenantId, customerId,
relation, user, actionType, e, relation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

public interface TbEntityRelationService {

void save(TenantId tenantId, CustomerId customerId, EntityRelation entity, User user) throws ThingsboardException;
EntityRelation save(TenantId tenantId, CustomerId customerId, EntityRelation entity, User user) throws ThingsboardException;

void delete(TenantId tenantId, CustomerId customerId, EntityRelation entity, User user) throws ThingsboardException;
EntityRelation delete(TenantId tenantId, CustomerId customerId, EntityRelation entity, User user) throws ThingsboardException;

void deleteCommonRelations(TenantId tenantId, CustomerId customerId, EntityId entityId, User user) throws ThingsboardException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ private void save(DeviceId deviceId, String key, boolean value) {
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L);
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
} else {
ListenableFuture<List<String>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE,
ListenableFuture<List<Long>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE,
Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
, System.currentTimeMillis())));
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public void clearCache(String fromVersion) throws Exception {
log.info("Clearing cache to upgrade from version 3.6.4 to 3.7.0");
clearAll();
break;
case "3.7.0":
log.info("Clearing cache to upgrade from version 3.7.0 to 3.7.1");
clearAll();
break;
default:
//Do nothing, since cache cleanup is optional.
}
Expand All @@ -81,7 +85,7 @@ void clearAll() {
if (redisTemplate.isPresent()) {
log.info("Flushing all caches");
redisTemplate.get().execute((RedisCallback<Object>) connection -> {
connection.flushAll();
connection.serverCommands().flushAll();
return null;
});
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,14 @@ public void saveAndNotify(TenantId tenantId, EntityId entityId, AttributeScope s

@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<String>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
ListenableFuture<List<Long>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
}

@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<String>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
ListenableFuture<List<Long>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope.name(), attributes, notifyDevice));
}
Expand All @@ -280,7 +280,7 @@ public void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvE

@Override
public void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
ListenableFuture<List<Long>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
}
Expand Down
7 changes: 7 additions & 0 deletions application/src/main/resources/thingsboard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ cache:
attributes:
# make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_ATTRIBUTES_ENABLED:true}"
ts_latest:
# Will enable cache-aside strategy for SQL timeseries latest DAO.
# make sure that if cache.type is 'redis' and cache.ts_latest.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_TS_LATEST_ENABLED:true}"
specs:
relations:
timeToLiveInMinutes: "${CACHE_SPECS_RELATIONS_TTL:1440}" # Relations cache TTL
Expand Down Expand Up @@ -547,6 +551,9 @@ cache:
attributes:
timeToLiveInMinutes: "${CACHE_SPECS_ATTRIBUTES_TTL:1440}" # Attributes cache TTL
maxSize: "${CACHE_SPECS_ATTRIBUTES_MAX_SIZE:100000}" # 0 means the cache is disabled
tsLatest:
timeToLiveInMinutes: "${CACHE_SPECS_TS_LATEST_TTL:1440}" # Timeseries latest cache TTL
maxSize: "${CACHE_SPECS_TS_LATEST_MAX_SIZE:100000}" # 0 means the cache is disabled
userSessionsInvalidation:
# The value of this TTL is ignored and replaced by the JWT refresh token expiration time
timeToLiveInMinutes: "0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testSaveAndFindRelation() throws Exception {

Mockito.reset(tbClusterService, auditLogService);

doPost("/api/relation", relation).andExpect(status().isOk());
relation = doPost("/api/v2/relation", relation, EntityRelation.class);

String url = String.format("/api/relation?fromId=%s&fromType=%s&relationType=%s&toId=%s&toType=%s",
mainDevice.getUuidId(), EntityType.DEVICE,
Expand Down Expand Up @@ -315,7 +315,7 @@ public void testDeleteRelation() throws Exception {
Device device = buildSimpleDevice("Test device 1");

EntityRelation relation = createFromRelation(mainDevice, device, "CONTAINS");
doPost("/api/relation", relation).andExpect(status().isOk());
relation = doPost("/api/v2/relation", relation, EntityRelation.class);

String url = String.format("/api/relation?fromId=%s&fromType=%s&relationType=%s&toId=%s&toType=%s",
mainDevice.getUuidId(), EntityType.DEVICE,
Expand All @@ -329,11 +329,15 @@ public void testDeleteRelation() throws Exception {

Mockito.reset(tbClusterService, auditLogService);

doDelete(url).andExpect(status().isOk());
String deleteUrl = String.format("/api/v2/relation?fromId=%s&fromType=%s&relationType=%s&toId=%s&toType=%s",
mainDevice.getUuidId(), EntityType.DEVICE,
"CONTAINS", device.getUuidId(), EntityType.DEVICE
);
var deletedRelation = doDelete(deleteUrl, EntityRelation.class);

testNotifyEntityAllOneTimeRelation(foundRelation,
testNotifyEntityAllOneTimeRelation(deletedRelation,
savedTenant.getId(), tenantAdmin.getCustomerId(), tenantAdmin.getId(), tenantAdmin.getEmail(),
ActionType.RELATION_DELETED, foundRelation);
ActionType.RELATION_DELETED, deletedRelation);

doGet(url).andExpect(status().is4xxClientError());
}
Expand Down Expand Up @@ -523,7 +527,7 @@ public void testFindRelationsInfoByToQuery() throws Exception {
@Test
public void testCreateRelationFromTenantToDevice() throws Exception {
EntityRelation relation = new EntityRelation(tenantAdmin.getTenantId(), mainDevice.getId(), "CONTAINS");
doPost("/api/relation", relation).andExpect(status().isOk());
relation = doPost("/api/v2/relation", relation, EntityRelation.class);

String url = String.format("/api/relation?fromId=%s&fromType=%s&relationType=%s&toId=%s&toType=%s",
tenantAdmin.getTenantId(), EntityType.TENANT,
Expand All @@ -539,7 +543,7 @@ public void testCreateRelationFromTenantToDevice() throws Exception {
@Test
public void testCreateRelationFromDeviceToTenant() throws Exception {
EntityRelation relation = new EntityRelation(mainDevice.getId(), tenantAdmin.getTenantId(), "CONTAINS");
doPost("/api/relation", relation).andExpect(status().isOk());
relation = doPost("/api/v2/relation", relation, EntityRelation.class);

String url = String.format("/api/relation?fromId=%s&fromType=%s&relationType=%s&toId=%s&toType=%s",
mainDevice.getUuidId(), EntityType.DEVICE,
Expand Down
Loading

0 comments on commit ea662bc

Please sign in to comment.