> 文章列表 > SpringBoot集成Kafka详解

SpringBoot集成Kafka详解

SpringBoot集成Kafka详解

一、使用idea创建SpringBoot项目

1.1 使用Spring Initializr创建一个SpringBoot程序

SpringBoot集成Kafka详解
点击Next

1.2 添加依赖

SpringBoot集成Kafka详解

依赖说明:

  • Lombok简化实体类开发。

  • Spring Web让项目集成web开发所有依赖,包括Spring MVC,内置tomcat等。

  • Spring for Apache Kafka就是SpringKafka的集成依赖。

配置完成之后点击Finish

1.3 查看pom文件

SpringBoot集成Kafka详解
因为是第一次集成Kafka,这时候只要等待Maven下载好Kafka的相关依赖,下载好后红色会消失。

这样就创建好了一个集成了KafkaSpringBoot Web项目

二、创建生产者

2.1 配置生产者application.yml文件

# 连接Kafka
spring:kafka:bootstrap-servers: localhost:9092# 生产者 key value的序列化方式producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

2.2 创建生产者接口

SpringBoot集成Kafka详解

在controller包下创建生产者,把接口传递进来的数据发送给Kafka

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
@RequestMapping("/producer")
public class ProducerController {@Resourceprivate KafkaTemplate<String, String> kafka;@PostMappingpublic String data(@RequestBody String msg) {// 通过Kafka发出数据kafka.send("test", msg);return "ok";}
}

三、创建消费者

3.1 配置消费者application.yml文件

# 连接Kafka
spring:kafka:bootstrap-servers: localhost:9092# 消费者 key value的反序列化方式consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#指定消费者组的 group_idgroup-id: kafka-test

3.2 创建消费者

import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {// 指定要监听的 topic@KafkaListener(topics = "test")public void consumeTopic(String msg) {// 参数: 从topic中收到的 value值System.out.println("收到的信息: " + msg);}}

3.3 说明

由于生产者和消费者都写在这个Demo当中。

所以整体application.yml文件如下(上面这么写只是为了区分和理解):

# 连接Kafka
spring:kafka:bootstrap-servers: 127.0.0.1:9092# 生产者 key value的序列化方式producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者 key value的反序列化方式consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#指定消费者组的 group_idgroup-id: kafka-test

四、测试生产者消费者

4.1 使用ApiPost发送一个Post请求调用生产者接口

SpringBoot集成Kafka详解
发现点击发送之后,控制台迅速返回了一个ok

说明生产者生产数据成功,已经向test这个topic中发送了hello kafka这条数据。

4.2 观察消费者控制台

SpringBoot集成Kafka详解

发现消费者已经接收了来自生产者的数据,并且把数据打印在了控制台上。

以上就是SpringBoot集成Kafka的基本方式。即使是今后复杂的使用,也是从简单转换而来。