Skip to content

Commit

Permalink
Only consume kafka when in demand
Browse files Browse the repository at this point in the history
  • Loading branch information
neilstephens committed Dec 20, 2024
1 parent 2c4847d commit 12fbb92
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 50 deletions.
159 changes: 115 additions & 44 deletions Code/Ports/KafkaPort/KafkaConsumerPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,27 @@ void KafkaConsumerPort::Build()
log->warn("{}: Consumer client.id is not set in the properties, this may cause issues when reloading/restarting (can't resume from the same offset)", Name);
}

pKafkaConsumer = KafkaPort::Build<KCC::KafkaConsumer>("Consumer");
if(!pKafkaConsumer)
throw std::runtime_error(Name+": Failed to create Kafka Consumer");
if(pConf->ShareKafkaClient)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: Consumer ports cannot share a KafkaClient.", Name);
pConf->ShareKafkaClient = false;
}

auto rebalanceCb = [log{odc::spdlog_get("KafkaPort")}](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps)
{
if(log)
{
const auto action_str = et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned ? "assigned" : "revoked";
log->debug("Partitions {}: {}", action_str, kafka::toString(tps));
}
};
try
{
pKafkaConsumer = KafkaPort::Build<KCC::KafkaConsumer>("Consumer");
if(!pKafkaConsumer)
throw std::runtime_error("Build<KCC::KafkaConsumer> returned nullptr");
}
catch(const std::exception& e)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: Failed to create Kafka Consumer: {}", Name, e.what());
return;
}

//subscribe to topics
//find all the topics we need to subscribe to
mTopics.insert(pConf->DefaultTopic);
if(pConf->pPointMap)
{
Expand All @@ -99,50 +106,116 @@ void KafkaConsumerPort::Build()
mTopics.insert(*trans_entry.pTopic);
}
}
//FIXME: this is blocking and throws on timeout! - is there a non-blocking/throwing version?
// passing in std::chrono::milliseconds::zero() just times out immediately
pKafkaConsumer->subscribe(mTopics,rebalanceCb);

pKafkaConsumer->pause(); //pause on start, resume on Enable()
Poll(); //Need to poll whether paused or not
}

void KafkaConsumerPort::Enable()
{
auto pConf = static_cast<KafkaPortConf*>(this->pConf.get());
if(pConf->ConsumerFastForwardOffset != 0)
//TODO: support "ServerType", eg. ONDEMAND, PERSISTENT, MANUAL
//ONDEMAND should be default (ie. only consume when there are upstream connected ports)
if(InDemand())
PortUp();
}

void KafkaConsumerPort::Disable()
{
PortDown();
}

void KafkaConsumerPort::Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback)
{
if(event->GetEventType() == EventType::ConnectState)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->info("{}: Fast forwarding consumer by {} records", Name, pConf->ConsumerFastForwardOffset);
if(event->GetPayload<EventType::ConnectState>() == ConnectState::CONNECTED)
PortUp();
else if(!InDemand())
PortDown();
}
}

//TODO: check what happens if the we seek past the end/beginning
void KafkaConsumerPort::PortUp()
{
//TODO: use a strand to sync port up/down
// or use a dedicated thread if there are blocking calls (ie subscribe)
if(pPollTimer)
return;

std::map<kafka::TopicPartition,kafka::Offset> offsets;
if(pConf->ConsumerFastForwardOffset < 0)
pKafkaConsumer->endOffsets(pKafkaConsumer->assignment());
else
pKafkaConsumer->beginningOffsets(pKafkaConsumer->assignment());
auto rebalanceCb = [this](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps)
{
if(auto log = odc::spdlog_get("KafkaPort"))
{
const auto action_str = et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned ? "assigned" : "revoked";
log->debug("Partitions {}: {}", action_str, kafka::toString(tps));
}
auto pConf = static_cast<KafkaPortConf*>(this->pConf.get());
if(pConf->ConsumerFastForwardOffset != 0
&& et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->info("{}: Fast forwarding consumer partitions by {} records", Name, pConf->ConsumerFastForwardOffset);
//TODO: check what happens if the we seek past the end/beginning
std::map<kafka::TopicPartition,kafka::Offset> offsets;
if(pConf->ConsumerFastForwardOffset < 0)
offsets = pKafkaConsumer->endOffsets(tps);
else
offsets = pKafkaConsumer->beginningOffsets(tps);
for(const auto& [tp, offset] : offsets)
pKafkaConsumer->seek(tp, offset+pConf->ConsumerFastForwardOffset);
}
};

for(const auto& [tp, offset] : offsets)
pKafkaConsumer->seek(tp, offset+pConf->ConsumerFastForwardOffset);
try
{
//FIXME: this is blocking and throws on timeout!
pKafkaConsumer->subscribe(mTopics,rebalanceCb);
pKafkaConsumer->resume();
}
pKafkaConsumer->resume();
catch(const std::exception& e)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: Failed to subscribe to topics: {}", Name, e.what());
}

