前言
在文章系列文章中中,我们已经做到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为kafka集群生产消息的,是logstash服务,而非我们自定义的生成者。在本文中,将主要介绍KafkaProducer
类相关的一些接口和理论知识(基于kafka 1.1版本)。
KafkaProducer类
Package org.apache.kafka.clients.producerpublic class KafkaProducerextends java.lang.Object implements Producer
-
KafkaProducer
类用于向kafka集群发布消息记录,其中<K, V>
为泛型,指明发送的消息记录key/value对的类型。 - kafka producer(生产者)是线程安全的,多个线程共享一个producer实例,相比于多个producer实例,这样做的效率更高、更快。
构造函数
构造函数 | 描述 |
---|---|
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs) | 配置信息为Map形式,构造Producer |
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) | 配置信息为Map形式,可以指定自定义的用于序列化key和value的类。 |
KafkaProducer(java.util.Properties properties) | 配置信息放在Properties对象中,构造Producer。如,可以从配置文件***.properties 中读取,或者新建Properties对象,再设置配置信息。 |
KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) | 配置信息放在Properties对象中,可指定自定义的key和value的序列化类。 |
- 从上述构造函数可以看出,可以通过Map和Properties两种形式传递配置信息,用于构造
Producer
对象,配置信息均为key/value
对。 - 可配置的信息可参见官方配置表,其中Value可以是String类型,也可以是其他合适的类型。
- kafka的包
org.apache.kafka.common.serialization
中,提供了许多已经实现好的序列化和反序列化的类,可以直接使用。你也可以实现自己的序列化和反序列化类(实现Serializer
接口),选择合适的构造函数构造你的Producer
类。 -
想用kafka自带的序列化类,可在配置信息中配置,如:
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producer被创建之后,使用完之后,一定要记得将其
close
,否则会造成资源泄露的问题。
方法
修饰&返回 | 方法 | 描述 |
---|---|---|
void | abortTransaction() | 中止进行中的事务 |
void | beginTransaction() | 开始事务,在开始任何新事务之前,都英应该调用此方法 |
void | close() | 关闭该producer,释放资源 |
void | close(long timeout, java.util.concurrent.TimeUnit timeUnit) | 等待timeout指定的时长后关闭该producer,以便producer可以将还未完成发送的消息发送完。timeUnit为时间单位。如果超时,则强制关闭。 |
void | commitTransaction() | 提交进行中的事务 |
void | flush() | 调用次方法,使kafka生成者发送缓冲区中的消息记录(record)可以被立即发送(即使linger.ms 大于0)。并且一直阻塞,直到这些消息记录都发送完成 |
void | initTransactions() | 当构造producer时,如果配置了transactional.id ,那么在调用其transaction相关函数之前,都必须先调用该函数 |
java.util.Map<MetricName,? extends Metric> | metrics() | 列出producer中维护的所有内部监控(metrics)设置 |
java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic) | 从指定的主题(topic)中,获取分区(partition)的元数据 |
java.util.concurrent.Future<RecordMetadata> | send(ProducerRecord<K,V> record) | 异步发送消息记录(record)到指定的主题 |
java.util.concurrent.Future<RecordMetadata> | send(ProducerRecord<K,V> record, Callback callback) | 异步发送一个消息记录到指定topic,当发送被确认完成之后,调用回调函数(callback) |
void | sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId) | 发送指定的偏移量列表(offsets)到消费者组协调者(consumer group coordinator),并且将这些偏移量(offsets)标记为当前事务的一部分 |
- 如果
close()
方法是在回调方法callback
中被调用,那么kafka将会输出一条警告日志,并且将其替换为close(0, TimeUnit.MILLISECONDS)
,这样做的目的是为了避免发送线程(sender thread)永远阻塞。 -
flush()
函数的后置条件(方法顺利执行完毕之后必须为真的条件)是:待发送缓冲区中所有待发送的记录record都发送完成。发送完成指的是成功收到了在构建producer时设置的acks 配置的确认acks。 - 当一个线程阻塞于flush调用(等待其完成)时,其他线程可以继续发送消息记录record,但不保证这些在
flush()
调用开始之后发送的消息记录能够真正完成。可以通过设置重试配置retries=<large_number>
来降低消息记录不被送达的情况。 - 对于
事务性的producer
,不需要调用flush()
函数,因为commitTransaction()
函数在提交事务之前,会将缓冲中的记录进行flush,这样可以确保那些在beginTransaction()
之前被send(ProducerRecord)
的消息记录将在事务提交之前完成。 - 在第一次调用
beginTransaction()
方法之前,必须调用一次initTransactions()
方法。 -
initTransactions()
方法的主要作用是:- 确保被其他producer实例初始化的、具有相同transactional.id的事务被完成。如果这些事务在进行过程中失败,则事务被中止;如果事务已经开始,但还没完成,那么
initTransactions()
函数会等待它完成。 - 获取内部producer的id和epoch,用于后续该producer产生的事务性消息。
- 确保被其他producer实例初始化的、具有相同transactional.id的事务被完成。如果这些事务在进行过程中失败,则事务被中止;如果事务已经开始,但还没完成,那么
-
send(ProducerRecord<K,V> record, Callback callback)
方法将在record被送入到待发送的buffer之后,立即返回。因此,可允许并行发送record,而不用阻塞等待每个record发送完成。该方法的返回值RecordMetadata
为该record被发送到的分区partition的元数据
,如偏移量offset,创建时间CreateTime等。要想阻塞等待发送完成,可以调用Future的get()
方法,如:producer.send(record).get();
小结
上面基本上对KafkaProducer类的主要接口做了解释,主要参考了,从上面的一些方法中,可以看到kafka的一些特性,如:发送缓冲区机制,事务性producer等,这些复杂概念将在后续文章中再做深入探索。