> 文章列表 > docker安装kafka,并集成springboot进行测试

docker安装kafka,并集成springboot进行测试

docker安装kafka,并集成springboot进行测试

大家好,今天我们开始学习kafka中间件,今天我们改变一下策略,不刷视频学习,改为实践学习,在网上找一些案例功能去做,来达到学习实践的目的。

首先,是安装相关组件。

1. docker安装安装

1.1 yum-utils软件包

yum install -y yum-utils

1.2 设置阿里云镜像


yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

1.3 安装docker

yum install docker-ce docker-ce-cli containerd.io 

1.4 启动docker

systemctl start docker

1.5 测试

docker version
docker run hello-world
docker images

至此,docker就安装完毕了。接下来就是安装zookeeper和kafka了,我这里用的是kafka2.x的版本,因此需要结合zookeeper去是使用。现在最新的kafka3.x已经可以抛弃zookeeper去单独使用了,小伙伴们有兴趣的话可以自己去动手安装实践下。

2. 安装zookeeper和kafka

2.1 docker安装zookeeper

docker pull wurstmeister/zookeeper

2.2 启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper 

2.3 docker查看zookeeper容器是否启动

docker ps

 出现以上信息,就代表zookeeper已经安装并启动成功。

2.4 安装kafka

docker pull wurstmeister/kafka

2.5 启动kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka 

2.6 用docker ps查看kafka是否启动

出现以上信息,就代表kafka启动成功了。

下来就测试一下

3. 发送消息和消费消息

3.1 进入kafka容器

docker exec -it 容器id /bin/bashcd /opt/kafka_2.13-2.8.1/bin/

 3.2 连接生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping

接下来就可以发送消息了。

 3.3 另起一个窗口,重复3.1的动作进入kafka容器,然后连接消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning

这是就能就收消息了。

 到达这里,我们的kafka就安装并测试成功了。

4. 接下来我们就创建Springboot工程来连接kafka进行消息的生产和消费

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.volga</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><!-- 阿里巴巴 fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

4.2 我们创建一个订单的实体类

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;
}

4.3 创建生产者

@Component
@Slf4j
public class KafkaProvider {/*** 消息 TOPIC*/private static final String TOPIC = "shopping";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {log.info("生产者产生消息 失败 ## Send message fail ...");}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("生产者产生消息 成功 ## Send message success ...");}});}
}

4.4 创建消费者

@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "shopping", groupId = "group_id") //这个groupId是在yml中配置的public void consumer(String message) {log.info("消费者消费信息 ## consumer message: {}", message);}
}

4.5 创建测试类

@SpringBootTest
public class SpringBootKafakaApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {System.out.println("是否为空??+"+kafkaProvider);// 发送 10 个消息for (int i = 0; i < 10; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}
}

4.6 要创建一个Application方法,不然项目会启动报错

@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class,args);}
}

4.7 配置application.yml

spring:kafka:# 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔)bootstrap-servers: 服务器ip:9092consumer:# 指定 group_idgroup-id: group_idauto-offset-reset: earliest# 指定消息key和消息体的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 指定消息key和消息体的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringSerializervalue-deserializer: org.apache.kafka.common.serialization.StringSerializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

以上就创建项目成功了,我们运行测试方法,就能获取kafka中的消息了。

### 生产消息

 ### 消费消息

这里就是简单实现了kafka的消息生产和消费,后续的kafka复杂场景的实现会持续更新。

我是空谷有来人,谢谢支持。