Skip to content

Commit

Permalink
Build in KafkaPort, added KafkaConsumerPort
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Stephens committed Jul 26, 2024
1 parent 4aa47fe commit 7045679
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 73 deletions.
55 changes: 48 additions & 7 deletions Code/Ports/KafkaPort/KafkaClientCache.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* KafkaClientCache.h
*
* Created on: 19/07/2024
* Author: Neil Stephens
*/

#ifndef KAFKACLIENTCACHE_H
#define KAFKACLIENTCACHE_H

#include <kafka/KafkaClient.h>
#include <opendatacon/asio.h>
#include <opendatacon/util.h>
#include <memory>
#include <unordered_map>
#include <string>
Expand Down Expand Up @@ -74,12 +104,21 @@ class KafkaClientCache
return std::static_pointer_cast<ClientType>(client);
}

auto new_client = std::make_shared<ClientType>(properties);
clients[client_key] = new_client;
poll_timers[client_key] = {std::numeric_limits<size_t>::max(),nullptr};
MaxPollTime(client_key, MaxPollIntervalms);
Clean();
return new_client;
try
{
auto new_client = std::make_shared<ClientType>(properties);
clients[client_key] = new_client;
poll_timers[client_key] = {std::numeric_limits<size_t>::max(),nullptr};
MaxPollTime(client_key, MaxPollIntervalms);
Clean();
return new_client;
}
catch(const kafka::KafkaException& e)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: Failed to create KafkaClient: {}", client_key, e.what());
return nullptr;
}
}

// Singleton pattern to manage the cache
Expand Down Expand Up @@ -115,4 +154,6 @@ class KafkaClientCache
std::mutex mtx;
std::unordered_map<std::string, std::weak_ptr<kafka::clients::KafkaClient>> clients;
std::unordered_map<std::string, std::pair<size_t,std::shared_ptr<asio::steady_timer>>> poll_timers;
};
};

#endif // KAFKACLIENTCACHE_H
40 changes: 40 additions & 0 deletions Code/Ports/KafkaPort/KafkaConsumerPort.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* KafkaConsumerPort.cpp
*
* Created on: 26/07/2024
* Author: Neil Stephens
*/

#include "KafkaConsumerPort.h"
#include "KafkaPort.h"
#include <opendatacon/asio.h>
#include <opendatacon/spdlog.h>
#include <kafka/KafkaConsumer.h>
#include <chrono>
#include <memory>

void KafkaConsumerPort::Build()
{
pKafkaConsumer = KafkaPort::Build<KCC::KafkaConsumer>("Consumer");
//TODO: set up the consumer
}

49 changes: 49 additions & 0 deletions Code/Ports/KafkaPort/KafkaConsumerPort.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* KafkaConsumerPort.h
*
* Created on: 26/07/2024
* Author: Neil Stephens
*/

#ifndef KAFKACONSUMERPORT_H
#define KAFKACONSUMERPORT_H

#include "KafkaPort.h"
#include <kafka/KafkaConsumer.h>

using namespace odc;
namespace KCC = kafka::clients::consumer;

class KafkaConsumerPort: public KafkaPort
{
public:
KafkaConsumerPort(const std::string& Name, const std::string& Filename, const Json::Value& Overrides)
:KafkaPort(Name,Filename,Overrides){};
virtual ~KafkaConsumerPort(){};

virtual void Build() override;
virtual void Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback) override {}
private:
std::shared_ptr<KCC::KafkaConsumer> pKafkaConsumer;
};

#endif // KAFKACONSUMERPORT_H
3 changes: 1 addition & 2 deletions Code/Ports/KafkaPort/KafkaPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
*/

#include "KafkaPort.h"
#include "KafkaPortConf.h"

