找回密码
 立即注册
首页 业界区 安全 [Kafka/Java] KafkaProducer:原理、配置、调优 ...

[Kafka/Java] KafkaProducer:原理、配置、调优

全叶农 7 天前


概述:KafkaProducer

Kafka Producer : 消息生产者


  • Kafka 生产者是负责将消息发送到 Kafka 集群的组件。生产者可以是各种应用程序,如 Web 服务器、日志收集器等,它们将数据以消息的形式发送到 Kafka 主题。
生产者消息发送流程


  • 消息创建:生产者应用程序创建要发送的消息,消息包含键(Key)、值(Value)和可选的时间戳等信息。
  • 序列化:将消息的键和值转换为字节数组,以便在网络上传输。Kafka 提供了多种序列化器,如 StringSerializer、IntegerSerializer 等,也可以自定义序列化器。
  • 分区选择:根据消息的键或其他规则,确定消息要发送到的主题分区。
  • 消息累加器:将消息暂时存储在消息累加器(RecordAccumulator)中,它会对消息进行批量处理,提高发送效率。
  • 发送请求:当消息达到一定数量或达到一定时间间隔时,生产者将消息从消息累加器中取出,封装成请求(ProduceRequest)发送到 Kafka 集群的 Broker 节点。
  • 接收响应:生产者等待 Broker 节点的响应,确认消息是否成功发送。
深度剖析

1.png

Kafka生产者源码解析(一)——KafkaProducer - CSDN 【推荐】
2.png

9.3、Kafka 生产者详解 - Zhihu 【不推荐】
3.png

kafka实践(十二):生产者(KafkaProducer)源码详解和调试 - CSDN 【推荐】
发送原理


  • Kafka 生产者采用异步发送的方式,通过消息累加器和 Sender 线程实现高效的消息发送。


  • 消息累加器负责缓存消息
  • Sender 线程负责将缓存中的消息发送到 Kafka 集群。
这种设计使得生产者可以在后台异步处理消息发送,提高了发送效率。
核心依赖: kafka-clients
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     kafka-clients</artifactId>
  4.     <version>2.7.2</version>
  5. </dependency>
复制代码
KafkaProducer的初始化过程分析

总述:Kafka 初始化的过程

分享Kafka初始化生产者的大体过程
初始化过程中会新建很多对象:

  • 分区器 --- Partitioner partitioner
  • 重试时间 --- long retryBackoffMs
  • 序列化器 --- Serializer keySerializer,Serializer valueSerializer
  • 拦截器 --- List interceptorList
  • 累加器 --- RecordAccumulator accumulator
  • 元数据 --- ProducerMetadata metadata
  • 创建sender线程 --- Sender sender
KafkaProducer 的初始化
  1. //org.apache.kafka.clients.producer.KafkaProducer
  2. //org.apache.kafka.clients.producer.ProducerConfig
  3. // 设置属性
  4. Properties properties = new Properties();
  5. // 指定连接的kafka服务器的地址,配置多台的服务  用,分割, 其中一个宕机,生产者 依然可以连上(集群)
  6. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[kafka server ip]:[kafka server port]");
  7. // 1.分区器---Partitioner partitioner
  8. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
  9. // 2.重试时间---long retryBackoffMs
  10. properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10L);
  11. // 3.key和value的序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer
  12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  14. // 4.拦截器---List<ProducerInterceptor<K, V>> interceptorList
  15. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyInterceptor.class);
  16. // 构建kafka生产者对象
  17. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
复制代码
分区器

定义


  • 对应初始化时设置的参数:
  1. ProducerConfig.PARTITIONER_CLASS_CONFIG
复制代码

  • 分区器是在发送消息时用来计算消息将要发送到哪个分区的,支持自定义分区器
分区好处


  • 提高并发性能:Kafka 主题可以划分为多个分区,每个分区可以在不同的 Broker 节点上存储和处理,生产者可以并行地向多个分区发送消息,消费者也可以并行地从多个分区消费消息,从而提高系统的并发处理能力。
  • 实现数据负载均衡:通过合理的分区策略,可以将消息均匀地分布到不同的分区中,避免某些 Broker 节点负载过高,实现数据的负载均衡。
  • 支持数据的顺序性:在同一个分区内,消息是按照写入顺序存储的,可以保证分区内消息的顺序性。
