Skip to content

Commit

Permalink
- 生产端核心类构造方法添加注释
Browse files Browse the repository at this point in the history
  • Loading branch information
developer-wenren committed Jun 15, 2021
1 parent a2d894f commit 52d921c
Showing 1 changed file with 49 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
Expand Down Expand Up @@ -46,7 +46,7 @@

/**
* A Kafka client that publishes records to the Kafka cluster.
* <P>
* <p>
* The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than
* having multiple instances.
* <p>
Expand Down Expand Up @@ -138,8 +138,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
* either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* @param configs The producer configs
*
* @param configs The producer configs
*/
public KafkaProducer(Map<String, Object> configs) {
this(new ProducerConfig(configs), null, null);
Expand All @@ -150,21 +150,23 @@ public KafkaProducer(Map<String, Object> configs) {
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
* either the string "42" or the integer 42).
* @param configs The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*
* @param configs The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
keySerializer, valueSerializer);
}

/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* @param properties The producer configs
*
* @param properties The producer configs
*/
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null);
Expand All @@ -173,15 +175,16 @@ public KafkaProducer(Properties properties) {
/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* @param properties The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*
* @param properties The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
keySerializer, valueSerializer);
keySerializer, valueSerializer);
}

@SuppressWarnings({"unchecked", "deprecation"})
Expand Down Expand Up @@ -247,6 +250,7 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}

// 初始化内存缓冲区
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
Expand All @@ -255,8 +259,10 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
metrics,
time);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// 元数据对象
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
// 与 Kafka 服务端通信的网络客户端
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
Expand All @@ -266,6 +272,7 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
// 消息发送的任务
this.sender = new Sender(client,
this.metadata,
this.accumulator,
Expand All @@ -278,6 +285,7 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
// 运行消息发送任务的线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

Expand All @@ -304,8 +312,8 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
// 关联消息发送的拦截器链
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
Expand Down Expand Up @@ -380,7 +388,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
* });
* }
* </pre>
*
* <p>
* Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
*
Expand All @@ -396,14 +404,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
*
* @param record The record to send
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
*
* @throws InterruptException If the thread is interrupted while blocked
* indicates no callback)
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws TimeoutException if the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.
*
* @throws TimeoutException if the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
Expand Down Expand Up @@ -489,7 +495,8 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call

/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
*
* @param topic The topic we want metadata for
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The amount of time we waited in ms
*/
Expand Down Expand Up @@ -524,14 +531,14 @@ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedExce
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
if (size > this.totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}

/**
Expand All @@ -556,7 +563,7 @@ private void ensureValidRecordSize(int size) {
* consumer.commit();
* }
* </pre>
*
* <p>
* Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
* we need to set <code>retries=&lt;large_number&gt;</code> in our config.
*
Expand All @@ -576,6 +583,7 @@ public void flush() {

/**
* Get the partition metadata for the give topic. This can be used for custom partitioning.
*
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
Expand Down Expand Up @@ -622,10 +630,10 @@ public void close() {
* <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
* blocking the I/O thread of the producer.
*
* @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
* non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
* @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
* non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
* @param timeUnit The time unit for the <code>timeout</code>
* @throws InterruptException If the thread is interrupted while blocked
* @throws InterruptException If the thread is interrupted while blocked
* @throws IllegalArgumentException If the <code>timeout</code> is negative.
*/
@Override
Expand All @@ -644,7 +652,7 @@ private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
if (timeout > 0) {
if (invokedFromCallback) {
log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
"This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
"This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
} else {
// Try to close gracefully.
if (this.sender != null)
Expand All @@ -662,7 +670,7 @@ private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {

if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
log.info("Proceeding to force close the producer since pending requests could not be completed " +
"within timeout {} ms.", timeout);
"within timeout {} ms.", timeout);
this.sender.forceClose();
// Only join the sender thread when not calling from callback.
if (!invokedFromCallback) {
Expand All @@ -689,7 +697,7 @@ private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
if (partition != null) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
Expand All @@ -701,7 +709,7 @@ private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[]
return partition;
}
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
cluster);
cluster);
}

private static class FutureFailure implements Future<RecordMetadata> {
Expand Down Expand Up @@ -759,7 +767,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
if (this.interceptors != null) {
if (metadata == null) {
this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
exception);
exception);
} else {
this.interceptors.onAcknowledgement(metadata, exception);
}
Expand Down

0 comments on commit 52d921c

Please sign in to comment.