Integrating Spring Boot with Kafka (including a single-node installation of Kafka_2.12-3.3.1 and installation of the Kafka visualization tool EFAK v3.0.1)
Kafka and EFAK installation package downloads
Kafka installation
-
Download resources:
-
Download the tgz package: http://archive.apache.org/dist/kafka/
# Extract
tar -zxvf /home/soft/kafka_2.12-3.3.1.tgz
# Rename
mv kafka_2.12-3.3.1/ kafka
-
Directory structure: the layout after entering the kafka directory
-
Configure the ZooKeeper bundled with Kafka (file: ./config/zookeeper.properties); note the port number and the data directory
dataDir=/home/hzh04/Desktop/kafka/data-zookeeper
clientPort=2181
-
Start ZooKeeper:
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
If ZooKeeper needs Kerberos authentication, add the following settings to zookeeper.properties (not configured in this demo):
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
-
Edit the Kafka configuration file (./config/server.properties); the ZooKeeper-related settings must match the ZooKeeper configuration:
# *** listeners & zookeeper.connect ***
listeners=PLAINTEXT://0.0.0.0:9092
# 0.0.0.0 is a bind address, not a destination; connect to the local ZooKeeper by name
zookeeper.connect=localhost:2181
advertised.listeners=PLAINTEXT://ip:9092
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/hzh04/Desktop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
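Since the broker only starts cleanly when zookeeper.connect points at the port ZooKeeper actually listens on, a quick consistency check can catch a mismatch before startup. The following is a minimal sketch, not part of the original setup: the class name ZkConfigCheck and its methods are hypothetical, and it assumes both files use the standard Java .properties format.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Sketch: check that server.properties points at the port ZooKeeper listens on. */
class ZkConfigCheck {

    /** Parses .properties text (the format used by both Kafka config files). */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException("unreadable properties text", e);
        }
        return props;
    }

    /**
     * Returns true when at least one host:port entry of zookeeper.connect
     * uses the clientPort configured in zookeeper.properties.
     */
    static boolean portsMatch(Properties zookeeper, Properties server) {
        String clientPort = zookeeper.getProperty("clientPort");
        String connect = server.getProperty("zookeeper.connect");
        if (clientPort == null || connect == null) {
            return false;
        }
        // zookeeper.connect may end with a chroot path such as /kafka; drop it.
        int slash = connect.indexOf('/');
        String hosts = slash >= 0 ? connect.substring(0, slash) : connect;
        for (String entry : hosts.split(",")) {
            int colon = entry.lastIndexOf(':');
            if (colon >= 0 && entry.substring(colon + 1).trim().equals(clientPort.trim())) {
                return true;
            }
        }
        return false;
    }
}
```

In practice the two Properties objects would be loaded from ./config/zookeeper.properties and ./config/server.properties; the string-based parse method just keeps the sketch self-contained.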
-
Start Kafka
./bin/kafka-server-start.sh ./config/server.properties
-
Check the Java processes with jps
If the output lists both the Kafka and QuorumPeerMain processes, startup succeeded
-
Subscription test (the demo uses the topic kafka1):
-
Consumer subscribing to the topic (run from the bin directory):
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka1 --from-beginning
-
Producer sending to the topic (run from the bin directory):
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka1
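If messages typed into the producer appear in the consumer, communication works and the single-node Kafka setup succeeded
If Kafka cannot consume from the topic, try the following steps: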
-
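Stop the Kafka service;
-
Stop the ZooKeeper service;
-
Clear the files under the log.dirs path set in the Kafka configuration file (server.properties), or point it at a different directory;
-
Clear the files under the dataDir path set in the ZooKeeper configuration (for Kafka's bundled ZooKeeper the file is zookeeper.properties), or point it at a different directory;
-
Restart ZooKeeper, then Kafka;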
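The two "clear the files" steps above can be sketched in Java as follows. This is only an illustration under stated assumptions: the class DataDirCleaner and its method are hypothetical names, the directory passed in is whatever log.dirs or dataDir points at, and both services must already be stopped. It deletes data irreversibly, so it only suits a demo environment like this one.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch: empty a Kafka log.dirs or ZooKeeper dataDir directory while keeping the directory itself. */
class DataDirCleaner {

    /** Deletes everything inside dir and returns the number of deleted entries. */
    static int clearContents(Path dir) {
        if (!Files.isDirectory(dir)) {
            return 0; // nothing to clear
        }
        try {
            List<Path> entries;
            try (Stream<Path> walk = Files.walk(dir)) {
                // Reverse order puts children before their parent directories.
                entries = walk.filter(p -> !p.equals(dir))
                        .sorted(Comparator.reverseOrder())
                        .collect(Collectors.toList());
            }
            for (Path entry : entries) {
                Files.delete(entry);
            }
            return entries.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the paths used in this demo, the calls would be DataDirCleaner.clearContents(Paths.get("/home/hzh04/Desktop/kafka/logs")) and the same for /home/hzh04/Desktop/kafka/data-zookeeper.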
EFAK (Kafka management tool) installation
-
Download resources:
-
Download the tgz package: http://www.kafka-eagle.org/
-
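After extracting the archive, configure the environment variables:
# Edit the environment variables
vim /etc/profile
export KE_HOME=/opt/software/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
# Reload the environment variables:
source /etc/profile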
-
Edit the configuration (./conf/system-config.properties):
vim ./conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=ip:2181
# Add zookeeper acl
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
# Kafka broker nodes online list
cluster1.efak.broker.size=10
# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode, you can set value to 4 or 8
kafka.zk.limit.size=16
# EFAK webui port -- WebConsole port access address
efak.webui.port=8048
######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
# master worknode set status to master, other node set status to slave
efak.cluster.mode.status=slave
# deploy efak server address
efak.worknode.master.host=localhost
efak.worknode.port=8085
# Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
cluster1.efak.offset.storage=kafka
# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=false
# EFAK keeps data for 30 days by default
efak.metrics.retain=30
# If offset is out of range occurs, enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000
# Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
efak.topic.token=keadmin
# Kafka sasl authenticate
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
# If not set, the value can be empty
cluster1.efak.sasl.client.id=
# Add kafka cluster cgroups
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=kafka_ads01
# Default use sqlite to store data
efak.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must be exist.
efak.url=jdbc:sqlite:/home/hzh04/Desktop/kafka-eagle/kafka-eagle-web/db/ke.db
efak.username=admin
efak.password=CHENYB%@!
# (Optional) set mysql address
#efak.driver=com.mysql.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=smartloli
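The configuration comment above notes that the directory holding the SQLite file must already exist. A small helper can derive that directory from the efak.url value so it can be created before the first start. This is a hedged sketch: SqliteDbPath and parentDirectory are hypothetical names, and it assumes the URL has the plain jdbc:sqlite:<file path> form used in this configuration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sketch: find the directory that must exist for a jdbc:sqlite: URL such as efak.url. */
class SqliteDbPath {

    static final String PREFIX = "jdbc:sqlite:";

    /**
     * Returns the parent directory of the SQLite database file,
     * or null when the URL is not a jdbc:sqlite: file URL (for example a MySQL URL).
     */
    static Path parentDirectory(String efakUrl) {
        if (efakUrl == null || !efakUrl.startsWith(PREFIX)) {
            return null;
        }
        Path dbFile = Paths.get(efakUrl.substring(PREFIX.length()));
        return dbFile.getParent();
    }
}
```

The returned path can be passed to Files.createDirectories(...), which creates any missing parent directories and does nothing if the directory already exists.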
-
Grant execute permission:
cd ../bin/
chmod +x ke.sh
-
Start EFAK
./ke.sh start
-
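Check the Java processes with jps
Web UI address: http://ip:8048/
Username: admin
Password: 123456
A reachable login page indicates that EFAK started successfully
Spring Boot integration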
-
maven pom
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
-
Core application.yml configuration
spring:
  kafka:
    # ip of the Kafka node, or a comma-separated list for a cluster
    bootstrap-servers: ip:9092
    # producer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # consumer
    consumer:
      group-id: kafka1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-
Topic configuration (it is best to declare the specific topics producers may use; overly open permissions lead to leftover dirty data)
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Topic configuration.
 * Each topic is declared with 4 partitions; a replication factor of -1 uses the broker default.
 */
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic createTopic_kafka1() {
        return new NewTopic("kafka1", 4, (short) -1);
    }

    @Bean
    public NewTopic createTopic_kafka_topic_001() {
        return new NewTopic("kafka_topic_001", 4, (short) -1);
    }

    @Bean
    public NewTopic createTopic_kafka_89757() {
        return new NewTopic("kafka_89757", 4, (short) -1);
    }

    @Bean
    public NewTopic createTopic_plc1() {
        return new NewTopic("plc1", 4, (short) -1);
    }
}
-
Listener configuration
-
@SendTo("kafka_89757") forwards the listener method's return value to the topic kafka_89757
import com.hzh.demo.kafka.config.KafkaConnectCondition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Listens on two topics with the consumer group top_group_1
    @KafkaListener(groupId = "top_group_1", topics = {"kafka_topic_001", "kafka_89757"})
    public String consumerTopic(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        String msg = record.value();
        System.out.println("------------------- consumerTopic received message: " + msg + " -------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }

    // Listens on kafka1 and forwards the return value to kafka_89757
    @KafkaListener(groupId = "top_group_1", topics = "kafka1")
    @SendTo("kafka_89757")
    public String consumerTopic2(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        String msg = record.value();
        System.out.println("------------------- consumerTopic2 received message: " + msg + " -------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }

    // Receives the messages forwarded to kafka_89757
    @KafkaListener(groupId = "top_group_1", topics = "kafka_89757")
    public String consumerTopic3(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        String msg = record.value();
        System.out.println("------------------- consumerTopic3 received message: " + msg + " -------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }

    // No groupId here, so the group-id from application.yml applies
    @KafkaListener(topics = "plc1")
    public String consumerTopic_plc1(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        String msg = record.value();
        System.out.println("------------------- consumerTopic_plc1 received message: " + msg + " -------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }
}
-
Writing the producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("kafka")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) {
        kafkaTemplate.send("kafka1", msg);
        return "success";
    }

    @GetMapping("/send2/{msg}")
    public String send2(@PathVariable("msg") String msg) {
        kafkaTemplate.send("kafka_topic_001", msg);
        return "success";
    }
}
-
Test results
-
Extras
-
Producer listener
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Message monitoring: logs the outcome of every send made through the KafkaTemplate.
 */
@Component
public class KafkaListenerResult {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // Register the listener once the bean has been constructed
    @PostConstruct
    private void listener() {
        kafkaTemplate.setProducerListener(new ProducerListener() {

            // Called when the broker acknowledges the record
            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                System.out.println("Message monitor --- success message=" + producerRecord.value());
            }

            // Called when the send fails.
            // Note: newer spring-kafka versions declare onError with an additional
            // RecordMetadata parameter; match the signature of the version in use.
            @Override
            public void onError(ProducerRecord producerRecord, Exception exception) {
                System.out.println("Message monitor --- error message=" + producerRecord.value());
            }
        });
    }
}
-
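Connection check
# Add this annotation to the producer or listener class; the condition is evaluated when the application starts
@Conditional(KafkaConnectCondition.class) // connection check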
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/**
 * Checks whether the Kafka broker is reachable.
 */
public class KafkaConnectCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        Environment environment = context.getEnvironment();
        String kafkaServers = environment.getProperty("spring.kafka.bootstrap-servers");
        System.out.println("kafkaServers read from configuration: " + kafkaServers);
        if (null == kafkaServers || kafkaServers.equals("")) {
            return false;
        }
        // Only the first server in the list is checked
        String serverPort = kafkaServers.split(",")[0];
        // Prefix a scheme so that URI can split host and port
        URI uri = URI.create("http://" + serverPort);
        return this.isConnectable(uri.getHost(), uri.getPort());
    }

    /**
     * Checks whether a TCP connection to the Kafka service can be opened.
     *
     * @param host broker host name or IP address
     * @param port broker port
     * @return true if the connection succeeds within 3 seconds
     */
    private boolean isConnectable(String host, int port) {
        // try-with-resources closes the socket in every case
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 3000);
            // Print the success message only when the connection actually succeeded
            System.out.println("======== Kafka service is reachable ========");
            return true;
        } catch (IOException e) {
            System.out.println("======== Warning: could not connect to the Kafka service, so Kafka listeners will not be enabled");
            return false;
        }
    }
}
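The condition above only probes the first entry of spring.kafka.bootstrap-servers. For a cluster, one possible variation is to try every entry and accept the first that answers. The sketch below illustrates that idea with plain JDK classes; BootstrapServers, parse and anyReachable are hypothetical names, and it assumes entries of the form host:port without IPv6 brackets.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Sketch: probe every entry of a comma-separated bootstrap-servers list. */
class BootstrapServers {

    /** Parses "host1:9092,host2:9092" into addresses; malformed entries are skipped. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        if (bootstrapServers == null) {
            return result;
        }
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                continue; // no host or no port
            }
            try {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                // Unresolved: no DNS lookup happens while parsing.
                result.add(InetSocketAddress.createUnresolved(trimmed.substring(0, colon), port));
            } catch (IllegalArgumentException e) {
                // non-numeric or out-of-range port: skip this entry
            }
        }
        return result;
    }

    /** Returns true as soon as one server accepts a TCP connection within the timeout. */
    static boolean anyReachable(List<InetSocketAddress> servers, int timeoutMillis) {
        for (InetSocketAddress server : servers) {
            try (Socket socket = new Socket()) {
                // Resolve the host name only now, at connect time.
                socket.connect(new InetSocketAddress(server.getHostString(), server.getPort()), timeoutMillis);
                return true;
            } catch (IOException e) {
                // unreachable: try the next server
            }
        }
        return false;
    }
}
```

Inside matches(...), the last lines could then become a call such as BootstrapServers.anyReachable(BootstrapServers.parse(kafkaServers), 3000). A successful TCP connect only shows that the port is open, not that the broker is healthy.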
-
Postman API test
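The same request that Postman sends can be scripted with the JDK's built-in HTTP client (Java 11 or later). This is a rough sketch rather than part of the original project: the class KafkaSendClient is hypothetical, and the base URL http://localhost:8080 assumes the application runs locally on Spring Boot's default port. The path /kafka/send/{msg} comes from the controller shown earlier. Because the message travels in the URL path, it is percent-encoded first.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Sketch: call the /kafka/send/{msg} endpoint without Postman. */
class KafkaSendClient {

    /** Builds the request URI; the message is percent-encoded for use as a path segment. */
    static URI sendUri(String baseUrl, String msg) {
        // URLEncoder targets form data and writes spaces as '+', so map them to %20 for a path.
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/kafka/send/" + encoded);
    }

    /** Sends the GET request and returns the response body. */
    static String send(String baseUrl, String msg) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(sendUri(baseUrl, msg)).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the application running, KafkaSendClient.send("http://localhost:8080", "hello world") requests /kafka/send/hello%20world, and the listener on kafka1 should print the message.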
Integration complete