默认分区规则


  • 如果消息的键为 null:Kafka 会使用轮询(Round Robin)的方式将消息均匀地分配到各个分区中。
  • 如果消息的键不为 null:Kafka 会对键进行哈希计算,然后根据哈希值将消息分配到相应的分区中,保证具有相同键的消息总是发送到同一分区
示例1:默认的分区器
  1. // 这是kafka client 初始化生产者的[源码]
  2. // 如果没有设置自定义分区器,则partitioner为null,会影响到后续初始化逻辑以及发送消息时的逻辑
  3. this.partitioner = config.getConfiguredInstance(
  4.         ProducerConfig.PARTITIONER_CLASS_CONFIG,
  5.         Partitioner.class,
  6.         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
复制代码
示例2:自定义分区器

如需使用自定义分区器,需要考虑好分区负载问题,切勿为了解决需求盲目使用自定义分区;
分区不合理可能影响broker性能,也是对低负载分区资源的浪费,严重情况下某一分区的消费者负载过大,或某一分区broker负载过大,可能导致雪崩
  1. // 这是自定义分区器的[示例代码]
  2. public class MyPartitioner implements Partitioner {
  3.     public int partition(String topic, Object key, byte[] keyBytes,
  4.                          Object value, byte[] valueBytes, Cluster cluster) {
  5.         List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
  6.         int num = partitionInfos.size();
  7.         // 与[org.apache.kafka.clients.producer.internals.DefaultPartitioner]
  8.         // 计算分区时一样的算法
  9.         int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;
  10.         return parId;
  11.     }
  12.     public void close() {//do nothing}
  13.     public void configure(Map<String, ?> configs) {//do nothing}
  14. }
复制代码
补充:
  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.Cluster;
  3. import java.util.Map;
  4. public class CustomPartitioner implements Partitioner {
  5.     @Override
  6.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  7.         // 获取主题的分区数量
  8.         int numPartitions = cluster.partitionsForTopic(topic).size();
  9.         // 自定义分区逻辑,这里简单地将键转换为字符串并取哈希值对分区数量取模
  10.         if (key == null) {
  11.             return 0;
  12.         } else {
  13.             return Math.abs(key.toString().hashCode()) % numPartitions;
  14.         }
  15.     }
  16.     @Override
  17.     public void close() {
  18.         // 关闭分区器时的清理操作
  19.     }
  20.     @Override
  21.     public void configure(Map<String, ?> configs) {
  22.         // 配置分区器时的初始化操作
  23.     }
  24. }
复制代码
在生产者配置中指定自定义分区器:
props.put("partitioner.class", "com.example.CustomPartitioner");
分区器使用


  • 用户可以通过实现该接口自定义分区器,在生产者调用send方法发送消息时,会使用【用户自定义的分区器】计算消息要发送到哪个分区。
自定义分区器简单实现见上面代码块
  1. org.apache.kafka.clients.producer.KafkaProducer#partition
复制代码

  • 在生产者调用send方法发送消息时,如果使用用户自定义的分区器,允许在第一次将消息放入本地缓存失败时,进行一次尝试:重现分配分区本地缓存
  1. // 这是kafka生产者调用send方法发送消息时的部分[源码]
  2. // check if we have an in-progress batch
  3. Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
  4. synchronized (dq) {
  5.     // After taking the lock, validate that the partition hasn't changed and retry.
  6.     if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
  7.         continue;
  8.     RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
  9.     if (appendResult != null) {
  10.         // 这是第一次就将消息添加到缓存后返回
  11.         // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
  12.         boolean enableSwitch = allBatchesFull(dq);
  13.         topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
  14.         return appendResult;
  15.     }
  16. }
  17. // 这是第一次尝试将消息放入本地缓存失败后,判断如果使用户自定义的分区器,则返回一个对象
  18. // 该对象将在append方法的调用处进行重新计算分区并重试一次
  19. // we don't have an in-progress record batch try to allocate a new batch
  20. if (abortOnNewBatch) {
  21.     // Return a result that will cause another call to append.
  22.     return new RecordAppendResult(null, false, false, true, 0);
  23. }
复制代码
重试时间

定义


  • 对应初始化时设置的参数: ProducerConfig.RETRY_BACKOFF_MS_CONFIG
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
尝试重试对给定主题分区的失败请求之前等待的时间。这避免了在某些失败场景下以紧密循环的方式重复发送请求
重试时间的使用


  • 初始化累加器时作为入参,保存到累加器字段[long retryBackoffMs]中,用于sender线程发送消息时检测重试超时
  • 初始化元信息对象[org.apache.kafka.clients.Metadata]时,放入字段[long refreshBackoffMs]中,用于更新元信息前判断等待时间
总结:该字段主要用于向服务器循环发送请求时停顿的等待时间
序列化器

定义


  • 对应初始化时设置的参数
  1. ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
  2. ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
复制代码
Serializer class for key that implements the org.apache.kafka.common.serialization.Serializerinterface.
实现org.apache.kafka.common.serialization.Serializer接口的key序列化程序类。
Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
实现org.apache.kafka.commun.serialization.Serializer接口的值的序列化程序类。
自定义序列化器
  1. // 序列化对象
  2. public class UserSerializer implements Serializer<User> {
  3.     public void configure(Map<String, ?> configs, boolean isKey) {
  4.         //do nothing
  5.     }
  6.     public byte[] serialize(String topic, User data) {
  7.         try {
  8.             byte[] name;
  9.             int nameSize;
  10.             if (data == null) {
  11.                 return null;
  12.             }
  13.             if (data.getName() != null) {
  14.                 name = data.getName().getBytes("UTF-8");
  15.                 //字符串的长度
  16.                 nameSize = data.getName().length();
  17.             } else {
  18.                 name = new byte[0];
  19.                 nameSize = 0;
  20.             }
  21.             /*id的长度4个字节,字符串的长度描述4个字节,
  22.             字符串本身的长度nameSize个字节*/
  23.             ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize);
  24.             buffer.putInt(data.getId());//4
  25.             buffer.putInt(nameSize);//4
  26.             buffer.put(name);//nameSize
  27.             return buffer.array();
  28.         } catch (Exception e) {
  29.             throw new SerializationException("Error serialize User:" + e);
  30.         }
  31.     }
  32.     public void close() {
  33.         //do nothing
  34.     }
  35. }
  36. // 反序列化
  37. public class UserDeserializer implements Deserializer<User> {
  38.     public void configure(Map<String, ?> configs, boolean isKey) {
  39.         //do nothing
  40.     }
  41.     public User deserialize(String topic, byte[] data) {
  42.         try {
  43.             if (data == null) {
  44.                 return null;
  45.             }
  46.             if (data.length < 8) {
  47.                 throw new SerializationException("Error data size.");
  48.             }
  49.             ByteBuffer buffer = ByteBuffer.wrap(data);
  50.             int id;
  51.             String name;
  52.             int nameSize;
  53.             id = buffer.getInt();
  54.             nameSize = buffer.getInt();
  55.             byte[] nameByte = new byte[nameSize];
  56.             buffer.get(nameByte);
  57.             name = new String(nameByte, "UTF-8");
  58.             return new User(id, name);
  59.         } catch (Exception e) {
  60.             throw new SerializationException("Error Deserializer DemoUser." + e);
  61.         }
  62.     }
  63.     public void close() {
  64.         //do nothing
  65.     }
  66. }
  67. // 消息中的实体类
  68. public class User {
  69.     private int id;
  70.     private String name;
  71.     public User(int id) {
  72.         this.id = id;
  73.     }
  74.     public User(int id, String name) {
  75.         this.id = id;
  76.         this.name = name;
  77.     }
  78.     public int getId() {
  79.         return id;
  80.     }
  81.     public void setId(int id) {
  82.         this.id = id;
  83.     }
  84.     public String getName() {
  85.         return name;
  86.     }
  87.     public void setName(String name) {
  88.         this.name = name;
  89.     }
  90.     @Override
  91.     public String toString() {
  92.         return "User{" +
  93.                 "id=" + id +
  94.                 ", name='" + name + '\'' +
  95.                 '}';
  96.     }
  97. }
复制代码
使用自定义序列化器时需要在消费者消费消息时使用反序列化器将消息反序列化,一般常用的就是字符串序列化器
org.apache.kafka.common.serialization.StringSerializer
拦截器


  • 对应初始化时设置的参数
  1. ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
复制代码
Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors
通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口,您可以在生产者接收到的记录发布到kafka集群之前拦截这些记录。
默认情况下,没有拦截器
自定义序拦截器
  1. // 这是自定义序列化器[示例代码]
  2. public class MyInterceptor implements ProducerInterceptor<String, String> {
  3.     private long successCount = 0L;
  4.     private long errorCount = 0L;
  5.     //该方法:Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作
  6.     @Override
  7.     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  8.         //要把发送的value都带上时间戳
  9.         return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + record.value(), record.headers());
  10.     }
  11.     //发送消息情况统计
  12.     //该方法:会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前
  13.     @Override
  14.     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  15.         if (exception == null) {
  16.             successCount++;
  17.         } else {
  18.             errorCount++;
  19.         }
  20.     }
  21.     //该方法:可以关闭拦截器,主要用于执行一些资源清理工作
  22.     @Override
  23.     public void close() {
  24.         //producer发送数据结束并close后,会自动调用拦截器的close方法来输出统计的成功和失败次数
  25.         System.out.println("成功次数=" + successCount);
  26.         System.out.println("失败次数=" + errorCount);
  27.     }
  28.     @Override
  29.     public void configure(Map<String, ?> configs) {
  30.     }
  31. }
