> 文章列表 > 【基础】Kafka -- 主题与分区

【基础】Kafka -- 主题与分区

【基础】Kafka -- 主题与分区

Kafka -- 主题分区

  • 主题的管理
    • 创建主题
      • 简单创建与查看
      • 指定分区副本分配创建
      • 指定参数创建
    • 查看主题
      • 主题的简单查看
      • 带附加功能的查看
    • 修改主题
      • 修改分区
      • 修改配置
    • 删除主题
  • 主题配置管理
    • 配置查看与变更
      • 配置查看
      • 配置变更
    • 主题端参数
  • KafkaAdminClient 主题管理
    • 基本使用
      • 创建主题
      • 查看主题
      • 删除主题
      • 修改主题
    • 主题合法性校验
  • 分区的管理
    • 优先副本选举
    • 分区重新分配
    • 修改副本因子

主题的管理

主题管理包括创建主题、查看主题消息、修改主题以及删除主题等操作,Kafka 提供的 kafka-topics.sh 脚本来执行这些操作,脚本位于$KAFKA_HOME/bin/目录下,该脚本实际上是调用了 kafka.admin.TopicCommand 类来执行主题管理的操作。

创建主题

简单创建与查看

若 broker 端的配置参数auto.create.topics.enable设置为 true,那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为 1)、副本因子为default.replication.factor(默认值为 1)的主题。同理,当一个消费者开始从未知主题中读取消息或任意客户端向未知主题发送元数据时也会按照上述参数创建相应的主题。

更加通用的方式是使用 kafka-topics.sh 来创建主题,下列代码创建了一个分区数为 4、副本因子为 2 的主题 topic-create:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create --partitions 4 --replication-factor 2
Created topic topic-create-diff.

命令中参数的含义如下:

  • --bootstrap-server:zookeeper 连接地址;

  • --create:表示创建主题的指令;

    • --topic:主题名称;

    • --partitions:主题分区数;

    • --replication-factor:分区副本数;

脚本执行成功护,Kafka 会在 log.dir 或 log.dirs 配置参数所指定的目录下创建相应的主题分区,该目录默认为 /tmp/kafka-logs/。在不同的节点分别执行下列命令来查看当前节点创建的主题分区:

