> 文章列表 > Kafka学习笔记

Kafka学习笔记

Kafka学习笔记

Kafka学习笔记

概念

Kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

消息队列的两种模式

点对点模式

消费者主动拉取数据,消息收到后清楚消息;一个消息只能被消费一次。
Kafka学习笔记

发布/订阅模式

  1. 可以有多个topic主题(浏览、点赞、收藏、评论等)
  2. 消费者消费数据后,不删除数据;数据可以被重复消费
  3. 每个消费者相互独立,都可以消费到数据
    Kafka学习笔记

Kafka整体架构

  1. 为了方便扩展,提高吞吐量,一个topic分为多个partition
  2. 配合分区的概念,提出了消费组的概念,组内每个消费者并行消费;每个消费者只能消费一个分区的数据
  3. 为了提高可用性,为每个partition增加若干副本
  4. ZK中记录谁是leader,2.8以后可以不配置ZK

由于数据可能非常多,kafka将topic分为多个partition;一个topic可以存储在多个分区中。
Kafka学习笔记
Kafka学习笔记

1个topic存储在多个分区中,那么消费者也可以并行消费;为每个分区指定一个消费者消费数据
Kafka学习笔记

为了提高分区的可靠性,引入了副本,副本分为leader和follower,不管是生产者还是消费者,都是对leader中的数据进行处理;当leader出现故障,就用follower进行替代。
Kafka学习笔记
zooker记录集群中机器上下线的信息,还会记录每个分区中谁是leader副本。
Kafka学习笔记

windows下Kafka安装启动

下载地址:kafka官网下载Kafka学习笔记
各个目录的内容:

  • bin:启动脚本,里面还有一个windows目录,其中放置的是windows下的启动命令;
  • config:配置文件;
  • libs:依赖的jar;
  • logs:日志;
  • site-docs:文档

启动服务

在windows目录下,打开cmd输入下面的命令,启动zookeeper服务

zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties

启动kafka服务

kafka-server-start.bat ..\\..\\config\\server.properties

启动消费者,指定topic的名称为first

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first --from-beginning

启动生产者,指定topic的名称为first

kafka-console-producer.bat --broker-list localhost:9092 --topic first

Kafka生产者

发送消息原理

在消息发送的过程中,涉及到了两个线程:main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断地从RecordAccumulator中拉取消息发送到Kafka Broker。

生产者发送消息流程

  1. 生产者调用send方法发送数据
  2. 数据经过拦截器处理,一般不调用此拦截器
  3. 序列化器:将数据序列化
  4. 分区器:决定将数据发送到哪个分区;实际是分区器是一个双端队列;
  5. 发送数据需要满足以下两个条件:
    • batch.size:数据累计达到batch.size后,sender才会发送数据,默认16k
    • linger.ms:如果数据没有达到batch.size,那么sender等待linger.ms设置的时间到了之后就会发送数据
  6. 集群收到发送的数据后,会进行应答
    • 0:生产者发送过来的数据,不需要等待数据落盘应答
    • 1:生产者发送过来的数据,Leader收到数据后应答;
    • -1:生成者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答,-1和all等价;
  7. 发送数据失败后会进行重试,默认重试次数是整数的最大值;

Kafka学习笔记

生产者的重要参数

数据发送的两种方式

数据发送分为同步发送和异步发送。异步发送分为带回调的异步发送和不带回调的异步发送。

同步发送

异步发送

生产者发送消息分区策略

分区策略指的是消息发送到哪个分区的策略。
Kafka中的分区策略可以通过DefaultPartitioner这个类中的方法进行指定。

  1. 如果指定了partition,则把数据写入指定的分区
  2. 没有指定partition的值但是有key,则将key的hash值与topic的partition数进行取余得到partition值
  3. 既没有指定partition值又没有key值的情况下,kafka采用Sticky Partition(粘性分区器)策略,遂选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机选择一个和上次不一样的分区进行使用

生产环境中,可以使用表名作为key,让指定类型的数据都发送到同一个分区中。

自定义分区

如果有需求的话,可以自定义分区。只要实现Partitioner接口,然后重写partition()方法
Kafka学习笔记

生产经验

如何提高吞吐量

数据可靠性

数据可靠性和前面提到的ACK应答级别有关。

当ACK应答级别为0时,生产者发送过来的数据,不需要等到数据持久化到磁盘中就可以进行应答,
问题: 如果Leader发生故障,此时数据没有持久化,而已经进行了应答,那么会丢失数据
Kafka学习笔记
当ACK应答级别为1时,生产者发送过来的数据,Leader收到数据后应答;
问题:应答完成后,没有把数据同步到副本,然后Leader挂了,这时候会通过选举选出新的Leader,但是新的Leader中没有数据,生产者也不会重新发送消息,因为已经收到应答了
Kafka学习笔记
当应答ACK级别为-1时,生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答;
问题:如果某个Follwer出现故障,导致不能同步数据。
Kafka的解决方案是:如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。这样就不用等长期联系人不上或者已经故障的节点。
Kafka学习笔记
ISR队列:Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持同步的Follower+Leader集合(Leader:0,isr:0, 1, 2)

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas,默认为1)设置为,和ack=1的效果相同,任然有丢数的风险(leader:0,isr:0)。
数据完全可靠条件=ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据可靠性总结

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follower应答,可靠性高,效率低;
    在生产环境中,acks=0很少使用,一般用于传输普通日志,允许丢失个别数据;acks=-1一般用于对可靠性要求比较高的场景,比如钱等;

数据去重

数据传递语义
  • 至少一次 = ACK级别设置为-1 + 分区副本数量>=2 + ISR里应答的最小副本数量>=2
  • 最多一次 = ACK级别设置0
  • 总结
    • 至少一次可以保证数据不丢失,但是不能保证数据不重复
    • 最多一次可以保证数据不重复,但是不能保证数据不丢失
  • 精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失;
数据重复场景

当acks=-1时,生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答;当Leader出现故障,而其它Follower还没收到数据,那么生产者会继续发送重复的数据
Kafka学习笔记
数据重复的解决方案:幂等性;开启参数enable.idempotence,默认为true,false为关闭。

幂等性:Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条数据;
精确一次 = 幂等性+至少一次(ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)
重复消息的判断标准:具有PID,Partition,SeqNumber相同主键的消息提交时;Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;SequenceNumber是单调自增的。所以幂等性只能保证在单分区单会话内消息不重复。

Kafka学习笔记

Kafka生产者事务

开启事务,必须开启幂等性;
Kafka学习笔记

数据乱序

  1. kafka在1.x版本之前保证数据单分区有序,条件如下:
    • max.in.flight.request.per.connection=1,不需要考虑是否开启幂等性;
  2. kafka在1.x及以后的版本中保证数据单分区有序,条件如下:
    • 未开启幂等性:max.in.flight.request.per.connection需要设置为1;
    • 开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5,因为在kafka 1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,所以无论如何,都可以保证最近5个request的数据有序
      Kafka学习笔记

Kafka Broker

broker重要参数

参数名称 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引
log.retention.hours Kafka 中数据保存的时间,默认 7 天
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最
大值,9223372036854775807。一般不建议修改,
交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建
议修改,交给系统自己管理

kafka副本

副本基本信息

Leader选举流程

Leader和Follower故障处理

分区副本分配

kafka消费者

Kafka消费方式

  • pull模式:consumer采用从Broker中拉取数据。kafka采用这种方式。缺点是如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
  • push模式:由于由broker决定消息发送速率,很难适应所有消费者的消费速率。

消费者工作流程

Kafka学习笔记