复制代码
累加器

定义


  • 这是Kafka中非常重要的内部组件,主要用于缓存消息以便批量发送,从而减少网络传输的资源消耗、并提升性能
简单概述:
对于生产者的作用是使用累加器,可以让生产者不必每次发送消息就即刻推送到broker,可以将一个topic的同一分区消息写入同一份缓存,等待sender线程【批量获取】这批消息一次性发送到broker。减少生产者发起网络调用的次数。


  • 对broker而言的作用:broker也可以一次性将接收到的这一批多个消息以顺序IO的方式追加到文件中,提高了储存效率。
This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
this behavior is explicitly disabled.
此类充当一个队列,将记录累积到要发送到服务器的MemoryRecords实例中。
累加器使用一定量的内存,当内存耗尽时,追加调用将被阻塞,除非此行为被明确禁用。
累加器核心作用


  • 消息缓存:RecordAccumulator实际上是在客户端开辟出的一块内存区域,主要用来缓存消息。这种缓存机制允许Sender线程后续批量发送消息,而不是每调用一次send方法就直接将消息发送给broker。
  • 批量发送:当触发发送条件如MemoryRecords缓存或者Deque队列满了或者一个队列等待时间达到配置时间,sender线程将一次性将这批消息发送给broker。这种方式可以减少网络请求的数量,提高系统的吞吐量。这些触发条件可参考下面参数配置的buffer.memory,batch.size,linger.ms,max.block.ms
  • 资源管理:如果生产者发送消息的速度超过发送到服务器的速度,那么累加器中空间不足的话,就会导致生产者无法继续发送消息。在这种情况下,生产者可以通过设置max.block.ms参数来控制是否阻塞或者抛出异常。如果max.block.ms参数的默认值为60000(即60秒),那么超过这个时间限制后,如果累加器仍然没有足够的空间,生产者将无法继续发送消息。