# master节点
[root@master kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 417 17:00 topic-create-0
drwxr-xr-x   2 root root 167 417 17:00 topic-create-2
# node01节点
[root@node01 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 418 08:40 topic-create-1
drwxr-xr-x   2 root root 167 417 16:44 topic-create-2
drwxr-xr-x   2 root root 167 417 16:44 topic-create-3
# node02节点
[root@node02 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 418 08:40 topic-create-0
drwxr-xr-x   2 root root 167 417 17:01 topic-create-1
drwxr-xr-x   2 root root 167 417 17:01 topic-create-3

可以看到,topic-create 主题存在 0、1、2、3 四个分区,每个分区存在两个副本。主题、分区、副本和 log 日志的关系如下图所示,其中,主题与分区都是提供给用户的抽象,而副本层和 log 层才是实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 当中才能提供有效的数据冗余。

在这里插入图片描述

除上述方法外,还可以使用 kafka-topics.sh 的--describe指令来查看分区副本的分配细节:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

指定分区副本分配创建

上述的方法在进行主题的创建时,其分区副本是按照内部既定的逻辑来进行分配的。kafka-topics.sh 脚本还提供了--replica-assignment参数来手动指定分区副本的分配方案:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create-diff --replica-assignment 0:1,1:2,2:0,2:1
Created topic topic-create-diff.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create-diff
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用英文逗号“,”隔开,分区内多个副本用英文冒号“:”隔开。

使用这种方法需要注意以下几点:

  • 同一个分区内的副本不能有重复,如0:0,1:1这种将会提示 AdminCommandFailedException 异常;

  • 各分区所指定的副本数应相同,如0:1,1,2,1:2这种将会提示 AdminOperationException 异常;

  • 不允许跳过分区进行分配,如0:1,,1:2,2:0这种将会提示 NumberFormatException 异常;

指定参数创建

下列命令使用--config参数来创建一个主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
Created topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

示例中设置了 cleanup.policy 参数为 compact,以及 max.message.bytes 参数为 10000,这两个参数都是主题端的配置。

查看主题

主题的简单查看

kafka-topics.sh 脚本提供了--list--describe指令来方便的查看主题信息。

使用--list可以查看当前所有的可用主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --list
topic-config
topic-create
topic-create-diff

使用--describe可以查看单个或者多个主题的详细信息,若不适用--topic指定主题则显示所有主题的详细信息:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create,topic-create-diff
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

带附加功能的查看

在使用--describe指令时可以额外指定--topics-with-overridesunder-replicated-partitions以及--unavailable-partitions参数来实现一些附加功能。

topics-with-overrides

使用该参数可以查看所有包含覆盖配置的主题,它只会列出包含了与集群不同配置的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topics-with-overrides
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000

under-replicated-partitions

使用该参数可以找出所有包含失效副本的分区,手动停掉 id=1 的 kafka broker,进行查询:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitionsTopic: topic-create     Partition: 1    Leader: 2       Replicas: 1,2   Isr: 2Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2

重新启动 id=1 的 broker,再次查询,不显示任何结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
[root@master kafka_2.12-3.4.0]#

unavailable-partitions

使用该参数可以查询主题中没有 leader 副本的分区,这些分区已处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。手动停掉 id=1 和 id=2 的 kafka broker,进行查询:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitionsTopic: topic-create     Partition: 1    Leader: none    Replicas: 1,2   Isr: 1Topic: topic-create     Partition: 3    Leader: none    Replicas: 2,1   Isr: 1

重新启动 id=1 和 id=2 的 broker,再次查询,不显示任何结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
[root@master kafka_2.12-3.4.0]#

修改主题

kafka-topics.sh 提供了--alert指令用于修改主题的分区数、修改配置等。

修改分区

下列指令可以修改主题的分区数:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 3       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0Topic: topic-config     Partition: 1    Leader: 1       Replicas: 1     Isr: 1Topic: topic-config     Partition: 2    Leader: 2       Replicas: 2     Isr: 2

Kafka 只支持增加分区数而不支持减少分区数,如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 1
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 1.
[2023-04-18 16:06:33,274] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 1.(kafka.admin.TopicCommand$)

修改配置

在当前最新的 kafka 3.4.0 版本中已经不允许使用 kafka-topics.sh 脚本来变更主题的配置了,新版本推荐使用 kafka-configs.sh 脚本实现相关的功能。

删除主题

如果一个主题确定不会再使用,那么最好将其删除释放一些资源。kafka-topics.sh 脚本提供了--delete指令用于主题的删除:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-delete --partitions 1 --replication-factor 1
Created topic topic-delete.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --delete --topic topic-delete

主题的删除与 broker 端配置参数delete.topic.enable有关,该参数配置为 true 时才能够删除主题,默认值为 true。

在删除主题时,若删除的是 Kafka 内部的主题或删除的主题不存在,则会报错。

主题配置管理

kafka-configs.sh 脚本用于对配置进行管理和操作,其可以实现相关配置在运行状态下的动态变更。该脚本包含变更配置指令--alter以及查看配置指令--describe两种类型,增删改操作都可以看作是对配置的变更。该脚本不仅支持操作主题的配置,其同样支持操作 broker、用户、客户端的配置。

配置查看与变更

配置查看

使用 kafka-configs.sh 脚本查看主题配置的命令如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}max.message.bytes=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10000, DEFAULT_CONFIG:message.max.bytes=1048588}

其中:

  • --entity-type指定要查看配置的实体类型;

  • --entity-config指定要查看配置的实体名称;

上述两个参数的对应关系如下:

entity-type entity-name
主题类型,取值为 topics 指定主题名称
broker 类型,取值为 brokers 指定 brokerId
客户端类型,取值为 clients 指定 clientId
用户类型,取值为 users 指定用户名

配置变更

使用 kafka-configs.sh 脚本实现配置的变更时,需要将--alter指令与add-config以及delete-config参数一起使用,前者用于实现配置的增、改操作,后者用于实现配置的删除。

add-config

下列命令对主题原有配置进行了覆盖,若需要修改多个参数,则将多个参数使用英文逗号","隔开即可:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --add-config max.message.bytes=20000
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}max.message.bytes=20000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=20000, DEFAULT_CONFIG:message.max.bytes=1048588}

delete-config

下列命令对主题新增的配置进行了删除,还原为默认配置:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --delete-config max.message.bytes,cleanup.policy
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:

主题端参数

主题相关的所有配置参数在 broker 层面都有对应的参数,若没有指定或修改主题的任何配置参数,那么就使用 broker 端对应的参数作为默认值。

与主题相关的参数有很多,在进行配置时自行搜索查看即可。所有配置的查看与修改方法都与上述介绍的方法相同。

KafkaAdminClient 主题管理

