-
Notifications
You must be signed in to change notification settings - Fork 81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka cloud events #125
Kafka cloud events #125
Conversation
application/src/main/java/org/thingsboard/server/controller/BaseController.java
Show resolved
Hide resolved
dao/src/main/java/org/thingsboard/server/dao/cloud/MessageConstants.java
Outdated
Show resolved
Hide resolved
dao/src/main/java/org/thingsboard/server/dao/cloud/DefaultEdgeSettingsService.java
Outdated
Show resolved
Hide resolved
common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/QueueConstants.java
Outdated
Show resolved
Hide resolved
...ication/src/main/java/org/thingsboard/server/service/cloud/PostgresUplinkMessageService.java
Outdated
Show resolved
Hide resolved
...ication/src/main/java/org/thingsboard/server/service/cloud/PostgresUplinkMessageService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/KafkaUplinkMessageService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/BaseUplinkMessageService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/BaseUplinkMessageService.java
Outdated
Show resolved
Hide resolved
protected CloudEventService cloudEventService; | ||
|
||
public void processHandleMessages(TenantId tenantId) { | ||
PageData<CloudEvent> cloudEvents = findCloudEvents(tenantId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
Based on the current config, you'll find 8192 records from Kafka and try to send them all at once. I think we could face rate limits (need to double-check it), but even without it, we need to split sending by some batch. Let's discuss it with @volodymyr-babak
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my opinion and based on the tests I conducted, the best value is 200. It is the optimal value that yields good processing results. Of course, the fastest processing occurs with 8192.
Test results: 600,000 ts:
Value 200: processed in 4 minutes 24 seconds.
Value 8192: processed in 3 minutes 33 seconds.
During the test, 10,000 ts were sent every second for 60 seconds.
- Renamed BaseCloudEventService to PostgresCloudEventService and refactored it. - Created KafkaCloudEventService. - Created queues: tb_cloud_event and tb_cloud_event_ts. - Refactored BaseUplinkMessageService. - Created KafkaUplinkMessageService. - Created TbCoreCloudEventProvider. - Refactored all components that used these services.
- added KafkaContainerTestSuite for start tests in kafka queue type mode - added service second service edge, kafka and zookeeper service to docker-compose - refactored tests
895c36f
to
8ecc452
Compare
…database and publish them to Kafka topic
application/src/main/java/org/thingsboard/server/service/cloud/KafkaCloudEventService.java
Outdated
Show resolved
Hide resolved
- remove cleanupEvents from CloudEventService
application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java
Outdated
Show resolved
Hide resolved
dao/src/main/java/org/thingsboard/server/dao/cloud/PostgresCloudEventService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/CloudManagerService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/cloud/BaseUplinkMessageService.java
Outdated
Show resolved
Hide resolved
@Service | ||
@Slf4j | ||
@ConditionalOnExpression("'${queue.type:null}'=='kafka'") | ||
public class KafkaCloudEventService implements CloudEventService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaEdgeGrpcSession - I like the approach that is used in this class more. It simplifies the code and does not use Postgres logic for consuming, also we use QueueConsumerManager, not creating our logic of working with a consumer, consider it in the next improvement :)
...cation/src/main/java/org/thingsboard/server/service/ttl/cloud/CloudEventsCleanUpService.java
Show resolved
Hide resolved
# Conflicts: # application/src/main/java/org/thingsboard/server/service/cloud/BaseUplinkMessageService.java # application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java # application/src/main/resources/tb-edge.yml # common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java # common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java # common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java # common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java # common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java # msa/edge-black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java # msa/edge-black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
… Postgres/Kafka implementations
Pull Request description
Implementation of Kafka for processing cloud events and telemetry instead of Postgres
in-memory
andkafka
.General checklist
Back-End feature checklist