✅Kafka如何实现批量消费?

✅Kafka如何实现批量消费?

典型回答

批量消费指的是一次性拉过来一批消息,然后进行批量处理。

Kafka想要实现批量消费有很多种方案。其中比较简单的就是基于@KafkaListener 实现,这也是比较推荐的方案。(还有些其他方案,比如用原生kafka肯定也能,包括Spring Cloud Stream也支持kafka的批量消费,但是用的都不多。)

首先需要依赖spring-kafka这个包:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>

接着需要配置一个消费者工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    //开启批量消费
    factory.setBatchListener(true); 
    //设置手动提交
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
    return factory;
}

这里通过factory.setBatchListener(true); 的方式设置采用批量消费,但是需要注意的是,ConcurrentKafkaListenerContainerFactory的默认的提交方式是自动提交,如果在自动提交模式下,批量消费是有可能会丢消息的,所以,需要设置为手动提交。

接下来就可以用这个消费者工厂来配置监听器做批量的消息监听了。

@KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    // 批量处理逻辑
    
    // 处理完毕后统一提交 offset
    ack.acknowledge();  //手动提交偏移量
}

这里使用@KafkaListener注解,然后配置containerFactory为刚刚我们创建的批量消费的工厂,然后再listen方法的入参中,使用List<ConsumerRecord> 来接收一批消息。之后就可以批量处理这些消息了,比如开个线程池并发处理。

但是需要注意,这里一定要确保在所有的消息都处理成功之后,再手动提交偏移量。手动提交偏移量的时候,会把这批消息中最大的offset + 1进行提交,所以,一定要确保所有消息都成功了再提交,否则就会丢消息。

✅Kafka 中的Offset是什么?

比如我看网上有些代码,在finally中去调用ack.acknowledge(); 这是不对的,因为这样你无法确保消息都处理成功。

如果又想并发处理,又想确保所有消息都成功再提交,那么可以用CompletionService来实现这样的功能。

扩展知识

批量消费如何防止消息丢失

✅Kafka的批量消费如何确保消息不丢?