一般情况下,我们使用 kafka-topics.sh 脚本来管理主题。但是在某些场景下,我们需要将主题管理类的功能集成到公司内部的系统当中进行统一管理,那么就需要使用程序调用 API 的方式去实现。KafkaAdminClient 类便提供了这些基本功能。

基本使用

创建主题

下列代码展示了如何使用 KafkaAdminClient 类创建一个分区数为 4、副本因子为 1 的主题 topic-admin:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";String topic = "topic-admin";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 定义主题创建的信息NewTopic newTopic = new NewTopic(topic, 4, (short) 1);// 执行主题创建并获取执行结果CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));try {result.all().get();} catch (Exception exception) {exception.printStackTrace();}// 关闭 AdminClient 对象实例adminClient.close();}
}

查询新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin      TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 4       ReplicationFactor: 1    Configs:Topic: topic-admin      Partition: 0    Leader: 1       Replicas: 1     Isr: 1Topic: topic-admin      Partition: 1    Leader: 0       Replicas: 0     Isr: 0Topic: topic-admin      Partition: 2    Leader: 2       Replicas: 2     Isr: 2Topic: topic-admin      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

其中,AdminClient 对象的获取调用的是create()方法,其定义如下:

public static AdminClient create(Properties props) {return KafkaAdminClient.createInternal(new AdminClientConfig(props), (TimeoutProcessorFactory)null);
}

在进行主题信息的定义时,需要创建一个 NewTopic 对象,其包含下述属性:

public class NewTopic {// 主题名称private final String name;// 分区数private final int numPartitions;// 副本因子private final short replicationFactor;// 分配方案private final Map<Integer, List<Integer>> replicasAssignments;// 配置private Map<String, String> configs = null;...
}

指定分区副本分配

若想指定分区副本的分配方案来创建一个主题,可以将代码中“定义主题创建的信息”处的代码替换为下述代码:

// 指定分区副本分配方案
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, List.of(1));
replicasAssignments.put(1, List.of(0));
replicasAssignments.put(2, List.of(1));
replicasAssignments.put(3, List.of(0));
// 定义主题创建的信息
NewTopic newTopic = new NewTopic(topic, replicasAssignments); // topic:"topic-admin-test1"

查询新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test1
Topic: topic-admin-test1        TopicId: pptWSwHbT8m4W_dJBEoWkg PartitionCount: 4       ReplicationFactor: 1    Configs:Topic: topic-admin-test1        Partition: 0    Leader: 1       Replicas: 1     Isr: 1Topic: topic-admin-test1        Partition: 1    Leader: 0       Replicas: 0     Isr: 0Topic: topic-admin-test1        Partition: 2    Leader: 1       Replicas: 1     Isr: 1Topic: topic-admin-test1        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

指定主题配置参数

若想在创建主题时指定需要覆盖的配置,则需要在 NewTopic 对象中传入 configs 配置集,如下所示:

// 指定主题配置
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
configs.put("max.message.bytes", "25000");
NewTopic newTopic = new NewTopic(topic, 4, (short) 1); // topic:"topic-admin-test2"
newTopic.configs(configs);

查看新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test2
Topic: topic-admin-test2        TopicId: IFegwoC6Tp-UwlnfwkrlmA PartitionCount: 4       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=25000Topic: topic-admin-test2        Partition: 0    Leader: 0       Replicas: 0     Isr: 0Topic: topic-admin-test2        Partition: 1    Leader: 2       Replicas: 2     Isr: 2Topic: topic-admin-test2        Partition: 2    Leader: 1       Replicas: 1     Isr: 1Topic: topic-admin-test2        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

在使用 AdminClient 之后要调用close()方法来释放资源。

查看主题

查看主题可以调用listTopics()或者describeTopics()方法实现。

下列代码使用listTopics()方法查看主题:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 查询主题并获取查询结果ListTopicsResult listTopicsResult = adminClient.listTopics();try {listTopicsResult.names().get().forEach(System.out::println);} catch (Exception exception) {exception.printStackTrace();}// 关闭 AdminClient 对象实例adminClient.close();}
}

查询结果如下:

topic-admin-test1
topic-admin-test2
topic-create
topic-config
topic-create-diff
topic-admin

下列代码使用describeTopics()方法查看主题:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 查询主题并获取查询结果DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));try {describeTopicsResult.all().get().forEach((key, value) -> {System.out.println(key + "||" + value);});} catch (Exception exception) {exception.printStackTrace();}// 关闭 AdminClient 对象实例adminClient.close();}
}

查询结果如下:

