> 文章列表 > Kafka3.0.0版本——生产者自定义分区器

Kafka3.0.0版本——生产者自定义分区器

Kafka3.0.0版本——生产者自定义分区器

目录

    • 一、生产者自定义分区器代码示例
      • 1.1、自定义分区器类
      • 1.2、生产者发送消息代码(生产者的配置中添加分区器参数)
      • 1.3、测试

一、生产者自定义分区器代码示例

1.1、自定义分区器类

  • 代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import java.util.Map;
    /* 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/
    public class MyPartitioner implements Partitioner {/* 返回信息对应的分区* @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValues = value.toString();int partition;// 判断消息是否包含 helloif (msgValues.contains("hello")){partition = 0;}else {partition = 1;}// 返回分区号return partition;}/* 关闭资源*/@Overridepublic void close() {}/* 配置方法*/@Overridepublic void configure(Map<String, ?> configs) {}
    }

1.2、生产者发送消息代码(生产者的配置中添加分区器参数)

  • 代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/* @author: xz* @since: 2023/4/9 20:56* @description: 使用自定义的分区器方法,在生产者的配置中添加分区器参数。*/
    public class CustomProducerMyPartitioner {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);//5、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//6、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}//7、关闭资源kafkaProducer.close();}
    }
    

1.3、测试

  • 在 kafka集群上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.28:9092 --topic news
    
  • 启动main方法,在 IDEA 控制台观察回调信息,发送消息内容包含hello,则发送到0号分区,如下图:
    在这里插入图片描述

  • 发送消息内容不包含hello,则发送到1号分区,如下图:

    在这里插入图片描述