关于累加器的实现,涉及到内部缓存管理broker服务器元数据统计sender线程交互等逻辑,后续再进行分享。
累加器的相关配置参数

这些参数都是有关累加器的重要配置,直接影响kafka生产者发送消息的性能
配置解释默认值buffer.memoryProducer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快, Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。
这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。33554432batch.size当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。
当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。
发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据
小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。16384linger.msproducer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。通常只有当记录到达的速度超过了发送的速度时才会出现这种情况。然而,在某些场景下,即使处于可接受的负载下,客户端也希望能减少请求的数量。这个设置是通过添加少量的人为延迟来实现的;即,与其立即发送记录, producer 将等待给定的延迟时间,以便将在等待过程中到达的其他记录能合并到本批次的处理中。这可以认为是与 TCP 中的 Nagle 算法类似。这个设置为批处理的延迟提供了上限:一旦我们接受到记录超过了分区的 batch.size ,Producer 会忽略这个参数,立刻发送数据。但是如果累积的字节数少于 batch.size ,那么我们将在指定的时间内“逗留”(linger),以等待更多的记录出现。这个设置默认为0(即没有延迟)。例如:如果设置linger.ms=5 ,则发送的请求会减少并降低部分负载,但同时会增加5毫秒的延迟。0max.block.ms该配置控制 KafkaProducer.send()和KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。初始化元数据


  • 对应初始化时设置的参数
  1. ProducerConfig.METADATA_MAX_AGE_CONFIG
