Commit faf59a2
upgrade modern-cpp-kafka (#50)
Authored by RPG-18 on Mar 27, 2023
Parent: 36435db, Commit: faf59a2
Showing 35 changed files with 870 additions and 505 deletions.
36 changes: 18 additions & 18 deletions 3rdparty/modern-cpp-kafka/include/kafka/AdminClient.h
@@ -18,7 +18,7 @@
#include <vector>


-namespace KAFKA_API { namespace clients {
+namespace KAFKA_API { namespace clients { namespace admin {

/**
* The administrative client for Kafka, which supports managing and inspecting topics, etc.
@@ -27,11 +27,7 @@ class AdminClient: public KafkaClient
{
public:
explicit AdminClient(const Properties& properties)
-    : KafkaClient(ClientType::AdminClient,
-                  KafkaClient::validateAndReformProperties(properties),
-                  ConfigCallbacksRegister{},
-                  EventsPollingOption::Auto,
-                  Interceptors{})
+    : KafkaClient(ClientType::AdminClient, KafkaClient::validateAndReformProperties(properties))
{
}

@@ -83,9 +79,9 @@ AdminClient::getPerTopicResults(const rd_kafka_topic_result_t** topicResults, st
for (std::size_t i = 0; i < topicCount; ++i)
{
const rd_kafka_topic_result_t* topicResult = topicResults[i];
-        if (rd_kafka_resp_err_t topicError = rd_kafka_topic_result_error(topicResult))
+        if (const rd_kafka_resp_err_t topicError = rd_kafka_topic_result_error(topicResult))
{
std::string detailedMsg = "topic[" + std::string(rd_kafka_topic_result_name(topicResult)) + "] with error[" + rd_kafka_topic_result_error_string(topicResult) + "]";
const std::string detailedMsg = "topic[" + std::string(rd_kafka_topic_result_name(topicResult)) + "] with error[" + rd_kafka_topic_result_error_string(topicResult) + "]";
errors.emplace_back(topicError, detailedMsg);
}
}
@@ -99,9 +95,9 @@ AdminClient::getPerTopicPartitionResults(const rd_kafka_topic_partition_list_t*

for (int i = 0; i < (partitionResults ? partitionResults->cnt : 0); ++i)
{
-        if (rd_kafka_resp_err_t partitionError = partitionResults->elems[i].err)
+        if (const rd_kafka_resp_err_t partitionError = partitionResults->elems[i].err)
{
std::string detailedMsg = "topic-partition[" + std::string(partitionResults->elems[i].topic) + "-" + std::to_string(partitionResults->elems[i].partition) + "] with error[" + rd_kafka_err2str(partitionError) + "]";
const std::string detailedMsg = "topic-partition[" + std::string(partitionResults->elems[i].topic) + "-" + std::to_string(partitionResults->elems[i].partition) + "] with error[" + rd_kafka_err2str(partitionError) + "]";
errors.emplace_back(partitionError, detailedMsg);
}
}
@@ -148,10 +144,14 @@ AdminClient::createTopics(const Topics& topics,

for (const auto& conf: topicConfig.map())
{
-            rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), conf.first.c_str(), conf.second.c_str());
+            const auto& k = conf.first;
+            const auto& v = topicConfig.getProperty(k);
+            if (!v) continue;
+
+            const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), k.c_str(), v->c_str());
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
std::string errMsg = "Invalid config[" + conf.first + "=" + conf.second + "]";
const std::string errMsg = "Invalid config[" + k + "=" + *v + "]";
KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
}
@@ -189,7 +189,7 @@ AdminClient::createTopics(const Topics& topics,

std::list<Error> errors;

-    if (rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
+    if (const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
{
errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
}
@@ -262,7 +262,7 @@ AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeou

std::list<Error> errors;

-    if (rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
+    if (const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
{
errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
}
@@ -281,7 +281,7 @@ inline admin::ListTopicsResult
AdminClient::listTopics(std::chrono::milliseconds timeout)
{
const rd_kafka_metadata_t* rk_metadata = nullptr;
-    rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(), true, nullptr, &rk_metadata, convertMsDurationToInt(timeout));
+    const rd_kafka_resp_err_t err = rd_kafka_metadata(getClientHandle(), true, nullptr, &rk_metadata, convertMsDurationToInt(timeout));
auto guard = rd_kafka_metadata_unique_ptr(rk_metadata);

if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
@@ -303,7 +303,7 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
{
auto rk_queue = rd_kafka_queue_unique_ptr(rd_kafka_queue_new(getClientHandle()));

-    rd_kafka_DeleteRecords_unique_ptr rkDeleteRecords(rd_kafka_DeleteRecords_new(createRkTopicPartitionList(topicPartitionOffsets)));
+    const rd_kafka_DeleteRecords_unique_ptr rkDeleteRecords(rd_kafka_DeleteRecords_new(createRkTopicPartitionList(topicPartitionOffsets)));
std::array<rd_kafka_DeleteRecords_t*, 1> rk_del_records{rkDeleteRecords.get()};

rd_kafka_DeleteRecords(getClientHandle(), rk_del_records.data(), rk_del_records.size(), nullptr, rk_queue.get());
@@ -331,7 +331,7 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,

std::list<Error> errors;

-    if (rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
+    if (const rd_kafka_resp_err_t respErr = rd_kafka_event_error(rk_ev.get()))
{
errors.emplace_back(respErr, rd_kafka_event_error_string(rk_ev.get()));
}
@@ -344,5 +344,5 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
return admin::DeleteRecordsResult(combineErrors(errors));
}

-} } // end of KAFKA_API::clients
+} } } // end of KAFKA_API::clients::admin
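The constructor simplification above works because callbacks, the events-polling mode, and interceptors now travel inside the Properties object instead of being separate KafkaClient constructor arguments (see ClientConfig.h below). A minimal sketch of using the relocated class; the broker address and topic name are placeholders:

```cpp
#include <kafka/AdminClient.h>

#include <iostream>

int main()
{
    kafka::Properties props;
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

    // The class now lives in KAFKA_API::clients::admin (default macro value: kafka).
    kafka::clients::admin::AdminClient adminClient(props);

    // Create a topic with 3 partitions and replication factor 1.
    auto result = adminClient.createTopics({"demo-topic"}, 3, 1);
    if (result.error) std::cerr << result.error.message() << std::endl;
}
```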

32 changes: 5 additions & 27 deletions 3rdparty/modern-cpp-kafka/include/kafka/AdminClientConfig.h
@@ -2,42 +2,20 @@

#include <kafka/Project.h>

-#include <kafka/Properties.h>
+#include <kafka/ClientConfig.h>


namespace KAFKA_API { namespace clients { namespace admin {

/**
* Configuration for the Kafka administrative client.
*/
-class Config: public Properties
+class AdminClientConfig: public Config
{
public:
-    Config() = default;
-    Config(const Config&) = default;
-    explicit Config(const PropertiesMap& kvMap): Properties(kvMap) {}
-
-    /**
-     * The string contains host:port pairs of brokers (splitted by ",") that the administrative client will use to establish initial connection to the Kafka cluster.
-     * Note: It's mandatory.
-     */
-    static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";
-
-    /**
-     * Protocol used to communicate with brokers.
-     * Default value: plaintext
-     */
-    static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
-
-    /**
-     * Shell command to refresh or acquire the client's Kerberos ticket.
-     */
-    static const constexpr char* SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
-
-    /**
-     * The client's Kerberos principal name.
-     */
-    static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+    AdminClientConfig() = default;
+    AdminClientConfig(const AdminClientConfig&) = default;
+    explicit AdminClientConfig(const PropertiesMap& kvMap): Config(kvMap) {}
};

} } } // end of KAFKA_API::clients::admin
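With the shared keys hoisted into clients::Config (the new ClientConfig.h below), AdminClientConfig reduces to a thin subclass. A sketch of configuring through it; the values are placeholders:

```cpp
#include <kafka/AdminClientConfig.h>

using namespace kafka::clients;

admin::AdminClientConfig adminConf;
adminConf.put(Config::BOOTSTRAP_SERVERS, "localhost:9092"); // mandatory; placeholder address
adminConf.put(Config::SECURITY_PROTOCOL, "plaintext");      // key inherited from Config
```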
1 change: 1 addition & 0 deletions 3rdparty/modern-cpp-kafka/include/kafka/BrokerMetadata.h
@@ -127,6 +127,7 @@ inline std::vector<std::shared_ptr<BrokerMetadata::Node>>
BrokerMetadata::nodes() const
{
std::vector<std::shared_ptr<BrokerMetadata::Node>> ret;
+    ret.reserve(_nodes.size());
for (const auto& nodeInfo: _nodes)
{
ret.emplace_back(nodeInfo.second);
46 changes: 46 additions & 0 deletions 3rdparty/modern-cpp-kafka/include/kafka/ClientCommon.h
@@ -0,0 +1,46 @@
#pragma once

#include <kafka/Project.h>

#include <kafka/Error.h>

#include <functional>


namespace KAFKA_API { namespace clients {

/**
* Callback type for logging.
*/
using LogCallback = std::function<void(int, const char*, int, const char* msg)>;

/**
* Callback type for error notification.
*/
using ErrorCallback = std::function<void(const Error&)>;

/**
* Callback type for statistics info dumping.
*/
using StatsCallback = std::function<void(const std::string&)>;

/**
* SASL OAUTHBEARER token info.
*/
struct SaslOauthbearerToken
{
using KeyValuePairs = std::map<std::string, std::string>;

std::string value;
std::chrono::microseconds mdLifetime{};
std::string mdPrincipalName;
KeyValuePairs extensions;
};

/**
* Callback type for OAUTHBEARER token refresh.
*/
using OauthbearerTokenRefreshCallback = std::function<SaslOauthbearerToken(const std::string&)>;

} } // end of KAFKA_API::clients
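These aliases pair with the property-based callback registration introduced in ClientConfig.h below. A sketch of implementing two of them; the token fields hold dummy values:

```cpp
#include <kafka/ClientCommon.h>

#include <chrono>
#include <iostream>
#include <string>

using namespace kafka::clients;

// Matches ErrorCallback = std::function<void(const Error&)>.
const ErrorCallback errorCb = [](const kafka::Error& error) {
    std::cerr << "kafka error: " << error.toString() << std::endl;
};

// Receives the "sasl.oauthbearer.config" value; returns a fresh token.
const OauthbearerTokenRefreshCallback tokenRefreshCb = [](const std::string& /*oauthbearerConfig*/) {
    SaslOauthbearerToken token;
    token.value           = "dummy-token";            // placeholder; fetch a real token in production
    token.mdLifetime      = std::chrono::minutes(10); // stored as microseconds
    token.mdPrincipalName = "test-principal";
    return token;
};
```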

172 changes: 172 additions & 0 deletions 3rdparty/modern-cpp-kafka/include/kafka/ClientConfig.h
@@ -0,0 +1,172 @@
#pragma once

#include <kafka/Project.h>

#include <kafka/Properties.h>


namespace KAFKA_API { namespace clients {

/**
* Configuration for Kafka clients.
*/
class Config: public Properties
{
public:
Config() = default;
Config(const Config&) = default;
explicit Config(const PropertiesMap& kvMap): Properties(kvMap) {}

/**
* To poll the events manually (otherwise, it would be done with a background polling thread).
* Note: Once it's enabled, the interface `pollEvents()` should be manually called, in order to trigger
* 1) The offset-commit callbacks, for consumers.
* 2) The message-delivery callbacks, for producers.
*/
static const constexpr char* ENABLE_MANUAL_EVENTS_POLL = "enable.manual.events.poll";

/**
* Log callback.
* Type: `std::function<void(int, const char*, int, const char* msg)>`
*/
static const constexpr char* LOG_CB = "log_cb";

/**
* Error callback.
* Type: `std::function<void(const Error&)>`
*/
static const constexpr char* ERROR_CB = "error_cb";

/**
* Statistics callback.
* Type: `std::function<void(const std::string&)>`
*/
static const constexpr char* STATS_CB = "stats_cb";

/**
* OAUTHBEARER token refresh callback.
* Type: `std::function<SaslOauthbearerToken(const std::string&)>`
*/
static const constexpr char* OAUTHBEARER_TOKEN_REFRESH_CB = "oauthbearer_token_refresh_cb";

/**
* Interceptors for thread start/exit, brokers' state change, etc.
* Type: `Interceptors`
*/
static const constexpr char* INTERCEPTORS = "interceptors";

/**
* The string contains host:port pairs of brokers (split by ",") that the client will use to establish the initial connection to the Kafka cluster.
* Note: It's mandatory.
*/
static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";

/**
* Client identifier.
*/
static const constexpr char* CLIENT_ID = "client.id";

/**
* Log level (syslog(3) levels).
*/
static const constexpr char* LOG_LEVEL = "log_level";

/**
* A comma-separated list of debug contexts to enable.
*/
static const constexpr char* DEBUG = "debug";

/**
* Timeout for network requests.
* Default value: 60000
*/
static const constexpr char* SOCKET_TIMEOUT_MS = "socket.timeout.ms";

/**
* Protocol used to communicate with brokers.
* Default value: plaintext
*/
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";

/**
* SASL mechanism to use for authentication.
* Default value: GSSAPI
*/
static const constexpr char* SASL_MECHANISM = "sasl.mechanisms";

/**
* SASL username for use with the PLAIN and SASL-SCRAM-.. mechanism.
*/
static const constexpr char* SASL_USERNAME = "sasl.username";

/**
* SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism.
*/
static const constexpr char* SASL_PASSWORD = "sasl.password";

/**
* Shell command to refresh or acquire the client's Kerberos ticket.
*/
static const constexpr char* SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";

/**
* The client's Kerberos principal name.
*/
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";

/**
* Set to "default" or "oidc" to control with login method to be used.
* If set to "oidc", the following properties must also be specified:
* sasl.oauthbearer.client.id
* sasl.oauthbearer.client.secret
* sasl.oauthbearer.token.endpoint.url
* Default value: default
*/
static const constexpr char* SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method";

/**
* Public identifier for the application.
* Only used with "sasl.oauthbearer.method=oidc".
*/
static const constexpr char* SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id";

/**
* Client secret only known to the application and the authorization server.
* Only used with "sasl.oauthbearer.method=oidc".
*/
static const constexpr char* SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret";

/**
* Allow additional information to be provided to the broker. Comma-separated list of key=value pairs.
* Only used with "sasl.oauthbearer.method=oidc".
*/
static const constexpr char* SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions";

/**
* The client uses this to specify the scope of the access request to the broker.
* Only used with "sasl.oauthbearer.method=oidc".
*/
static const constexpr char* SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope";

/**
* OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve the token.
* Only used with "sasl.oauthbearer.method=oidc".
*/
static const constexpr char* SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";

/**
* SASL/OAUTHBEARER configuration.
* The format is implementation-dependent and must be parsed accordingly.
*/
static const constexpr char* SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config";

/**
* Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set.
* Should only be used for development or testing, and not in production.
* Default value: false
*/
static const constexpr char* ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt";
};

} } // end of KAFKA_API::clients
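These keys replace the old explicit constructor arguments (ConfigCallbacksRegister, EventsPollingOption, Interceptors; compare the AdminClient constructor change at the top). Assuming the upgraded Properties accepts callable values under the *_cb keys, and that KafkaProducer follows the same nested-namespace pattern as the admin change, wiring looks roughly like this:

```cpp
#include <kafka/ClientConfig.h>
#include <kafka/KafkaProducer.h>

#include <chrono>
#include <iostream>

using namespace kafka;
using namespace kafka::clients;

int main()
{
    Config props;
    props.put(Config::BOOTSTRAP_SERVERS, "localhost:9092");            // placeholder brokers
    props.put(Config::ERROR_CB, ErrorCallback([](const Error& error) { // callback as a property
        std::cerr << "error: " << error.toString() << std::endl;
    }));
    props.put(Config::ENABLE_MANUAL_EVENTS_POLL, "true"); // we must drive pollEvents() ourselves

    producer::KafkaProducer producer(props);
    // ... send messages; then trigger the delivery callbacks manually:
    producer.pollEvents(std::chrono::milliseconds(100));
}
```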