topic-admin-test1||(name=topic-admin-test1, internal=false, partitions=(partition=0, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=1, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))
topic-admin-test2||(name=topic-admin-test2, internal=false, partitions=(partition=0, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=1, leader=node02:9092 (id: 2 rack: null), replicas=node02:9092 (id: 2 rack: null), isr=node02:9092 (id: 2 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))

删除主题

调用deleteTopics()方法即可实现主题的删除:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 删除主题DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));try {deleteTopicsResult.all().get();} catch (Exception exception) {exception.printStackTrace();}// 关闭 AdminClient 对象实例adminClient.close();}
}

删除后执行主题查询,结果如下:

topic-create
topic-config
topic-create-diff
topic-admin

修改主题

使用describeConfigs()方法可以查看主题的具体配置信息,具体使用方法如下:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 配置要查询的实体类型和实体名称ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");// 执行查询并获取结果DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(List.of(configResource));try {Config config = describeConfigsResult.all().get().get(configResource);System.out.println("==========" + configResource.name() + "==========");config.entries().forEach(System.out::println);} catch (Exception exception) {exception.printStackTrace();}// 关闭 AdminClient 对象实例adminClient.close();}
}

返回结果如下,该方法会列出主题当中所有的配置信息:

==========topic-admin==========
ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.format.version, value=3.0-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])

使用alterConfigs()方法对主题的配置进行修改的方法如下:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 配置要查询的实体类型和实体名称ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");// 配置要更改的配置参数ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");Config configs = new Config(List.of(entry));Map<ConfigResource, Config> configMap = new HashMap<>();configMap.put(configResource, configs);// 执行配置更改并获取结果AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);try {alterConfigsResult.all().get();} catch (Exception exception) {exception.printStackTrace();}// 关闭 AdminClient 对象实例adminClient.close();}
}

修改完成后再次执行查询,可以看到配置已经被成功修改:

