Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka cloud events #125

Merged
merged 29 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a442daa
Use Kafka for cloud events in case KAFKA queue
jekka001 Oct 16, 2024
efaea08
Resolve Merge
jekka001 Oct 22, 2024
1d51899
Added tests for Kafka events
jekka001 Oct 21, 2024
8ecc452
Added Conversation Changes
jekka001 Nov 1, 2024
b938521
Added CloudEventMigrationService to retrieve stored cloudEvents from …
jekka001 Nov 1, 2024
98280c6
Fix Licence
jekka001 Nov 1, 2024
9bedc3f
Refactoring PostgresToKafkaCloudEventMigrateService
jekka001 Nov 1, 2024
0a191c3
Change CloudEventMigrationService to CloudEventSync
jekka001 Nov 1, 2024
d464f3d
Fixed Telemetry PerformanceTest
jekka001 Nov 5, 2024
a41df19
Some changes after performance tests
jekka001 Nov 29, 2024
355bed5
Refactoring
jekka001 Dec 3, 2024
e2c3b38
Changes Added From the Conversation
jekka001 Dec 4, 2024
6ac7fce
Merge branch 'master' into kafka-cloud-events
jekka001 Dec 4, 2024
69f1530
Change naming
jekka001 Dec 4, 2024
6c8defe
Fixed Proto
jekka001 Dec 4, 2024
11076b3
Fixed And Refactoring Migration
jekka001 Dec 4, 2024
a0e42a3
Refactoring CloudManagerService; Use prioritized check on boolean for…
volodymyr-babak Dec 20, 2024
c003b9e
Merge remote-tracking branch 'origin/rc' into kafka-cloud-events
volodymyr-babak Dec 20, 2024
ce59287
Fixed TenantClienTest
volodymyr-babak Dec 21, 2024
c9470a3
Fixed TenantClientTest
volodymyr-babak Dec 21, 2024
4660ebc
Migration improvement
AndriiLandiak Dec 23, 2024
5b10e37
Added logs on saving cloud events
volodymyr-babak Dec 23, 2024
68afa2e
Merge remote-tracking branch 'yevhen/kafka-cloud-events' into kafka-c…
AndriiLandiak Dec 24, 2024
750ef83
Minor improvement for MigrationService
AndriiLandiak Dec 24, 2024
3a5ede4
Rename to just sleep()
AndriiLandiak Dec 24, 2024
7d1a5f5
Prevent KafkaMigration from creating new grpc channel
AndriiLandiak Dec 24, 2024
9da4cdd
Fix migration
AndriiLandiak Dec 26, 2024
7ce5404
Improve logging of Edge tests and services
volodymyr-babak Dec 26, 2024
3f84964
processCloudEvent->processCloudEvents @Override
volodymyr-babak Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetInfo;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.cloud.CloudEventType;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.domain.Domain;
Expand Down Expand Up @@ -127,9 +125,9 @@
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.dao.cloud.CloudEventService;
import org.thingsboard.server.dao.cloud.EdgeSettingsService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.ClaimDevicesService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
Expand Down Expand Up @@ -165,16 +163,12 @@
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.entitiy.TbLogEntityActionService;
import org.thingsboard.server.service.entitiy.user.TbUserSettingsService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.AccessControlService;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService;
import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;

Expand Down Expand Up @@ -283,9 +277,6 @@ public abstract class BaseController {
@Autowired
protected AuditLogService auditLogService;

@Autowired
protected DeviceStateService deviceStateService;

@Autowired
protected EntityViewService entityViewService;

Expand All @@ -296,10 +287,10 @@ public abstract class BaseController {
protected AttributesService attributesService;

@Autowired
protected ClaimDevicesService claimDevicesService;
jekka001 marked this conversation as resolved.
Show resolved Hide resolved
protected CloudEventService cloudEventService;

@Autowired
protected CloudEventService cloudEventService;
protected EdgeSettingsService edgeSettingsService;

@Autowired
protected PartitionService partitionService;
Expand All @@ -310,9 +301,6 @@ public abstract class BaseController {
@Autowired
protected OtaPackageService otaPackageService;

@Autowired
protected OtaPackageStateService otaPackageStateService;

@Autowired
protected RpcService rpcService;

Expand All @@ -325,9 +313,6 @@ public abstract class BaseController {
@Autowired
protected TbDeviceProfileCache deviceProfileCache;

@Autowired
protected TbAssetProfileCache assetProfileCache;

@Autowired(required = false)
protected EdgeService edgeService;

Expand All @@ -340,9 +325,6 @@ public abstract class BaseController {
@Autowired
protected QueueService queueService;

@Autowired
protected EntitiesVersionControlService vcService;

@Autowired
protected ExportableEntitiesService entitiesService;

Expand All @@ -357,6 +339,10 @@ public abstract class BaseController {
@Getter
protected boolean edgesEnabled;

@Value("${queue.type}")
@Getter
protected String queueType;

@ExceptionHandler(Exception.class)
public void handleControllerException(Exception e, HttpServletResponse response) {
ThingsboardException thingsboardException = handleException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.thingsboard.server.common.data.sync.ie.importing.csv.BulkImportRequest;
import org.thingsboard.server.common.data.sync.ie.importing.csv.BulkImportResult;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.config.annotations.ApiOperation;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
Expand All @@ -74,7 +73,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -473,7 +471,7 @@ public List<EntitySubtype> getEdgeTypes() throws ThingsboardException, Execution
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@PostMapping(value = "/edge/sync/{edgeId}")
public DeferredResult<ResponseEntity> syncEdge(@Parameter(description = EDGE_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("edgeId") String strEdgeId) throws ThingsboardException {
@PathVariable("edgeId") String strEdgeId) throws ThingsboardException {
checkParameter("edgeId", strEdgeId);
final DeferredResult<ResponseEntity> response = new DeferredResult<>();
if (isEdgesEnabled() && edgeRpcServiceOpt.isPresent()) {
Expand Down Expand Up @@ -565,7 +563,7 @@ public EdgeSettings getEdgeSettings() throws ThingsboardException {
try {
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
return checkNotNull(cloudEventService.findEdgeSettings(tenantId));
return checkNotNull(edgeSettingsService.findEdgeSettings(tenantId));
} catch (Exception e) {
throw handleException(e);
}
Expand All @@ -581,6 +579,10 @@ public PageData<CloudEvent> getCloudEvents(
@RequestParam(required = false) String sortOrder,
@RequestParam(required = false) Long startTime,
@RequestParam(required = false) Long endTime) throws ThingsboardException {
if (queueType.equals("kafka")) {
throw new UnsupportedOperationException("getCloudEvents not supported for queue type - kafka");
}

try {
TenantId tenantId = getCurrentUser().getTenantId();
TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime);
Expand All @@ -600,6 +602,9 @@ public PageData<CloudEvent> getTimeseriesCloudEvents(
@RequestParam(required = false) String sortOrder,
@RequestParam(required = false) Long startTime,
@RequestParam(required = false) Long endTime) throws ThingsboardException {
if (queueType.equals("kafka")) {
throw new UnsupportedOperationException("getTimeseriesCloudEvents not supported for queue type - kafka");
}
try {
TenantId tenantId = getCurrentUser().getTenantId();
TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class ThingsboardInstallService {
@Value("${state.persistToTelemetry:false}")
private boolean persistToTelemetry;

@Value("${queue.type}")
private String queueType;
jekka001 marked this conversation as resolved.
Show resolved Hide resolved

@Autowired
private EntityDatabaseSchemaService entityDatabaseSchemaService;

Expand Down
Loading
Loading