KafkaPort::KafkaPort(const std::string& Name, const std::string& Filename, const Json::Value& Overrides):
DataPort(Name, Filename, Overrides)
Expand Down Expand Up @@ -60,7 +59,7 @@ void KafkaPort::ProcessElements(const Json::Value& JSONRoot)
{
pConf->NativeKafkaProperties.put(memberName, JSONRoot["NativeKafkaProperties"][memberName].asString());
}
else if(auto log = spdlog::get("KafkaPort"))
else if(auto log = odc::spdlog_get("KafkaPort"))
{
log->error("NativeKafkaProperties member '{}' is not a simple value; ignoring.", memberName);
}
Expand Down
2 changes: 2 additions & 0 deletions Code/Ports/KafkaPort/KafkaPort.def
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ LIBRARY KafkaPort
EXPORTS
new_KafkaProducerPort
delete_KafkaProducerPort
new_KafkaConsumerPort
delete_KafkaConsumerPort
52 changes: 51 additions & 1 deletion Code/Ports/KafkaPort/KafkaPort.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
#define KAFKAPORT_H

#include "KafkaClientCache.h"
#include <atomic>
#include "KafkaPortConf.h"
#include <opendatacon/DataPort.h>
#include <atomic>

using namespace odc;

Expand All @@ -49,6 +50,55 @@ class KafkaPort: public DataPort
protected:
std::atomic_bool enabled {false};
std::shared_ptr<KafkaClientCache> pKafkaClientCache = KafkaClientCache::Get();
template <class KafkaClientType> std::shared_ptr<KafkaClientType> Build(std::string TypeString = "")
{
auto pConf = static_cast<KafkaPortConf*>(this->pConf.get());

if(!pConf->NativeKafkaProperties.contains("bootstrap.servers"))
{
pConf->NativeKafkaProperties.put("bootstrap.servers", "localhost:9092");
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: bootstrap.servers property not found, defaulting to localhost:9092", Name);
}

if(pConf->NativeKafkaProperties.getProperty("enable.manual.events.poll") == "false")
if(auto log = odc::spdlog_get("KafkaPort"))
log->warn("{}: enable.manual.events.poll property is set to false, forcing to true", Name);
pConf->NativeKafkaProperties.put("enable.manual.events.poll", "true");

pConf->NativeKafkaProperties.put("error_cb", [this](const kafka::Error& error)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->error("{}: {}",Name,error.toString());
});

pConf->NativeKafkaProperties.put("log_cb", [this](int level, const char* filename, int lineno, const char* msg)
{
auto spdlog_lvl = spdlog::level::level_enum(6-level);
if(auto log = odc::spdlog_get("KafkaPort"))
log->log(spdlog_lvl,"{} ({}:{}): {}",Name,filename,lineno,msg);
});

pConf->NativeKafkaProperties.put("stats_cb", [this](const std::string& jsonString)
{
if(auto log = odc::spdlog_get("KafkaPort"))
log->info("{}: Statistics: {}",Name,jsonString);
});

if(pConf->ShareKafkaClient)
{
if(pConf->SharedKafkaClientKey == "")
{
auto bs_servers = pConf->NativeKafkaProperties.getProperty("bootstrap.servers").value();
pConf->SharedKafkaClientKey = TypeString+":"+bs_servers;
}

return pKafkaClientCache->GetClient<KafkaClientType>(
pConf->SharedKafkaClientKey,
pConf->NativeKafkaProperties);
}
return pKafkaClientCache->GetClient<KafkaClientType>(Name, pConf->NativeKafkaProperties);
}
};

#endif // KAFKAPORT_H
72 changes: 9 additions & 63 deletions Code/Ports/KafkaPort/KafkaProducerPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/

#include "KafkaProducerPort.h"
#include "KafkaPortConf.h"
#include "KafkaPort.h"
#include <opendatacon/asio.h>
#include <opendatacon/spdlog.h>
#include <kafka/KafkaProducer.h>
Expand All @@ -34,85 +34,31 @@

