Kafka学习笔记
Kafka学习笔记
概念
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
消息队列的两种模式
点对点模式
消费者主动拉取数据,消息收到后清楚消息;一个消息只能被消费一次。
发布/订阅模式
- 可以有多个topic主题(浏览、点赞、收藏、评论等)
- 消费者消费数据后,不删除数据;数据可以被重复消费
- 每个消费者相互独立,都可以消费到数据
Kafka整体架构
- 为了方便扩展,提高吞吐量,一个topic分为多个partition
- 配合分区的概念,提出了消费组的概念,组内每个消费者并行消费;每个消费者只能消费一个分区的数据
- 为了提高可用性,为每个partition增加若干副本
- ZK中记录谁是leader,2.8以后可以不配置ZK
由于数据可能非常多,kafka将topic分为多个partition;一个topic可以存储在多个分区中。
1个topic存储在多个分区中,那么消费者也可以并行消费;为每个分区指定一个消费者消费数据
为了提高分区的可靠性,引入了副本,副本分为leader和follower,不管是生产者还是消费者,都是对leader中的数据进行处理;当leader出现故障,就用follower进行替代。
zooker记录集群中机器上下线的信息,还会记录每个分区中谁是leader副本。
windows下Kafka安装启动
下载地址:kafka官网下载
各个目录的内容:
- bin:启动脚本,里面还有一个windows目录,其中放置的是windows下的启动命令;
- config:配置文件;
- libs:依赖的jar;
- logs:日志;
- site-docs:文档
启动服务
在windows目录下,打开cmd输入下面的命令,启动zookeeper服务
zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties
启动kafka服务
kafka-server-start.bat ..\\..\\config\\server.properties
启动消费者,指定topic的名称为first
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first --from-beginning
启动生产者,指定topic的名称为first
kafka-console-producer.bat --broker-list localhost:9092 --topic first
Kafka生产者
发送消息原理
在消息发送的过程中,涉及到了两个线程:main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断地从RecordAccumulator中拉取消息发送到Kafka Broker。
生产者发送消息流程
- 生产者调用send方法发送数据
- 数据经过拦截器处理,一般不调用此拦截器
- 序列化器:将数据序列化
- 分区器:决定将数据发送到哪个分区;实际是分区器是一个双端队列;
- 发送数据需要满足以下两个条件:
- batch.size:数据累计达到batch.size后,sender才会发送数据,默认16k
- linger.ms:如果数据没有达到batch.size,那么sender等待linger.ms设置的时间到了之后就会发送数据
- 集群收到发送的数据后,会进行应答
- 0:生产者发送过来的数据,不需要等待数据落盘应答
- 1:生产者发送过来的数据,Leader收到数据后应答;
- -1:生成者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答,-1和all等价;
- 发送数据失败后会进行重试,默认重试次数是整数的最大值;
生产者的重要参数
数据发送的两种方式
数据发送分为同步发送和异步发送。异步发送分为带回调的异步发送和不带回调的异步发送。
同步发送
异步发送
生产者发送消息分区策略
分区策略指的是消息发送到哪个分区的策略。
Kafka中的分区策略可以通过DefaultPartitioner这个类中的方法进行指定。
- 如果指定了partition,则把数据写入指定的分区
- 没有指定partition的值但是有key,则将key的hash值与topic的partition数进行取余得到partition值
- 既没有指定partition值又没有key值的情况下,kafka采用Sticky Partition(粘性分区器)策略,遂选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机选择一个和上次不一样的分区进行使用
生产环境中,可以使用表名作为key,让指定类型的数据都发送到同一个分区中。
自定义分区
如果有需求的话,可以自定义分区。只要实现Partitioner接口,然后重写partition()方法
生产经验
如何提高吞吐量
数据可靠性
数据可靠性和前面提到的ACK应答级别有关。
当ACK应答级别为0时,生产者发送过来的数据,不需要等到数据持久化到磁盘中就可以进行应答,
问题: 如果Leader发生故障,此时数据没有持久化,而已经进行了应答,那么会丢失数据
当ACK应答级别为1时,生产者发送过来的数据,Leader收到数据后应答;
问题:应答完成后,没有把数据同步到副本,然后Leader挂了,这时候会通过选举选出新的Leader,但是新的Leader中没有数据,生产者也不会重新发送消息,因为已经收到应答了
当应答ACK级别为-1时,生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答;
问题:如果某个Follwer出现故障,导致不能同步数据。
Kafka的解决方案是:如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。这样就不用等长期联系人不上或者已经故障的节点。
ISR队列:Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持同步的Follower+Leader集合(Leader:0,isr:0, 1, 2)
如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas,默认为1)设置为,和ack=1的效果相同,任然有丢数的风险(leader:0,isr:0)。
数据完全可靠条件=ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
数据可靠性总结
- acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follower应答,可靠性高,效率低;
在生产环境中,acks=0很少使用,一般用于传输普通日志,允许丢失个别数据;acks=-1一般用于对可靠性要求比较高的场景,比如钱等;
数据去重
数据传递语义
- 至少一次 = ACK级别设置为-1 + 分区副本数量>=2 + ISR里应答的最小副本数量>=2
- 最多一次 = ACK级别设置0
- 总结
- 至少一次可以保证数据不丢失,但是不能保证数据不重复
- 最多一次可以保证数据不重复,但是不能保证数据不丢失
- 精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失;
数据重复场景
当acks=-1时,生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答;当Leader出现故障,而其它Follower还没收到数据,那么生产者会继续发送重复的数据
数据重复的解决方案:幂等性;开启参数enable.idempotence,默认为true,false为关闭。
幂等性:Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条数据;
精确一次 = 幂等性+至少一次(ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)
重复消息的判断标准:具有PID,Partition,SeqNumber相同主键的消息提交时;Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;SequenceNumber是单调自增的。所以幂等性只能保证在单分区单会话内消息不重复。
Kafka生产者事务
开启事务,必须开启幂等性;
数据乱序
- kafka在1.x版本之前保证数据单分区有序,条件如下:
- max.in.flight.request.per.connection=1,不需要考虑是否开启幂等性;
- kafka在1.x及以后的版本中保证数据单分区有序,条件如下:
- 未开启幂等性:max.in.flight.request.per.connection需要设置为1;
- 开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5,因为在kafka 1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,所以无论如何,都可以保证最近5个request的数据有序
Kafka Broker
broker重要参数
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最 |
大值,9223372036854775807。一般不建议修改, | |
交给系统自己管理。 | |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建 |
议修改,交给系统自己管理 |
kafka副本
副本基本信息
Leader选举流程
Leader和Follower故障处理
分区副本分配
kafka消费者
Kafka消费方式
- pull模式:consumer采用从Broker中拉取数据。kafka采用这种方式。缺点是如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
- push模式:由于由broker决定消息发送速率,很难适应所有消费者的消费速率。