pPollTimer = odc::asio_service::Get()->make_steady_timer();
pIOS->post([this](){Poll(pPollTimer);});
}

void KafkaConsumerPort::Disable()
//FIXME: Ideally we would fully destroy and recreate the consumer on port down/up
// but librdkafka seems to have issues - some trace of the old object remains, preventing the new one from working
// all calls result in an exception claiming the "Broker handle destroyed" (even though it's a new object)
// it works fine if ALL kafka ports are destroyed and recreated (meaning the library is fully reloaded), so I guess it's a static/global issue
// probably should make a min repro and bug report to librdkafka
// ... so we just pause/unsubscribe for now ...

void KafkaConsumerPort::PortDown()
{
pKafkaConsumer->pause();
if(pPollTimer)
{
pPollTimer->cancel();
pPollTimer.reset();
}
try
{
//pausing before unsubscribing seems to avoid a crash in the kafka library
pKafkaConsumer->pause();
//FIXME: this is blocking and throws on timeout!
pKafkaConsumer->unsubscribe();
}
catch(const std::exception& e)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: Failed to unsubscribe from topics: {}", Name, e.what());
}
}

void KafkaConsumerPort::Poll()
void KafkaConsumerPort::Poll(std::weak_ptr<asio::steady_timer> wTimer)
{
//poll() should be called even when paused (to handle rebalances, etc)
// only bail out if we're being destroyed (which resets pPollTimer)
if(!pPollTimer)
return;

//Polling timer chain
// fires immediately if events returned, otherwise exponentially back off to MaxPollIntervalms

auto pTimer = wTimer.lock();
if(!pTimer)
return;

auto Records = pKafkaConsumer->poll(std::chrono::milliseconds::zero());
pKafkaConsumer->commitAsync();

Expand All @@ -160,11 +233,11 @@ void KafkaConsumerPort::Poll()
PollBackoff_ms = 1;
}

pPollTimer->expires_from_now(poll_delay);
pPollTimer->async_wait([this](asio::error_code err)
pTimer->expires_from_now(poll_delay);
pTimer->async_wait([this,wTimer](asio::error_code err)
{
if(!err)
Poll();
Poll(wTimer);
});

for(const auto& record : Records)
Expand Down Expand Up @@ -246,5 +319,3 @@ std::shared_ptr<EventInfo> KafkaConsumerPort::LuaDeserialise(const KCC::Consumer
return pLuaDeserialiser->Deserialise(record);
}

//TODO: support "ServerType", eg. ONDEMAND, PERSISTENT, MANUAL
//ONDEMAND should be default (ie. only consume when there are upstream connected ports)
17 changes: 11 additions & 6 deletions Code/Ports/KafkaPort/KafkaConsumerPort.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,26 @@ class KafkaConsumerPort: public KafkaPort
:KafkaPort(Name,Filename,Overrides){};
virtual ~KafkaConsumerPort()
{
pPollTimer->cancel();
pPollTimer.reset();
if(pPollTimer)
{
pPollTimer->cancel();
pPollTimer.reset();
}
};

void Build() override;
void Enable() override;
void Disable() override;
void Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback) override {}
void Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback) override;

private:
std::shared_ptr<KCC::KafkaConsumer> pKafkaConsumer;
std::shared_ptr<KCC::KafkaConsumer> pKafkaConsumer = nullptr;
std::set<kafka::Topic> mTopics;
size_t PollBackoff_ms = 1;
std::shared_ptr<asio::steady_timer> pPollTimer = odc::asio_service::Get()->make_steady_timer();
void Poll();
std::shared_ptr<asio::steady_timer> pPollTimer = nullptr;
void PortUp();
void PortDown();
void Poll(std::weak_ptr<asio::steady_timer> wTimer);
void ProcessRecord(const KCC::ConsumerRecord& record);
std::shared_ptr<EventInfo> TemplateDeserialise(const KCC::ConsumerRecord& record, const std::unique_ptr<TemplateDeserialiser>& pTemplateDeserialiser);
std::shared_ptr<EventInfo> CBORDeserialise(const KCC::ConsumerRecord& record, const std::unique_ptr<CBORDeserialiser>& pCBORDeserialiser);
Expand Down

0 comments on commit 12fbb92

Please sign in to comment.