✅Kafka如何保证消息不丢失?
典型回答
Kafka作为一个消息中间件,他需要结合消息生产者和消费者一起才能工作,一次消息发送包含以下是三个过程:
1)Producer 端发送消息给 Kafka Broker 。
2)Kafka Broker 将消息进行同步并持久化数据。
3)Consumer 端从Kafka Broker 将消息拉取并进行消费。
Kafka只对已提交的消息做最大限度的持久化保证不丢失,但是没办法保证100%。
但是,Kafka还是提供了很多机制来保证消息不丢失的。要想知道Kafka如何保证消息不丢失,需要从生产者、消费者以及kafka集群三个方面来分析。
Producer
消息的生产者端,最怕的就是消息发送给Kafka集群的过程中失败,所以,我们需要有机制来确保消息能够发送成功,但是,因为存在网络问题,所以基本没有什么办法可以保证一次消息一定能成功。
所以,就需要有一个确认机制来告诉生产者这个消息是否有发送成功,如果没成功,需要重新发送直到成功。
我们通常使用Kafka发送消息的时候,通常使用的producer.send(msg)其实是一种异步发送,发送消息的时候,方法会立即返回,但是并不代表消息一定能发送成功。(producer.send(msg).get() 是同步等待返回的。)
那么,为了保证消息不丢失,通常会建议使用**producer.send(msg, callback)**方法,这个方法支持传入一个callback,我们可以在消息发送时进行重试。
同时,我们也可以通过给producer设置一些参数来提升发送成功率:
acks=-1 // 表示 Leader 和 Follower 都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。
retries=3 // 生产端的重试次数
retry.backoff.ms = 300 //消息发送超时或失败后,间隔的重试时间acks = 0: 表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
acks = -1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
acks = 1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。
Broker
Kafka的集群有一些机制来保证消息的不丢失,比如复制机制、持久化存储机制以及ISR机制。
- 持久化存储:Kafka使用持久化存储来存储消息。这意味着消息在写入Kafka时将被写入磁盘,这种方式可以防止消息因为节点宕机而丢失。
- ISR复制机制:Kafka使用ISR机制来确保消息不会丢失,Kafka使用复制机制来保证数据的可靠性。每个分区都有多个副本,副本可以分布在不同的节点上。当一个节点宕机时,其他节点上的副本仍然可以提供服务,保证消息不丢失。
在服务端,也有一些参数配置可以调节来避免消息丢失:
replication.factor //表示分区副本的个数,replication.factor >1 当leader 副本挂了,follower副本会被选举为leader继续提供服务。
min.insync.replicas //表示 ISR 最少的副本数量,通常设置 min.insync.replicas >1,这样才有可用的follower副本执行替换,保证消息不丢失
unclean.leader.election.enable = false //是否可以把非 ISR 集合中的副本选举为 leader 副本。Consumer
作为Kafka的消费者端,只需要确保投递过来的消息能正常消费,并且不会胡乱的提交偏移量就行了。
Kafka消费者会跟踪每个分区的偏移量,消费者每次消费消息时,都会将偏移量向后移动。当消费者宕机或者不可用时,Kafka会将该消费者所消费的分区的偏移量保存下来,下次该消费者重新启动时,可以从上一次的偏移量开始消费消息。
另外,Kafka消费者还可以组成消费者组,每个消费者组可以同时消费多个分区。当一个消费者组中的消费者宕机或者不可用时,其他消费者仍然可以消费该组的分区,保证消息不丢失。
为了保证消息不丢失,建议使用手动提交偏移量的方式,避免拉取了消息以后,业务逻辑没处理完,提交偏移量后但是消费者挂了的问题:
enable.auto.commit=false