> 文章列表 > Kafka Streams基础操作——连接kafka生产者与消费者

Kafka Streams基础操作——连接kafka生产者与消费者

Kafka Streams基础操作——连接kafka生产者与消费者

目录

1.添加依赖

2.创建两个kafka的topic主题

3.IDEA代码实现一个主题到另一个主题之间的桥梁:

4.运行IDEA代码

5.开启生产者并生产消息

6.开启消费者,就能消费消息


1.添加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.8.0</version></dependency>

2.创建两个kafka的topic主题

[root@lxm147 config]# kafka-topics.sh --create --zookeeper lxm147:2181 --topic mystreamin --partitions 1 --replication-factor 1
Created topic mystreamin.
[root@lxm147 config]# kafka-topics.sh --create --zookeeper lxm147:2181 --topic mystreamout --partitions 1 --replication-factor 1
Created topic mystreamout.

3.IDEA代码实现一个主题到另一个主题之间的桥梁:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;import java.util.Properties;public class MyStreamDemo {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "mystream");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());// TODO 创建流的构造器StreamsBuilder builder = new StreamsBuilder();// TODO 将mystreamin的数据取出,再放入到mystreamout中KStream<Object, Object> mystreamin = builder.stream("mystreamin");mystreamin.to("mystreamout");Topology topo = builder.build();KafkaStreams kafkaStreams = new KafkaStreams(topo, prop);kafkaStreams.start();}
}

4.运行IDEA代码

5.开启生产者并生产消息

kafka-console-producer.sh --topic mystreamin --broker-list lxm147:9092
>hello
>world

6.开启消费者,就能消费消息

kafka-console-consumer.sh --bootstrap-server lxm147:9092 --topic mystreamout --from-beginning

大数据