FAQ
Yes, it is. But you need to read this: Broker version compatibility
Yes, librdkafka is completely thread-safe (unless otherwise noted in the API documentation).
Any API, short of the destructor functions, may be called at any time from any thread.
The common restrictions of object destruction still apply (e.g., you must not call rd_kafka_destroy() while another thread is calling rd_kafka_poll() or similar).
That is up to you, the library itself is implemented in C and the C++ interface is a thin layer on top of the C code. The C++ interface may lag behind the C interface functionally (but usually not by far).
For production use: use latest official release
For testing and development use: use latest master branch
This is covered in Proper termination sequence
Yes, the Admin API allows you to create and delete topics, create partitions, get and alter configuration for topics, brokers, and other cluster resources.
librdkafka also supports the deprecated automatic topic creation, which needs to be enabled with auto.create.topics.enable=true on the broker.
A record tombstone is represented in Kafka compacted topics by a valid key and null payload.
For the producer: simply pass NULL for the value/payload pointer in the ..produce() call.
For the consumer: treat message->payload == NULL as a tombstone.
- Where does librdkafka log?
- That's up to you, see rd_kafka_conf_set_log_cb. The default is a stderr logger.
- Even though I have log_level set to DEBUG I don't see anything on stdout/stderr?
- log_level is 6 (info) by default (anything but debug) but is automatically raised to 7 (debug) when debugging is enabled. So it serves the reverse purpose: to filter out higher-leveled logs. There is really very little use for it.
- I'm not seeing any logs
- librdkafka does not really log anything unless there are errors. Logs are emitted through the log_cb, which defaults to a stderr writer; there is also a built-in syslog writer you can configure with set_log_cb(). If you want librdkafka to log even though there are no errors, you can enable debugging by setting the debug config property to e.g. topic,broker.
- Is there any provision to give your own log file path to librdkafka?
- If you want to log to a file you need to implement your own log_cb, which is simple enough to do, just implement the log_cb interface and pass your function reference to set_log_cb.
If the remote peer, typically the broker (but could also be an active TCP gateway of some kind), closes the connection you'll see a log message like this:
%3|1500588440.537|FAIL|rdkafka#producer-1| 10.255.84.150:9092/1: Receive failed: Disconnected
There are a number of possible reasons, in order of how common they are:
- Broker's idle connection reaper closes the connection due to inactivity. This is controlled by the broker configuration property connections.max.idle.ms and defaults to 10 minutes. This is by far the most common reason for spontaneous disconnects.
- The client sent an unsupported protocol request; see Broker version compatibility. This is considered a configuration error on the client. The broker should log an exception explaining why the connection was closed, see the broker logs.
- The client sent a malformed protocol request; this is an indication of a bug in the client. The broker should log an exception explaining why the connection was closed, see the broker logs.
- The broker is in an invalid state. The broker should log an exception explaining why the connection was closed, see the broker logs.
- TCP gateway/load-balancer/firewall session timeout. Try enabling TCP keep-alives on the client by setting socket.keepalive.enable=true.
Since a TCP close can't signal why the remote peer closed the connection there is no way for the client to know what went wrong. If the disconnect logs are getting annoying and the admin deems they are caused by the idle connection reaper, the log.connection.close client configuration property can be set to false to silence all spontaneous disconnect logs.
NOTE: Whenever a connection is closed for whatever reason, librdkafka will automatically reconnect after reconnect.backoff.jitter.ms (default 500ms).
librdkafka will use the system resolver to resolve the broker hostname. On some systems, OSX in particular, the localhost entry in /etc/hosts resolves both to an IPv4 and IPv6 address, so librdkafka will, in a round-robin fashion, attempt to connect to all addresses the hostname resolves to.
If the broker is only listening on the IPv4 address then the client's connection attempt to the IPv6 address will fail.
To limit the address families the client connects to, set the broker.address.family configuration property to v4 or v6.
This is a somewhat common ask (see https://github.com/confluentinc/librdkafka/issues/137).
The problem is that it might actually be a disservice to the user:
librdkafka aims to abstract all the state details of interacting with Kafka, providing an interface that the user controls through constraints rather than state knowledge.
For example, message.timeout.ms is used to tell the Producer for how long it may attempt to produce a message, handling all temporary errors without any need of user interaction.
As soon as the application gets involved in this decision making, things get complex quickly: E.g., is a lost broker connection really a problem? Only if it is the leader for a partition we're using, but the client will try to acquire the leader automatically, and there isn't much the application can do anyway.
With new features and functionality constantly being added to Kafka, an application would need to keep up to date to know whether an "event" is of a severe nature or not, but the client already does this.
As for alarms, I believe it is more useful to look at the data than the protocol: Set up a statistics callback and raise an alarm if partition message queues or round-trips are building up; this covers far more problems than monitoring for single broker disconnects: network congestion, broker overload, client overload, etc.
Having said that, I'm not saying there will never be an event API, but we have yet to see a proper use-case for it.
More on the subject of tracking broker state:
The problem is that there is no such thing as a single Kafka cluster state.
A consumer will need to speak to a group coordinator, and the leaders for the partitions it wants to consume.
A producer will need to speak to the leaders for the partitions it wants to produce to, as well as the transaction coordinator in case transactions are used.
These leaders and coordinators may change at any time, and may be temporarily or permanently unavailable.
- Will the consumer be operational if it can read 2/3 partitions?
- Will the producer be operational if it can write to 1/3 partitions?
- Is slowness to connect a fatal error? What's the definition of slow?
- How much time should be allowed to try a connection before moving on? The delay of ALL_BROKERS_DOWN is the connection failure/timeout time (which will vary depending on reason) * the number of brokers. More aggressive timeouts might mean we fail a connection attempt that would have been successful had we just waited a bit longer.
- What if there is a temporary outage, should all clients connecting to the cluster fail?
The answers to all these questions are: it depends - on your infrastructure, data contracts, etc.
Our recommendation is not to think of this problem as a connectivity, state or broker problem, but rather a data problem:
- Consumer: if less than P% of the expected data rate has been received in the last X minutes, something is wrong.
- Producer: if more than P% of messages failed producing during the last X minutes, something is wrong.
This is easily measured in the application logic.
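The two data-centric checks above could be sketched in application code roughly as follows. This is a minimal sketch under stated assumptions: the function names and the counters are hypothetical (they are not librdkafka APIs), and the application is assumed to maintain the counters itself over its chosen window of X minutes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper (not a librdkafka API): returns true if the consumer
 * received less than min_pct percent of the expected message count during
 * the measurement window. */
bool consumer_rate_too_low(uint64_t received, uint64_t expected,
                           unsigned min_pct) {
        assert(min_pct <= 100);
        /* Compare received/expected < min_pct/100 without dividing. */
        return received * 100 < expected * (uint64_t)min_pct;
}

/* Hypothetical helper: returns true if more than max_pct percent of the
 * messages produced during the window failed delivery. */
bool producer_failures_too_high(uint64_t failed, uint64_t produced,
                                unsigned max_pct) {
        assert(max_pct <= 100);
        if (produced == 0)
                return false; /* nothing produced, nothing to judge */
        return failed * 100 > produced * (uint64_t)max_pct;
}
```

In such a setup the failed counter would typically be incremented from the delivery report callback when a message reports an error, and the received counter from the consume loop; the window length and percentages remain infrastructure-specific choices.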
- If rd_kafka_topic_new() is called with a NULL rd_kafka_topic_conf_t * it will use the default topic configuration.
- The default topic configuration is set by populating a rd_kafka_topic_conf_t object and then using rd_kafka_conf_set_default_topic_conf(global_conf, my_default_config) - this will replace any previously set default topic configuration.
- As a convenience to the user, librdkafka allows default topic configuration to be specified on the global configuration object - this is handy in the case of configuration files since there is no need to split configuration between global and (default) topic, allowing you to set request.required.acks (et al.) with rd_kafka_conf_set() rather than rd_kafka_topic_conf_set(). This functionality is called topic fallthru configuration.
- Internally, topic fallthru configuration will create a default topic config object if it does not already exist. If you first call rd_kafka_conf_set("some.topic.property") and then call rd_kafka_conf_set_default_topic_conf(), your default topic config object will replace the implicitly created one, which you probably don't want.
- The recommendation is to not use rd_kafka_conf_set_default_topic_conf() but rely on the topic fallthru configuration.
- You can still use topic-specific configuration; do note though that a topic's configuration is applied only the first time the topic is referenced, either by the application with rd_kafka_topic_new() or when the topic is created automatically internally because it was referenced in rd_kafka_subscribe() or similar.
- Specific configuration passed to rd_kafka_topic_new() will override the default topic configuration.
You can only set the topic configuration once per client instance, namely in your first call to rd_kafka_topic_new().
Topic config, as supplied to (the first call to) rd_kafka_topic_new() for a specific topic is local to that topic for the remainder of that rd_kafka_t instance's lifetime. That means that only the properties from conf1 will be used in the following example:
rd_kafka_topic_new(rk, "topic1", conf1);
rd_kafka_topic_new(rk, "topic1", conf2);
Topics are local to their rd_kafka_t instance and not shared between them in any way.
librdkafka creates 1 main thread and one thread per broker, for each client instance.
See "Number of broker TCP connections" below.
librdkafka will only attempt to connect to the brokers it needs to communicate with: one of the bootstrap.servers, the partition leaders, group and transaction coordinator, etc.
If the first attempted bootstrap server is down, the next will be attempted in a randomized fashion.
Upon the first connection the broker will be queried for the full list of brokers in the cluster, this is called a Metadata request, and librdkafka will maintain a cache of all brokers in the cluster so it knows, based on broker node id, where to connect to perform its operation.
The initial bootstrap broker connections will only be used for Metadata queries, unless the hostname and port of a bootstrap broker exactly matches the hostname and port of a broker returned in the Metadata response (this is the advertised.listeners broker configuration property), in which case the bootstrap broker connection will be associated with that broker's broker id and used for the full protocol set (such as producing or consuming).
librdkafka will automatically attempt to reconnect to a broker if the broker connection goes down, if the client still needs to interact with the broker in question, otherwise a new broker connection will not be made.
Modern Kafka clients do not connect to Zookeeper. Historical versions of Kafka exposed Zookeeper to clients to retrieve the full broker list (Kafka 0.7) and the initial consumer groups were implemented in the clients using Zookeeper (Kafka 0.8). But since Kafka 0.8 (and 0.9 for consumer groups) there is no longer any need for clients to connect to Zookeeper, the metadata and consumer group functionality has moved to the Kafka broker. In fact, it is highly recommended not to make Zookeeper reachable from clients due to security aspects.
librdkafka originally stems from librd, which was my contractor toolbox library of convenience functions; the rd stands for rapid development.
In hindsight librdkafka should have been named simply libkafka.
SASL GSSAPI/Kerberos auth failure on Windows: Failed to initialize SASL authentication: InitializeSecurityContext failed: Target unknown (0x80090303)
This implies that you are requesting a service ticket for a principal which does not exist in the KDC.
The initial suspicion would be that there is not an SPN matching the broker's advertised hostname. The client will attempt to obtain a service ticket for <sasl.kerberos.service.name>/<advertised.listener>@REALM.COM. There must be an SPN (or principal if this is a standalone KDC on a Linux server), otherwise an error like the one you see now will be returned.
You should also verify that all users in the exchange have strong encryption (AES) enabled for their accounts. This is a manual step and it is not enabled by default. The problem arises when the session key is negotiated and one of the peers does not support the default RC4 enc type.
Yes, broker-based balanced consumer groups are supported and require Apache Kafka broker version >= 0.9.
- C: rd_kafka_subscribe() et al.
- C++: KafkaConsumer class.
The Consumer class is the legacy simple consumer that works with any broker >= 0.8; it does not feature any consumer group support but allows full flexibility in regards to partitions to consume and their offsets.
- C: rd_kafka_consume_start() et al.
- C++: Consumer class
This API is deprecated but there is no plan to remove it; please see the rd_kafka_assign() or KafkaConsumer::assign() API for a more modern alternative.
The KafkaConsumer class is the new high-level balanced consumer which requires broker version >= 0.9; the subscribed topics and partitions are assigned to the members in a consumer group so that only one consumer may consume messages from a single partition at any time.
- C: rd_kafka_subscribe(), rd_kafka_assign(), et al.
- C++: KafkaConsumer class
If there are no stored offsets for a partition (and group in case of the KafkaConsumer) the consumer will default its starting offset to the topic configuration setting auto.offset.reset, which defaults to latest - that is, it will start consuming at the current end of a partition.
If you are using the KafkaConsumer you probably do not have a per-topic configuration object but should use the default topic config, see default_topic_conf.
The assign() API takes a list of partitions with an optional starting offset for each partition.
See Manually setting the consumer start offset.
Since you don't need to create any topic objects when using KafkaConsumer you might wonder how to set topic configuration.
The default_topic_conf configuration property solves this: it takes a topic configuration object and applies it to all internally created topics as their default configuration.
If you need special configuration for a sub-set of your topics you will need to create them prior to calling subscribe() or assign() and pass their explicit configuration objects at topic creation time.
If the current controller broker is also partition leader for one or more partitions that the consumer is fetching from, an outstanding FetchRequest will block subsequent OffsetCommitRequests queued on the same connection for up to fetch.wait.max.ms. This is only a problem if no new messages are arriving at the partition. The workaround is to decrease fetch.wait.max.ms (default 500ms).
Even when asynchronously committing offsets for each processed message, the OffsetCommit request-response round-trip times will quite quickly build up a queue of OffsetCommitRequests waiting in-queue or in-flight to be processed and responded to by the broker.
When this queue grows large enough the latency to make it through the queue is higher than the request timeout (socket.timeout.ms, default 60s) and these requests will start failing with Local: Timed out. Even if it does not come to that it still risks slowing down the consume rate if it is fetching messages from the same broker, due to the fact that a blocking FetchRequest will block subsequently queued OffsetCommitRequests, and vice versa.
The underlying problem is that the consumption and processing rate is higher than the commit rate, which inevitably leads to these queue build ups.
- Do you need to commit per message? If using async commits and the commit fails for whatever reason (such as timeout on queue!) that commit is still lost. If you really need per-message commits you probably want sync commits as well, but that will really slow things down, see FAQ item just above.
- What about not committing every message, but every say 100? or 1000? Whatever it takes to make the broker keep up with the commits. This is what the default auto offset committer does, but based on the auto.commit.interval.ms time interval.
- There are actually two layers of offset commit: first there is the offset store (offset_store(), store_offsets(), etc) that locally stores (in memory) the offset to be committed, then there's the actual commit which commits the stored offset to the broker. An alternative is thus to explicitly store() each message's offset after processing but then rely on the auto offset committer to actually commit the offsets for you. Use enable.auto.offset.store=false to disable automatic offset store (which is by default done prior to your processing).
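The "commit every N messages" idea from the list above could be sketched like this. It is only an illustration under stated assumptions: commit_fn, commit_batcher and record_commit are hypothetical application-side names, not librdkafka types; in a real consumer the hook would wrap whichever commit call the application already uses.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical commit hook supplied by the application, e.g. a wrapper
 * around an asynchronous offset commit. Not a librdkafka type. */
typedef void (*commit_fn)(int64_t offset, void *opaque);

/* Tracks processed messages and commits only every `interval` messages. */
typedef struct {
        unsigned interval; /* commit once per this many messages */
        unsigned pending;  /* messages processed since the last commit */
        commit_fn commit;  /* application-provided commit hook */
        void *opaque;      /* passed through to the hook */
} commit_batcher;

/* Call after each message has been processed. The committed offset is the
 * offset of the next message to consume: last processed offset + 1. */
void commit_batcher_processed(commit_batcher *cb, int64_t msg_offset) {
        assert(cb->interval > 0);
        if (++cb->pending >= cb->interval) {
                cb->commit(msg_offset + 1, cb->opaque);
                cb->pending = 0;
        }
}

/* Example hook for illustration: records what would have been committed. */
typedef struct {
        int calls;
        int64_t last_offset;
} commit_log;

void record_commit(int64_t offset, void *opaque) {
        commit_log *log = opaque;
        log->calls++;
        log->last_offset = offset;
}
```

With an interval of 100, processing 250 messages results in two commits instead of 250, which keeps the number of queued OffsetCommit requests small. Messages processed after the last commit are not covered until the next one, so a final commit on shutdown is still needed.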
librdkafka pre-fetches messages to an internal queue (so that consume() or consumer_poll() is instantaneous) and when there are no new messages available to fetch for the given topics+partitions the Fetch request will block on the broker for fetch.wait.max.ms. This property defaults to 500ms which will thus cause 2 wake-ups per second in the case where there are no new messages - which will increase the CPU usage somewhat. This increased processing typically does not matter (since the consumer will be idle anyway) but it can be mitigated by increasing fetch.wait.max.ms. Do note though that this will block that entire broker connection for that long, preventing metadata requests, commits, etc, from being sent until the Fetch request returns empty-handed after that long.
Messages are fetched in batches from the broker (the batch size is determined by the consumer) and since librdkafka strives for performance it will avoid copying the message payload unless necessary. To that end a message_t object is allocated for each message, holding its metadata, but the message's payload/value is referenced from a single zero-copy receive buffer, and that buffer is only freed when all message objects pointing to it are destroyed.
However, the queued.max.messages.kbytes limit accounts for the size of the message objects (with referenced memory); when a message is destroyed (but the underlying buffer isn't) that space is decremented from the queued.max.messages.kbytes meter, allowing new messages to be fetched.
That is probably the reason why you are seeing an increase in memory usage when you destroy messages, which is of course counter-intuitive.
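The effect can be illustrated with a small model. This is a simplified sketch and not librdkafka's internal code: recv_buffer, message and queue_meter are hypothetical stand-ins for the shared receive buffer, the per-message object and the queued.max.messages.kbytes accounting described above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of one receive buffer shared by several messages. */
typedef struct {
        size_t size; /* bytes held by the buffer */
        int refcnt;  /* live messages still pointing into the buffer */
        bool freed;  /* set once the last reference is dropped */
} recv_buffer;

typedef struct {
        recv_buffer *buf; /* shared buffer holding the payload */
        size_t len;       /* payload bytes counted against the limit */
} message;

/* Bytes currently counted against the queue size limit. */
typedef struct {
        size_t queued_bytes;
} queue_meter;

message message_new(recv_buffer *buf, size_t len, queue_meter *m) {
        message msg = {buf, len};
        buf->refcnt++;
        m->queued_bytes += len;
        return msg;
}

void message_destroy(message *msg, queue_meter *m) {
        m->queued_bytes -= msg->len;    /* the meter drops immediately */
        if (--msg->buf->refcnt == 0)
                msg->buf->freed = true; /* memory is released only now */
}
```

In this model, destroying two of three messages that share a 3000-byte buffer lowers the meter to 1000 bytes while the whole buffer stays allocated, so new messages may be fetched on top of memory that has not yet been released.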
The default settings of librdkafka are optimized for the normal stream case and will aggressively prefetch messages, but if your application is pausing, seeking or processing messages slowly that pre-fetch buffer is not of much use and should be decreased. For this purpose, try adjusting queued.min.messages and fetch.message.max.bytes downwards.
You may also enable statistics and monitor the partition's fetchq metric.
Each broker connection is handled by its own thread, and each such thread has a partition fetcher that serves the assigned partitions for that broker. While there will only be at most one outstanding Fetch request for a single broker connection, other broker connections may have outstanding Fetch requests as well.
An assigned partition is included in the Fetch request if:
- it has a valid fetch offset (i.e., not currently acquiring a committed offset or a logical offset lookup (e.g., earliest)), and
- the partition's fetch queue, or the queue it has been forwarded to (a single global fetchq when using the high-level KafkaConsumer or having called rd_kafka_poll_set_consumer()), has less than queued.min.messages enqueued, and
- the same queue has less than queued.max.messages.kbytes enqueued, and
- the partition is not backed off (fetch.error.backoff.ms after reaching end of partition or an error).
The broker will add messages to the Fetch response in the order the partitions were requested until there are no more messages for the requested partitions, or the maximum requested size (fetch.message.max.bytes) or maximum response size is exceeded.
To provide fairness the order in which the partitions are added to the Fetch request is round-robin.
As the client parses the Fetch response the messages are appended to the partition queues (or forwarded-to global queue, see above) in the order they were returned from the broker, this means that all messages for partition A are appended, then partition B, then partition C, and so on.
The per-partition fetch decision logic can be seen in detail in rd_kafka_toppar_fetch_decide().
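As a rough illustration of the four inclusion conditions listed earlier, the decision could be modelled as below. This is a simplified sketch and not the actual rd_kafka_toppar_fetch_decide() implementation: partition_state and should_fetch are hypothetical names, and the byte limit is passed in already converted from queued.max.messages.kbytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified, hypothetical view of a partition's fetch state. */
typedef struct {
        bool has_valid_offset;    /* fetch offset known, no lookup pending */
        int64_t queued_msgs;      /* messages in the (possibly forwarded) queue */
        int64_t queued_bytes;     /* bytes in the same queue */
        int64_t backoff_until_ms; /* do not fetch before this time, 0 = none */
} partition_state;

/* Returns true if the partition would be included in the next Fetch
 * request according to the four conditions in the list. */
bool should_fetch(const partition_state *p, int64_t queued_min_messages,
                  int64_t queued_max_bytes, int64_t now_ms) {
        if (!p->has_valid_offset)
                return false; /* still resolving the start offset */
        if (p->queued_msgs >= queued_min_messages)
                return false; /* enough messages already queued */
        if (p->queued_bytes >= queued_max_bytes)
                return false; /* queue size limit reached */
        if (now_ms < p->backoff_until_ms)
                return false; /* backed off after EOF or an error */
        return true;
}
```

When queues are forwarded to a common queue, queued_msgs and queued_bytes in this model would refer to that shared queue rather than to the individual partition.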
Each fetched partition has its own queue of fetched messages; when this queue size reaches queued.min.messages or queued.max.messages.kbytes the fetcher will stop fetching that partition until the queue drops below those thresholds.
When using the high-level consumer (KafkaConsumer) the partition queues are forwarded to a single queue so that you can read messages from all partitions by calling rd_kafka_consumer_poll() or KafkaConsumer::consume() in a single place.
With the queues forwarded the queue thresholds are no longer checked per partition, but for the common queue, which means that all partitions stop fetching when the common queue size thresholds are exceeded.
If a set of partitions have the same rate of messages, but some of those partitions are on "faster" brokers (faster/closer/less-loaded broker) this shared queue behaviour may lead to these faster partitions occupying more space than the "slower" partitions.
One way around this partition rate imbalance is to consume messages from each partition separately. You do this by removing the forwarding of the partition queues (to the common queue) so that each partition queue is checked individually against the size thresholds. This will generate a more balanced partition queue load since each partition will be fetched up to the same threshold, instead of a few partitions taking up much of that space.
For each assigned partition, use get_partition_queue() to get the partition's queue, then call forward(NULL) on that queue to disable forwarding, then poll each queue separately.
Do note that you still need to call consumer_poll() (or KafkaConsumer::consume()) to serve rebalance events, etc, but it will not return any messages now.
Also make sure to adjust queued.min.messages and queued.max.messages.kbytes to limit the number of messages and bytes, since these are now applied per partition rather than globally.
Why does rd_kafka_produce() always seem to succeed, even if there is a problem connecting to the Kafka brokers?
Because librdkafka has put your message on its outbound queue and will try to recover from the error for message.timeout.ms before reporting failure. Delivery failures are reported asynchronously; meanwhile your program can be notified (also asynchronously) about connection issues if it registers an error callback.
Since the producer is asynchronous it needs a way to inform the application when a message has been delivered or permanently fails delivery. It does this through a delivery report callback (DR callback), which is triggered from the rd_kafka_poll() call.
If you know that your producer application will have a steady stream of incoming messages to produce, you can call poll() from the produce loop, like so:
while (new data to produce) {
        rd_kafka_produce(rkt, .., data, ..); /* Non-blocking: Enqueues message to produce by background thread */
        rd_kafka_poll(rk, 0); /* Non-blocking: Trigger any delivery report callbacks that are ready. */
}
rd_kafka_flush(rk, -1); /* Wait for all outstanding messages to deliver, triggers delivery report callbacks too. */
A common misconception is that the poll() call in the above example will trigger the delivery report callback for the message just produced, but since the producer is asynchronous the message will most likely not even have been sent to the broker yet. But the poll() call may trigger delivery reports from previous produce calls.
In the case where the application does not know the incoming data rate to produce, or there are intervals of no messages, it is recommended to run poll() in a separate loop to trigger delivery reports (and other callbacks) in a timely manner. If the application has a main loop that is not driven by input data that is a good place to call poll.
Else spawn a new thread that simply calls poll, like so:
void *my_thread_main (..) {
        while (!app_terminate)
                rd_kafka_poll(rk, 1000); /* Use a timeout of 1s to avoid busy looping */
        return NULL;
}
Note: a message is accounted for in the producer's maximum queue limit (queue.buffering.max.messages and queue.buffering.max.kbytes) until its delivery report callback has been triggered. To avoid ERR__QUEUE_FULL errors, make sure to call poll regularly.
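Why a delivered message can still cause queue-full errors may be easier to see in a toy model of the accounting in the note above. This is a hypothetical sketch, not librdkafka code: producer_model and its functions merely mimic the described behaviour, with max_messages standing in for queue.buffering.max.messages.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of producer queue accounting; not librdkafka code. */
typedef struct {
        int max_messages;  /* stands in for queue.buffering.max.messages */
        int accounted;     /* messages counted against the limit */
        int reports_ready; /* delivered, delivery report not yet served */
} producer_model;

/* Enqueue a message; fails when the limit is reached (queue full). */
bool model_produce(producer_model *p) {
        if (p->accounted >= p->max_messages)
                return false;
        p->accounted++;
        return true;
}

/* The background thread finished delivering one message. The message still
 * counts against the limit until its delivery report has been served. */
void model_delivered(producer_model *p) {
        assert(p->reports_ready < p->accounted);
        p->reports_ready++;
}

/* Serve pending delivery reports, releasing their queue slots. */
int model_poll(producer_model *p) {
        int served = p->reports_ready;
        p->accounted -= served;
        p->reports_ready = 0;
        return served;
}
```

In this model a producer with a limit of two messages keeps rejecting new messages after both have been delivered, and only accepts a third once model_poll() has served the two delivery reports.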
Because synchronous producing is a performance killer and scales very poorly, it is effectively bound by network and broker latency. If the round-trip to produce a message is 2ms the maximum achievable rate is 500 messages per second. In scenarios where the broker or network is getting slower for whatever reason this rate decreases even more, possibly causing backpressure in the application affecting the upstream data source.
It is thus better to design the application to use asynchronous sends and use the delivery report callback to take action when the message is finally delivered or fails permanently.
If you still think you need a sync producer, see How to implement a sync producer
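The latency bound mentioned above (a 2ms round-trip giving at most 500 messages per second) follows from a simple division. A minimal sketch, assuming a strictly synchronous producer that waits for each delivery before sending the next message; the function name is hypothetical:

```c
#include <assert.h>

/* Upper bound on messages per second for a producer that waits for each
 * message to be delivered before producing the next one.
 * rtt_ms is the produce round-trip time in milliseconds (must be > 0). */
double sync_max_msgs_per_sec(double rtt_ms) {
        assert(rtt_ms > 0.0);
        return 1000.0 / rtt_ms;
}
```

If the round-trip time grows from 2ms to 10ms the bound drops from 500 to 100 messages per second, which is the kind of degradation that can create backpressure upstream.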
Use a delivery callback (dr_msg_cb) and inspect the rd_kafka_message_t.offset field.
There are circumstances when using the idempotent or transactional producer where the offset field may be set to RD_KAFKA_OFFSET_INVALID for successfully delivered messages.
No.
If consuming from cluster A and producing to cluster B it is not possible to use transactions since the input offsets that are given to send_offsets_to_transaction() will be committed to cluster B, while the consumer would read the committed offsets from cluster A.