复制代码
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
以毫秒为单位的时间段,在此之后,即使我们没有看到任何主分区的变化,我们也会强制刷新元数据,以主动发现任何新的代理或分区
---元数据刷新时间
  1. ProducerConfig.METADATA_MAX_IDLE_CONFIG
复制代码
Controls how long the producer will cache metadata for a topic that's idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic's metadata is forgotten and the next access to it will force a metadata fetch request.
控制生产者为空闲主题缓存元数据的时间。如果自上次生成主题以来经过的时间超过了元数据空闲持续时间,则该主题的元数据将被遗忘,下一次对其的访问将强制执行元数据获取请求。
---生产者客户端为[某一主题]缓存元数据的时间,超过该时间后获取该主题元数据将强制从broker获取


  • 初始化元数据信息分为2部分:
第1部分初始化ProducerMetadata对象,设置元数据,topic信息缓存空闲时间
  1. this.metadata = new ProducerMetadata(retryBackoffMs,
  2.         config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
  3.         config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
  4.         logContext,
  5.         clusterResourceListeners,
  6.         Time.SYSTEM);
复制代码


  • 第2部分为加载broker节点信息Node
Node:org.apache.kafka.common.Node
  1. this.metadata.bootstrap(addresses);
  2. ...
  3. public synchronized void bootstrap(List<InetSocketAddress> addresses) {
  4.     this.needFullUpdate = true;
  5.     this.updateVersion += 1;
  6.     this.cache = MetadataCache.bootstrap(addresses);
  7. }
  8. ...
  9. static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
  10.     Map<Integer, Node> nodes = new HashMap<>();
  11.     int nodeId = -1;
  12.     for (InetSocketAddress address : addresses) {
  13.         nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort()));
  14.         nodeId--;
  15.     }
  16.     return new MetadataCache(null, nodes, Collections.emptyList(),
  17.             Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
  18.             null, Collections.emptyMap(), Cluster.bootstrap(addresses));
  19. }
复制代码
只是初始化元数据最外层的代码,bootstrap是kafka生产者客户端初始化broker信息缓存的入口,执行这个方法后这个客户端将缓存kakfa broker的节点信息,每个节点的topic信息,分片信息等缓存。
在客户端发送消息时,将通过这些缓存信息给broker发起请求,所以这块缓存是生产者客户端非常重要的部分,详情请看:三、Kafka生产者4---核心组件[元数据]-Metadata - CSDN
创建sender线程


  • 这是一个无限循环运行在后台的线程,会一直等待累加器中【缓存的消息】达到【发送条件】,把消息发送给Broker
  • 发送的核心流程是:

  • 从累加器中批量获取消息并创建 ClientRequest 对象
  • 将 ClientRequest 对象交给 NetworkClient 客户端发送
  • NetworkClient 客户端将请求放入 KafkaChannel 的缓存
  • NetworkClient 执行网络 I/O,完成请求的发送
  • NetworkClient 收到响应,调用 ClientRequest 的回调函数,触发每个消息上注册的回调函数
完整案例

异步发送 API

普通异步发送
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class AsyncProducer {
  4.     public static void main(String[] args) {
  5.         // 配置生产者属性
  6.         Properties props = new Properties();
  7.         props.put("bootstrap.servers", "localhost:9092");
  8.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10.         // 创建生产者实例
  11.         Producer<String, String> producer = new KafkaProducer<>(props);
  12.         // 创建消息
  13.         ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
  14.         // 异步发送消息
  15.         producer.send(record);
  16.         // 关闭生产者
  17.         producer.close();
  18.     }
  19. }
复制代码
带回调函数的异步发送
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class AsyncProducerWithCallback {
  4.     public static void main(String[] args) {
  5.         // 配置生产者属性
  6.         Properties props = new Properties();
  7.         props.put("bootstrap.servers", "localhost:9092");
  8.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10.         // 创建生产者实例
  11.         Producer<String, String> producer = new KafkaProducer<>(props);
  12.         // 创建消息
  13.         ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
  14.         // 异步发送消息并添加回调函数
  15.         producer.send(record, new Callback() {
  16.             @Override
  17.             public void onCompletion(RecordMetadata metadata, Exception exception) {
  18.                 if (exception != null) {
  19.                     System.err.println("消息发送失败: " + exception.getMessage());
  20.                 } else {
  21.                     System.out.println("消息发送成功,分区: " + metadata.partition() + ", 偏移量: " + metadata.offset());
  22.                 }
  23.             }
  24.         });
  25.         // 关闭生产者
  26.         producer.close();
  27.     }
  28. }