==========topic-admin==========
...
ConfigEntry(name=cleanup.policy, value=compact, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
...

KafkaAdminClient 还提供了createPartitions()方法用于增加某个主题的分区,其基本使用方法如下:

public class TestDemo {public static void main(String[] args) {// 定义服务地址以及主题名称String brokerList = "192.168.86.133:9092";// 配置参数Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);// 获取 AdminClient 对象AdminClient adminClient = AdminClient.create(properties);// 配置要查询的实体类型和实体名称ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");// 配置要更改的分区NewPartitions newPartitions = NewPartitions.increaseTo(5);Map<String, NewPartitions> newPartitionsMap = new HashMap<>();newPartitionsMap.put("topic-admin", newPartitions);// 执行增加分区数的操作并获取结果CreatePartitionsResult partitionsResult = adminClient.createPartitions(newPartitionsMap);try {partitionsResult.all().get();} catch (Exception exception) {exception.printStackTrace();}      // 关闭 AdminClient 对象实例adminClient.close();}
}

直接在服务器上调用 kafka-topic.sh 脚本查看主题分区的增加结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin      TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 5       ReplicationFactor: 1    Configs: cleanup.policy=compactTopic: topic-admin      Partition: 0    Leader: 1       Replicas: 1     Isr: 1Topic: topic-admin      Partition: 1    Leader: 0       Replicas: 0     Isr: 0Topic: topic-admin      Partition: 2    Leader: 2       Replicas: 2     Isr: 2Topic: topic-admin      Partition: 3    Leader: 1       Replicas: 1     Isr: 1Topic: topic-admin      Partition: 4    Leader: 2       Replicas: 2     Isr: 2

主题合法性校验

一般在生产环境下,Kafka 的auto.create.topics.enable参数将被设置为 false,集不允许自动创建主题。主题的创建一般由运维人员通过 kafka-topics.sh 脚本创建,若普通用户想要通过 KafkaAdminClient 提供的方法创建主题,建议在代码中对主题的命名、分区数、分区副本数进行校验,对不符合标准的申请进行过滤。

分区的管理

优先副本选举

分区采用多副本的机制以提升可靠性,但是只有 leader 副本对外提供读写服务,follower 副本只负责在内部进行消息的同步。若一个分区的 leader 副本不可用,则意味着整个分区不可用,此时就需要从 follower 副本中挑选一个作为新的 leader 继续对外提供服务。

在进行主题的创建时,主题的分区以及副本会尽可能均匀的分不到 Kafka 集群的各个 broker 节点上,leader 副本的分配也比较均匀。如下所示,创建一个分区数、分区副本数均为 3 的主题 topic-partition 并观察其分区副本的分布:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-partition --partitions 3 --replication-factor 3
Created topic topic-partition.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1Topic: topic-partition  Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到,leader 副本均匀的分布在 id 为 0、2、1 的 broker 节点当中。此时,若某个分区 leader 副本所在的 broker 节点宕机,该分区的一个 follower 节点就会成为新的 leader 节点。当之前的 leader 节点恢复并重新加入集群后,其只能作为一个新的 follower 节点,不在对外提供服务。

我们重启 brokerId=2 的节点,观察分区副本的分布如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2Topic: topic-partition  Partition: 1    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到分区 1 的 leader 节点从 2 变为了 1,而分区 2 的 leader 节点也为 1,这就导致了原本均衡的负载状态被打破,节点 1 的负载最高。

为处理上述负载失衡的情况,Kafka 引入了优先副本 preferred replica 的概念。优先副本即 AR 集合中的第一个副本,即查询主题信息时其中的 Replicas 集合。如上述查询结果所示,分区 1 的 AR 集合为【2,1,0】,其中第一个副本为 2,即分区 1 的优先副本为 2。

基于优先副本的概念,Kafka 提供了分区自动平衡的功能,该功能对应的 broker 端参数为auto.leader.rebalance.enable(默认值为 true)。自动平衡功能开启时,Kafka 会启动一个定时任务,该任务会轮询所有的 broker 节点,计算每个 broker 节点的分区不平衡率(=非优先副本的 leader 数量/分区总数)。若该值超过了leader.imbalance.per.broker.percentage(默认值为 10%)参数所配置的比率,就会自动执行优先副本选举的动作对分区进行平衡。自动平衡分区动作的执行周期由参数leader.imbalance.check.interval.seconds(默认值 300s)控制。

经过一段时间后再次查询分区副本分布:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2Topic: topic-partition  Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 1,0,2Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到分区 1 的 leader 节点自动平衡为 2。

分区重新分配

在日常的生产工作中,可能会碰到以下的场景:

  • 某节点宕机后,该节点上的分区副本将处于失效的状态,Kafka 并不会将失效的分区副本自动的迁移到集群中可用的 broker 节点上;

  • 当需要对集群中的某个节点进行有计划的下线时,需要通过某种方式将该节点上的副本迁移到其他可用节点上;

  • 当集群中增加了新的节点,只有新创建的主题分区才有可能被分配到这个节点上,已经存在的主题需要某种重分配的方式使分区分配更加合理;

为解决上述问题,Kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区的重分配。创建一个分区数为 4 分区副本因子为 2 的主题 topic-assign 用于测试:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-assign --partitions 4 --replication-factor 2
Created topic topic-assign.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-assign     Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1Topic: topic-assign     Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

此时,如果我们想将 brokerId=1 的节点下线,首先需要将该节点上的分区副本迁移出去,整个迁移的操作如下所示:

首先创建一个 json 文件,文件内容包含要进行分区重分配的主题清单:

{   "topics":[{"topic":"topic-assign"    }],"version":1
}

然后根据该文件内容,指定索要分配的 broker 的节点列表来生成一份候选的重分配方案:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --generate --topics-to-move-json-file /root/reassignment.json --broker-list 0,2
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}

返回结果中包含两部分内容:Current partition replica assignment 后的内容展示的是当前分区副本的分配情况;Proposed partition reassignment configuration 后的内容展示的是重分配的候选方案。

接下来将提供的候选方案保存在一个 json 文件当中,执行下列命令进行分区重分配,并查看结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign                                                            Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 2    Configs:Topic: topic-assign     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0Topic: topic-assign     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 2,0

可以看到分区的分配已经按照 json 文件中定义的方式进行了重分配。重分配的本质是首先增加新的副本,然后进行数据的同步,最后将就的副本删除。数据的复制会占用额外的资源。

修改副本因子

kafka-reassign-partition.sh 脚本还能够实现分区重分配的功能,将上一小节中的 json 文件进行修改,进行副本因子的增加,如下:

{"version": 1,"partitions": [{"topic": "topic-assign","partition": 0,"replicas": [2,1,0],"log_dirs": ["any","any","any"]},{"topic": "topic-assign","partition": 1,"replicas": [0,1,2],"log_dirs": ["any","any","any"]},{"topic": "topic-assign","partition": 2,"replicas": [2,1,0],"log_dirs": ["any","any","any"]},{"topic": "topic-assign","partition": 3,"replicas": [0,1,2],"log_dirs": ["any","any","any"]}]
}

执行脚本命令,并再次查看分区副本:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 3    Configs:Topic: topic-assign     Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 0,2,1Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 0,2,1Topic: topic-assign     Partition: 3    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1

可以看到所有分区的副本中都增加了在 brokerId=1 中的副本。该方法同样适用与减少分区副本数。