From 12fbb92afdb8ca50d4f5d16423c02118b9683072 Mon Sep 17 00:00:00 2001
From: Neil Stephens
Date: Fri, 20 Dec 2024 19:00:51 +1100
Subject: [PATCH] Only consume kafka when in demand

---
 Code/Ports/KafkaPort/KafkaConsumerPort.cpp | 159 +++++++++++++++------
 Code/Ports/KafkaPort/KafkaConsumerPort.h   |  17 ++-
 2 files changed, 126 insertions(+), 50 deletions(-)
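Note for reviewers (commentary only, not part of the diff): the headline change is that the consumer now only subscribes, resumes and polls while some upstream port is actually in demand. Enable() checks InDemand(), Event() reacts to ConnectState payloads, and PortUp()/PortDown() start and stop the subscription and the polling timer chain. The sketch below models that demand-driven lifecycle in isolation; DemandTracker, OnConnectState() and the up flag are hypothetical stand-ins for the InDemand() and pPollTimer plumbing in the real port, not code from this patch.

    // Standalone sketch of the demand-driven lifecycle; standard library only.
    #include <cstddef>
    #include <iostream>

    class DemandTracker
    {
    public:
        // Called once per upstream ConnectState event
        void OnConnectState(bool connected)
        {
            // track demand as a count of connected upstream ports
            if(connected)
                ++demand_count;
            else if(demand_count > 0)
                --demand_count;

            // mirror KafkaConsumerPort::Event(): up on any connect, down only when no demand remains
            if(connected)
                PortUp();
            else if(!InDemand())
                PortDown();
        }
        bool InDemand() const { return demand_count > 0; }

    private:
        void PortUp()
        {
            if(up) return; // idempotent, like the early return on pPollTimer
            up = true;
            std::cout << "subscribe + resume + start poll chain\n";
        }
        void PortDown()
        {
            if(!up) return;
            up = false;
            std::cout << "cancel poll timer + pause + unsubscribe\n";
        }
        size_t demand_count = 0;
        bool up = false;
    };

    int main()
    {
        DemandTracker port;
        port.OnConnectState(true);  // first upstream connects  -> PortUp()
        port.OnConnectState(true);  // second upstream connects -> already up
        port.OnConnectState(false); // one disconnects          -> still in demand
        port.OnConnectState(false); // last one disconnects     -> PortDown()
    }
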
diff --git a/Code/Ports/KafkaPort/KafkaConsumerPort.cpp b/Code/Ports/KafkaPort/KafkaConsumerPort.cpp
index dbfa45ab..f96ed2fc 100644
--- a/Code/Ports/KafkaPort/KafkaConsumerPort.cpp
+++ b/Code/Ports/KafkaPort/KafkaConsumerPort.cpp
@@ -76,20 +76,27 @@ void KafkaConsumerPort::Build()
         log->warn("{}: Consumer client.id is not set in the properties, this may cause issues when reloading/restarting (can't resume from the same offset)", Name);
     }

-    pKafkaConsumer = KafkaPort::Build("Consumer");
-    if(!pKafkaConsumer)
-        throw std::runtime_error(Name+": Failed to create Kafka Consumer");
+    if(pConf->ShareKafkaClient)
+    {
+        if(auto log = odc::spdlog_get("KafkaPort"))
+            log->error("{}: Consumer ports cannot share a KafkaClient.", Name);
+        pConf->ShareKafkaClient = false;
+    }

-    auto rebalanceCb = [log{odc::spdlog_get("KafkaPort")}](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps)
-    {
-        if(log)
-        {
-            const auto action_str = et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned ? "assigned" : "revoked";
-            log->debug("Partitions {}: {}", action_str, kafka::toString(tps));
-        }
-    };
+    try
+    {
+        pKafkaConsumer = KafkaPort::Build("Consumer");
+        if(!pKafkaConsumer)
+            throw std::runtime_error("Build returned nullptr");
+    }
+    catch(const std::exception& e)
+    {
+        if(auto log = odc::spdlog_get("KafkaPort"))
+            log->error("{}: Failed to create Kafka Consumer: {}", Name, e.what());
+        return;
+    }

-    //subscribe to topics
+    //find all the topics we need to subscribe to
     mTopics.insert(pConf->DefaultTopic);
     if(pConf->pPointMap)
     {
@@ -99,50 +106,116 @@ void KafkaConsumerPort::Build()
                 mTopics.insert(*trans_entry.pTopic);
         }
     }
-    //FIXME: this is blocking and throws on timeout! - is there a non-blocking/throwing version?
-    //    passing in std::chrono::milliseconds::zero() just times out immediately
-    pKafkaConsumer->subscribe(mTopics,rebalanceCb);
-
-    pKafkaConsumer->pause(); //pause on start, resume on Enable()
-    Poll(); //Need to poll whether paused or not
 }

 void KafkaConsumerPort::Enable()
 {
-    auto pConf = static_cast(this->pConf.get());
-    if(pConf->ConsumerFastForwardOffset != 0)
+    //TODO: support "ServerType", eg. ONDEMAND, PERSISTENT, MANUAL
+    //ONDEMAND should be default (ie. only consume when there are upstream connected ports)
+    if(InDemand())
+        PortUp();
+}
+
+void KafkaConsumerPort::Disable()
+{
+    PortDown();
+}
+
+void KafkaConsumerPort::Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback)
+{
+    if(event->GetEventType() == EventType::ConnectState)
     {
-        if(auto log = odc::spdlog_get("KafkaPort"))
-            log->info("{}: Fast forwarding consumer by {} records", Name, pConf->ConsumerFastForwardOffset);
+        if(event->GetPayload<EventType::ConnectState>() == ConnectState::CONNECTED)
+            PortUp();
+        else if(!InDemand())
+            PortDown();
+    }
+}

-        //TODO: check what happens if the we seek past the end/beginning
+void KafkaConsumerPort::PortUp()
+{
+    //TODO: use a strand to sync port up/down
+    //    or use a dedicated thread if there are blocking calls (ie subscribe)
+    if(pPollTimer)
+        return;

-        std::map<kafka::TopicPartition, kafka::Offset> offsets;
-        if(pConf->ConsumerFastForwardOffset < 0)
-            pKafkaConsumer->endOffsets(pKafkaConsumer->assignment());
-        else
-            pKafkaConsumer->beginningOffsets(pKafkaConsumer->assignment());
+    auto rebalanceCb = [this](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps)
+    {
+        if(auto log = odc::spdlog_get("KafkaPort"))
+        {
+            const auto action_str = et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned ? "assigned" : "revoked";
+            log->debug("Partitions {}: {}", action_str, kafka::toString(tps));
+        }
+        auto pConf = static_cast(this->pConf.get());
+        if(pConf->ConsumerFastForwardOffset != 0
+           && et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned)
+        {
+            if(auto log = odc::spdlog_get("KafkaPort"))
+                log->info("{}: Fast forwarding consumer partitions by {} records", Name, pConf->ConsumerFastForwardOffset);
+            //TODO: check what happens if we seek past the end/beginning
+            std::map<kafka::TopicPartition, kafka::Offset> offsets;
+            if(pConf->ConsumerFastForwardOffset < 0)
+                offsets = pKafkaConsumer->endOffsets(tps);
+            else
+                offsets = pKafkaConsumer->beginningOffsets(tps);
+            for(const auto& [tp, offset] : offsets)
+                pKafkaConsumer->seek(tp, offset+pConf->ConsumerFastForwardOffset);
+        }
+    };

-        for(const auto& [tp, offset] : offsets)
-            pKafkaConsumer->seek(tp, offset+pConf->ConsumerFastForwardOffset);
+    try
+    {
+        //FIXME: this is blocking and throws on timeout!
+        pKafkaConsumer->subscribe(mTopics,rebalanceCb);
+        pKafkaConsumer->resume();
     }
-    pKafkaConsumer->resume();
+    catch(const std::exception& e)
+    {
+        if(auto log = odc::spdlog_get("KafkaPort"))
+            log->error("{}: Failed to subscribe to topics: {}", Name, e.what());
+    }
+
+    pPollTimer = odc::asio_service::Get()->make_steady_timer();
+    pIOS->post([this](){Poll(pPollTimer);});
 }

-void KafkaConsumerPort::Disable()
+//FIXME: Ideally we would fully destroy and recreate the consumer on port down/up
+//    but librdkafka seems to have issues - some trace of the old object remains, preventing the new one from working
+//    all calls result in an exception claiming the "Broker handle destroyed" (even though it's a new object)
+//    it works fine if ALL kafka ports are destroyed and recreated (meaning the library is fully reloaded), so I guess it's a static/global issue
+//    probably should make a min repro and bug report to librdkafka
+//    ... so we just pause/unsubscribe for now ...
+
+void KafkaConsumerPort::PortDown()
 {
-    pKafkaConsumer->pause();
+    if(pPollTimer)
+    {
+        pPollTimer->cancel();
+        pPollTimer.reset();
+    }
+    try
+    {
+        //pausing before unsubscribing seems to avoid a crash in the kafka library
+        pKafkaConsumer->pause();
+        //FIXME: this is blocking and throws on timeout!
+        pKafkaConsumer->unsubscribe();
+    }
+    catch(const std::exception& e)
+    {
+        if(auto log = odc::spdlog_get("KafkaPort"))
+            log->error("{}: Failed to unsubscribe from topics: {}", Name, e.what());
+    }
 }

-void KafkaConsumerPort::Poll()
+void KafkaConsumerPort::Poll(std::weak_ptr<asio::steady_timer> wTimer)
 {
-    //poll() should be called even when paused (to handle rebalances, etc)
-    //    only bail out if we're being destroyed (which resets pPollTimer)
-    if(!pPollTimer)
-        return;
-
     //Polling timer chain
     //    fires immediately if events returned, otherwise exponentially back off to MaxPollIntervalms
+
+    auto pTimer = wTimer.lock();
+    if(!pTimer)
+        return;
+
     auto Records = pKafkaConsumer->poll(std::chrono::milliseconds::zero());
     pKafkaConsumer->commitAsync();
@@ -160,11 +233,11 @@
         PollBackoff_ms = 1;
     }

-    pPollTimer->expires_from_now(poll_delay);
-    pPollTimer->async_wait([this](asio::error_code err)
+    pTimer->expires_from_now(poll_delay);
+    pTimer->async_wait([this,wTimer](asio::error_code err)
         {
             if(!err)
-                Poll();
+                Poll(wTimer);
         });

     for(const auto& record : Records)
@@ -246,5 +319,3 @@ std::shared_ptr KafkaConsumerPort::LuaDeserialise(const KCC::Consumer

     return pLuaDeserialiser->Deserialise(record);
 }
-//TODO: support "ServerType", eg. ONDEMAND, PERSISTENT, MANUAL
-//ONDEMAND should be default (ie. only consume when there are upstream connected ports)
\ No newline at end of file
diff --git a/Code/Ports/KafkaPort/KafkaConsumerPort.h b/Code/Ports/KafkaPort/KafkaConsumerPort.h
index d9b58a66..27cedf65 100644
--- a/Code/Ports/KafkaPort/KafkaConsumerPort.h
+++ b/Code/Ports/KafkaPort/KafkaConsumerPort.h
@@ -42,21 +42,26 @@ class KafkaConsumerPort: public KafkaPort
         :KafkaPort(Name,Filename,Overrides){};
     virtual ~KafkaConsumerPort()
     {
-        pPollTimer->cancel();
-        pPollTimer.reset();
+        if(pPollTimer)
+        {
+            pPollTimer->cancel();
+            pPollTimer.reset();
+        }
     };

     void Build() override;
     void Enable() override;
     void Disable() override;
-    void Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback) override {}
+    void Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback) override;

 private:
-    std::shared_ptr<KCC::KafkaConsumer> pKafkaConsumer;
+    std::shared_ptr<KCC::KafkaConsumer> pKafkaConsumer = nullptr;
     std::set<std::string> mTopics;
     size_t PollBackoff_ms = 1;
-    std::shared_ptr<asio::steady_timer> pPollTimer = odc::asio_service::Get()->make_steady_timer();
-    void Poll();
+    std::shared_ptr<asio::steady_timer> pPollTimer = nullptr;
+    void PortUp();
+    void PortDown();
+    void Poll(std::weak_ptr<asio::steady_timer> wTimer);
     void ProcessRecord(const KCC::ConsumerRecord& record);
     std::shared_ptr TemplateDeserialise(const KCC::ConsumerRecord& record, const std::unique_ptr& pTemplateDeserialiser);
     std::shared_ptr CBORDeserialise(const KCC::ConsumerRecord& record, const std::unique_ptr& pCBORDeserialiser);
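
One more illustration (commentary only, not part of the diff): Poll() no longer runs unconditionally from Build(); it is now a timer chain that PortUp() starts and that reaches the timer only through a weak_ptr, so PortDown() ends the chain by cancelling and resetting pPollTimer. Below is a minimal standalone sketch of that pattern, assuming standalone (non-Boost) asio; the Poller class, FakePoll() and the 1000 ms cap (standing in for MaxPollIntervalms) are illustrative only, not code from this patch.

    #include <asio.hpp>
    #include <algorithm>
    #include <chrono>
    #include <iostream>
    #include <memory>

    class Poller
    {
    public:
        explicit Poller(asio::io_context& ios): IOS(ios) {}
        void PortUp()
        {
            if(pPollTimer)
                return; // already polling
            pPollTimer = std::make_shared<asio::steady_timer>(IOS);
            Poll(pPollTimer);
        }
        void PortDown()
        {
            if(pPollTimer)
            {
                pPollTimer->cancel();
                pPollTimer.reset(); // pending handlers see an expired weak_ptr and stop
            }
        }

    private:
        void Poll(std::weak_ptr<asio::steady_timer> wTimer)
        {
            auto pTimer = wTimer.lock();
            if(!pTimer)
                return; // port went down - end the chain

            const bool got_records = FakePoll();
            // poll again immediately while records arrive, otherwise back off exponentially
            PollBackoff_ms = got_records ? 1 : std::min<size_t>(PollBackoff_ms*2, 1000);
            pTimer->expires_after(std::chrono::milliseconds(got_records ? 0 : PollBackoff_ms));
            pTimer->async_wait([this,wTimer](asio::error_code err)
                {
                    if(!err)
                        Poll(wTimer);
                });
        }
        bool FakePoll() { std::cout << "poll\n"; return false; } // stand-in for KafkaConsumer::poll()

        asio::io_context& IOS;
        std::shared_ptr<asio::steady_timer> pPollTimer;
        size_t PollBackoff_ms = 1;
    };

    int main()
    {
        asio::io_context ios;
        Poller port(ios);
        port.PortUp();
        ios.run_for(std::chrono::milliseconds(50)); // let the chain run and back off
        port.PortDown();
        ios.run(); // drain the last handler; the chain ends via cancel or the expired weak_ptr
    }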