> 文章列表 > Kafka系统整理 一

Kafka系统整理 一

Kafka系统整理 一

一、Kafka 概述

1.1 定义

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列 (Message Queue), 主要应用于大数据实时处理领域。

kafka最新定义:kafka是一个开源的分布式事件流平台(Event Streaming Platform), 被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

1.2 消息队列

目 前企 业中比 较常 见的 消息 队列产 品主 要有 KafkaActiveMQ RabbitMQ
RocketMQ 等。
在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ
RabbitMQRocketMQ

1.3 应用场景

主要应用场景包括:缓存/消峰解耦异步通信。
1. 缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
2. 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
3. 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们

1.4 消息队列的两种模式

 1.5 Kafka 基础架构

 

1Producer消息生产者,就是向 Kafka broker 发消息的客户端。
2Consumer消息消费者,向 Kafka broker 取消息的客户端。
3Consumer GroupCG):消费者组,由多个 consumer 组成。消费者组内每个消
费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不
影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
4Broker一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个
broker 可以容纳多个 topic
5Topic可以理解为一个队列,生产者和消费者面向的都是一个 topic
6Partition为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服
务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
7Replica副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个
Follower
8Leader每个分区多个副本的,生产者发送数据的对象,以及消费者消费数
据的对象都是 Leader
9Follower每个分区多个副本中的,实时从 Leader 中同步数据,保持和
Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader

 

 二、Kafka 快速入门

先部署zookeeper

 1. 集群规划

 2. 集群部署

0官方下载地址:http://kafka.apache.org/downloads.html
1解压安装包
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C
/opt/module/

2修改解压后的文件名称

[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka
3进入到/opt/module/kafka 目录,修改配置文件
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
输入以下内容:
#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/ka
fka
4分发安装包

[atguigu@hadoop102 module]$ xsync kafka/
5分别在 hadoop103 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties
中的 broker.id=1broker.id=2
注:broker.id 不得重复,整个集群中唯一。
6配置环境变量
1)在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh

增加如下内容:

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
(2)刷新一下环境变量。
[atguigu@hadoop102 module]$ source /etc/profile
(3)分发环境变量文件到其他节点,并 source
[atguigu@hadoop102 module]$
sudo /home/atguigu/bin/xsync
/etc/profile.d/my_env.sh
[atguigu@hadoop103 module]$ source /etc/profile
[atguigu@hadoop104 module]$ source /etc/profile

7启动集群
1)先启动 Zookeeper 集群,然后启动 Kafka
[atguigu@hadoop102 kafka]$ zk.sh start
(2)依次在 hadoop102hadoop103hadoop104 节点上启动 Kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon
config/server.properties
8关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh

也可以写执行脚本

#! /bin/bash
case $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac
2)添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf.sh
3)启动集群命令
[atguigu@hadoop102 ~]$ kf.sh start
4)停止集群命令
[atguigu@hadoop102 ~]$ kf.sh stop
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,
Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了(kill -9 xxx)。

三、主要命令记录 

1 主题命令

1)查看操作主题命令参数
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh
2)查看当前服务器中的所有 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --list
3)创建 first topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
--topic 定义 topic
--replication-factor 定义副本数
--partitions 定义分区数
4)查看 first 主题的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --describe --topic first
5)修改分区数(注意:分区数只能增加,不能减少)
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --alter --topic first --partitions 3
7)删除 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --delete --topic first

2 生产者命令

 1)查看操作生产者命令参数

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
2)发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server hadoop102:9092 --topic first
>hello 
>world

3 消费者命令

1)查看操作消费者命令参数
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
2)消费消息
(1)消费 first 主题中的数据。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --
bootstrap-server hadoop102:9092 --from-beginning --topic first

588库资源