复制代码
同步发送 API
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class SyncProducer {
  4.     public static void main(String[] args) {
  5.         // 配置生产者属性
  6.         Properties props = new Properties();
  7.         props.put("bootstrap.servers", "localhost:9092");
  8.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10.         // 创建生产者实例
  11.         Producer<String, String> producer = new KafkaProducer<>(props);
  12.         // 创建消息
  13.         ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
  14.         try {
  15.             // 同步发送消息
  16.             RecordMetadata metadata = producer.send(record).get();
  17.             System.out.println("消息发送成功,分区: " + metadata.partition() + ", 偏移量: " + metadata.offset());
  18.         } catch (Exception e) {
  19.             System.err.println("消息发送失败: " + e.getMessage());
  20.         }
  21.         // 关闭生产者
  22.         producer.close();
  23.     }
  24. }
复制代码
重要应用场景与FAQ

Q:生产者如何提高吞吐量?


  • 增加批次大小:通过调整 batch.size 参数,增加每个批次发送的消息数量,减少网络请求次数,提高发送效率。
  • 增加缓冲区大小:增大 buffer.memory 参数,为消息累加器分配更多的内存,允许缓存更多的消息,提高消息的批量处理能力。
  • 使用压缩算法:通过设置 compression.type 参数,如 gzip、snappy 或 lz4 等,对消息进行压缩,减少网络传输的数据量,提高吞吐量。
  • 异步发送:使用异步发送方式,让生产者在后台异步处理消息发送,避免同步发送时的阻塞等待。
Q:生产者如何提高数据可靠性?

ack 应答原理与发送速率


  • Kafka 生产者通过 acks 参数来控制消息发送的可靠性,acks 参数有以下几种取值:


  • acks = 0:生产者发送消息后,不需要等待 Broker 的确认,立即发送下一条消息。这种方式发送速度最快,但可能会丢失消息,因为如果消息在发送过程中出现问题,生产者不会得到通知。
  • acks = 1:生产者发送消息后,只要 Leader 分区成功写入消息,就会收到 Broker 的确认响应。这种方式可以保证消息在 Leader 分区不丢失,但如果 Leader 分区在发送确认响应后发生故障,而消息还未同步到 Follower 分区,消息可能会丢失。
  • acks = -1 或 all:生产者发送消息后,需要等待 Leader 分区和所有的 Follower 分区都【成功写入】消息,才会收到 Broker 的确认响应。这种方式可以保证消息不会丢失,但发送速度最慢,因为需要等待所有副本都写入成功。
可靠性分析


  • acks = 0:可靠性最低,可能会丢失大量消息,但发送速度最快,适用于对数据可靠性要求不高的场景,如日志收集。
  • acks = 1:可靠性适中,在大多数情况下可以保证消息不丢失,但在某些极端情况下可能会出现消息丢失,适用于对数据可靠性有一定要求的场景。
  • acks = -1 或 all:可靠性最高,几乎可以保证消息不会丢失,但发送速度最慢,适用于对数据可靠性要求极高的场景,如金融交易。
数据重复分析


  • 当 acks = -1 或 all 时,如果生产者在发送消息后没有收到 Broker 的确认响应,可能会重试发送消息,导致消息重复
  • 另外,在 Broker 节点故障恢复过程中,也可能会出现消息重复的情况。
数据去重


  • 业务层去重:在消费者端,通过业务逻辑对消息进行去重,如使用唯一标识(如消息 ID)来判断消息是否已经处理过。
  • Kafka 幂等性:Kafka 0.11.0.0 版本引入了幂等性特性,通过设置 enable.idempotence = true,Kafka 会自动为每个生产者分配一个唯一的 ID,并为每条消息分配一个唯一的序列号,Broker 会根据序列号来判断消息是否重复,从而实现消息的去重。
数据传递语义


  • 最多一次(At Most Once):消息可能会丢失,但不会重复发送,对应 acks = 0 的情况。
  • 至少一次(At Least Once):消息不会丢失,但可能会重复发送,对应 acks = 1 或 acks = -1 或 all 的情况。
  • 精确一次(Exactly Once):消息只会被处理一次,不会丢失也不会重复,通过 Kafka 的幂等性和事务特性可以实现精确一次的消息传递语义。
幂等性


  • Kafka 的幂等性是指生产者在重试发送消息时,Broker 能够保证相同的消息只会被处理一次。