void KafkaProducerPort::Build()
{
//create a kafka producer
auto pConf = static_cast<KafkaPortConf*>(this->pConf.get());

if(!pConf->NativeKafkaProperties.contains("bootstrap.servers"))
{
pConf->NativeKafkaProperties.put("bootstrap.servers", "localhost:9092");
if(auto log = spdlog::get("KafkaPort"))
log->error("bootstrap.servers property not found, defaulting to localhost:9092");
}

if(pConf->NativeKafkaProperties.getProperty("enable.manual.events.poll") == "false")
if(auto log = spdlog::get("KafkaPort"))
log->warn("enable.manual.events.poll property is set to false, forcing to true");
pConf->NativeKafkaProperties.put("enable.manual.events.poll", "true");

pConf->NativeKafkaProperties.put("error_cb", [this](const kafka::Error& error)
{
if(auto log = spdlog::get("KafkaPort"))
log->error("{}: {}",Name,error.toString());
});

pConf->NativeKafkaProperties.put("log_cb", [this](int level, const char* filename, int lineno, const char* msg)
{
auto spdlog_lvl = spdlog::level::level_enum(6-level);
if(auto log = spdlog::get("KafkaPort"))
log->log(spdlog_lvl,"{} ({}:{}): {}",Name,filename,lineno,msg);
});

pConf->NativeKafkaProperties.put("stats_cb", [this](const std::string& jsonString)
{
if(auto log = spdlog::get("KafkaPort"))
log->info("{}: Statistics: {}",Name,jsonString);
});

//TODO: consider also forcing enable.idempotence=true depending on the retry model
// see https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#idempotent-producer
//TODO: consider also setting the acks property to "all" depending on the retry model

//FIXME: KafkaProducer ctor can throw - catch it, and at least log it
if(pConf->ShareKafkaClient)
{
if(pConf->SharedKafkaClientKey == "")
{
auto bs_servers = pConf->NativeKafkaProperties.getProperty("bootstrap.servers").value();
pConf->SharedKafkaClientKey.append("Producer:").append(bs_servers);
}

pKafkaProducer = pKafkaClientCache->GetClient<KCP::KafkaProducer>(
pConf->SharedKafkaClientKey,
pConf->NativeKafkaProperties);
}
else
{
pKafkaProducer = pKafkaClientCache->GetClient<KCP::KafkaProducer>(
"Producer: " + Name,
pConf->NativeKafkaProperties);
}
pKafkaProducer = KafkaPort::Build<KCP::KafkaProducer>("Producer");
}

void KafkaProducerPort::Event(std::shared_ptr<const EventInfo> event, const std::string& SenderName, SharedStatusCallback_t pStatusCallback)
{
if(!enabled) return;
if(!pKafkaProducer) return;

//TODO: build the KCP::ProducerRecord from the EventInfo
KCP::ProducerRecord record(kafka::Topic("example-topic"), kafka::NullKey, kafka::Value("test"));

auto deliveryCb = [](const KCP::RecordMetadata& metadata, const kafka::Error& error)
auto deliveryCb = [this,event](const KCP::RecordMetadata& metadata, const kafka::Error& error)
{
auto log = spdlog::get("KafkaPort");
auto log = odc::spdlog_get("KafkaPort");
if (!error)
{
if(log && log->should_log(spdlog::level::trace))
log->trace("Message delivered: {}", metadata.toString());
log->trace("{}: Message delivered: {}", Name, metadata.toString());
}
else
{
if(log)
log->error("Message failed to be delivered: {}", error.message());
//TODO: retry?
log->error("{}: Message failed to be delivered: {} {} {}: {}",
ToString(event->GetEventType()),event->GetIndex(),
event->GetPayloadString(), error.message());
}
};

Expand Down
12 changes: 12 additions & 0 deletions Code/Ports/KafkaPort/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/

#include "KafkaProducerPort.h"
#include "KafkaConsumerPort.h"

extern "C" KafkaProducerPort* new_KafkaProducerPort(const std::string& Name, const std::string& File, const Json::Value& Overrides)
{
Expand All @@ -36,3 +37,14 @@ extern "C" void delete_KafkaProducerPort(KafkaProducerPort* aKafkaProducerPort_p
delete aKafkaProducerPort_ptr;
return;
}

extern "C" KafkaConsumerPort* new_KafkaConsumerPort(const std::string& Name, const std::string& File, const Json::Value& Overrides)
{
return new KafkaConsumerPort(Name,File,Overrides);
}

extern "C" void delete_KafkaConsumerPort(KafkaConsumerPort* aKafkaConsumerPort_ptr)
{
delete aKafkaConsumerPort_ptr;
return;
}

0 comments on commit 7045679

Please sign in to comment.