欢迎来到DIVCSS5查找CSS资料与学习DIV CSS布局技术!
说到Kafka的生产者,我们不得不提一个概念KafkaProducer,接下来我们我们了解了一下KafkaProducer的具体使用方法以及Kafka生产者客户端的内部原理进行分析。
 
KafkaProducer的使用
 
我们先来看一下KafkaProducer的结构
 
  public class ProducerRecord<K,V>{
 
        private final String topic; //主题
 
        private final Integer partition; //分区号
 
        private final Headers headers; //消息头部
 
        private final K key; //键
 
        private final V value; //值
 
        private final Long timestamp; //消息的时间戳
 
        //省略其他成员方法和构造方法
 
    }
 
其中topic和partition字段分别代表消息要发往的主题和分区号。
 
headers字段是消息的头部。
 
key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。
 
value是指消息体,一般不为空。
 
timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。
 
在创建真正的生产者实例前需要配置相应的参数。核心参数如下:
 
生产者客户端必要的参数配置
 
bootstrap.servers: 该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单。至少要设置两个以上的broker地址信息。
 
key.serializer和value.serializer:broker端接收的消息必须以字节数组(byte[])的形式存在。在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。
 
构建生产者实例
 
这仅仅是核心必填的参数,当然还有其他的参数配置。参数配置完成以后,我们创建一个生产者实例。
 
KafkaProducer<Stri ng, String> producer= new KafkaProducer<>(props) ;
 
KafkaProducer有多个构造方法,在实际应用而言,一般都选用 public KafkaProducer(Properties properties)这个构造方法来创建 KafkaProducer 实例。
 
KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将KafkaProducer 实例进行池化来供其他线程调用。
 
消息的发送
 
构建消息
 
构建消息,即创建 ProducerRecord 对象,构造方法有很多,根据需要选择。
 
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) 
 
public ProducerRecord(String topic, Integer partition,Long timestamp, K key, V value) 
 
public ProducerRecord(String topic, Integer partition,K key, V value, Iterable<Header> headers) 
 
public ProducerRecord(String topic, Integer partition, K key, V value) 
 
public ProducerRecord(String topic, K key, V value) 
 
public ProducerRecord(String topic, V value)
 
发送消息
 
创建生产者实例和构建之后,接下来就可以发送消息了,发送消息有以下三种模式:
 
发后即忘(fire-and-forget)
 
只管往Kafka中发送消息而不关心消息是否正确到达。
 
同步(sync)
 
异步(async)
 
KafkaProducer的send()方法并非是void类型,而是Future类型,send方法有2个重载方法,
 
public Future<RecordMetadata> send(ProducerRecord<K, V> record) 
 
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
 
要实现同步的发送方式,可以利用返回的Future对象实现第一种方法
 
try {
 
        producer.send(record)。get();
 
} catch (ExecutionException | InterruptedException e) {
 
        e.printStackTrace();
 
}
 
实际上send()方法本身是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。示例中在执行send()方法之后直接链式调用了get()方法来阻塞等待kafka的响应,知道消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。
 
 
也可以在执行完send()方法之后不直接调用get()方法,如下面的同步方法第二种方式的实现。
 
try {
 
         Future<RecordMetadata> future = producer.send(record);
 
         RecordMetadata metadata = future.get();
 
         System.out.println(metadata.topic() +"-" +metadata.partition()+":"+metadata.offset());
 
} catch (ExecutionException | InterruptedException e) {
 
         e.printStackTrace();
 
}
 
这样可以获取一个RecordMetadata对象,在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些消息,则可以使用这个方式。如果不需要,则可以直接使用producer.send(record)。get()方法。
 
Future表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。
 
序列化(Serializer)
 
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。
 
消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。
 
分区器(Partitioner)
 
消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正的发往broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
 
如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key字段来计算partition的值。分区器的作用是为消息分配分区。
 
拦截器(Interceptor)
 
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求。
 
生产者客户端的整体架构
 
消息在真正发往Kafka之前,有可能需要经历拦截器、序列化器和分区器等一系列的作用,那么在此之后又会发送什么呢?
 
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
 
RecordAccumulator 主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
 
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。
 
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较消耗资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。
 
总结
 
对于KafkaProducer而言,它是线程安全的,我么可以在多线程的环境中复用它。而对于消费者客户端KafkaConsumer而言,它是非线程安全的。

如需转载,请注明文章出处和来源网址:http://www.divcss5.com/html/h64933.shtml