通过设置 enable.idempotence = true,Kafka 会自动为每个生产者分配一个唯一的 ID(PID),并为每条消息分配一个唯一的序列号,Broker 会根据序列号来判断消息是否重复,从而避免消息的重复处理。
Q:生产者事务?


  • Kafka 0.11.0.0 版本引入了事务特性,允许生产者在一个事务中发送多条消息,保证这些消息要么全部成功发送,要么全部失败。
通过以下步骤可以实现生产者事务:
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class TransactionalProducer {
  4.     public static void main(String[] args) {
  5.         // 配置生产者属性
  6.         Properties props = new Properties();
  7.         props.put("bootstrap.servers", "localhost:9092");
  8.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10.         props.put("enable.idempotence", "true");
  11.         props.put("transactional.id", "my-transactional-id");
  12.         // 创建生产者实例
  13.         Producer<String, String> producer = new KafkaProducer<>(props);
  14.         // 初始化事务
  15.         producer.initTransactions();
  16.         try {
  17.             // 开始事务
  18.             producer.beginTransaction();
  19.             // 发送消息
  20.             ProducerRecord<String, String> record1 = new ProducerRecord<>("test_topic", "key1", "value1");
  21.             ProducerRecord<String, String> record2 = new ProducerRecord<>("test_topic", "key2", "value2");
  22.             producer.send(record1);
  23.             producer.send(record2);
  24.             // 提交事务
  25.             producer.commitTransaction();
  26.         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  27.             // 生产者被隔离或权限不足,需要关闭生产者
  28.             producer.close();
  29.         } catch (KafkaException e) {
  30.             // 事务失败,回滚事务
  31.             producer.abortTransaction();
  32.         }
  33.         // 关闭生产者
  34.         producer.close();
  35.     }
  36. }
复制代码
数据有序

数据乱序


  • 异步发送消息时,如果生产者在发送消息时使用了多个线程或消息在网络传输过程中出现延迟,可能会导致消息到达 Broker 的顺序发送顺序不一致,从而出现数据乱序的情况。
Q:生产者核心参数配置?


  • bootstrap.servers:指定 Kafka 集群的地址,用于生产者连接 Kafka 集群。
  • key.serializer:指定消息键的序列化器,将键转换为字节数组。
  • value.serializer:指定消息值的序列化器,将值转换为字节数组。
  • acks:控制消息发送的可靠性,取值为 0、1、-1 或 all。
  • batch.size:指定每个批次发送的消息大小,单位为字节。
  • linger.ms:指定生产者在发送批次之前等待更多消息的时间,单位为毫秒。
  • buffer.memory:指定消息累加器的缓冲区大小,单位为字节。
  • compression.type:指定消息的压缩算法,如 gzip、snappy 或 lz4 等。
  • enable.idempotence:是否开启幂等性特性,取值为 true 或 false。
  • transactional.id:指定生产者的事务 ID,用于实现生产者事务。
Q: KafkaProducer 发送消息时异常,报: Kafka Topic xxx not present in metadata问题?


  • 推荐文献


  • 分区配置错误导致Kafka Topic xxx not present in metadata bug问题排查 - CSDN
  • Kafka常见问题之Kafka 报错:org.apache.kafka.common.errors.TimeoutException: Topic topic not present in metadata after 60000 ms - CSDN
Y 推荐文献


  • Apache Kafka


  • https://kafka.apache.org/documentation/#producerconfigs


  • 生产者配置 - kafka.apchecn.org
Q:kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected?


  • 推荐文献


  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected - CSDN
config/server.properties : listerners / advertised.listeners
Y 推荐文献


  • kafka实践(十二):生产者(KafkaProducer)源码详解和调试 - CSDN
  • Kafka生产者源码解析(一)——KafkaProducer - CSDN
  • Kafka消息中间件(二)(生产与消费全流程) - CSDN
X 参考文献


  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer - CSDN
  • kafka Producer生产者 - CSDN
    本文作者:        千千寰宇   
    本文链接:         https://www.cnblogs.com/johnnyzen   
    关于博文:评论和私信会在第一时间回复,或直接私信我。   
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA     许可协议。转载请注明出处!
    日常交流:大数据与软件开发-QQ交流群: 774386015        【入群二维码】参见左下角。您的支持、鼓励是博主技术写作的重要动力!   

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册