> 文章列表 > 【案例分享】clickhouse无认证kafka迁移到有认证kafka方案

【案例分享】clickhouse无认证kafka迁移到有认证kafka方案

【案例分享】clickhouse无认证kafka迁移到有认证kafka方案

场景描述

假设当前的clickhouse 与kafka对接使用的是无认证的方式, 要求将clickhouse迁移到有认证的kafka, 协议使用security_protocol=SASL_SSL。
假设当前已经接入了许多topic,希望有一个平滑的过渡,即可以逐个topic 从无认证的kafka集群迁移到另外一个有认证的kafka集群,并且迁移过程中,不影响其他topic的正常接入。

思路简介

  • 难点在于clickhouse需要支持同时对接具备不同认证的kafka集群
  • 解决方案: 只需要先在clickhouse的xml配置文件中添加kafka的相关认证信息,然后重启clickhouse 集群生效, 最后重建一下kafka表就可以了。

clickhouse迁移kafka 具体方案

前置条件:

  • 日志已经发送到新的有认证的kafka集群, kafka中已经创建好topic并且设置用户权限。 这里以华为云的kafka为例
  • clickhouse版本为 v21.12 以上

获取kafka 认证所需信息

  1. 下载kafka认证证书 phy_ca.crt, 将证书移动到clickhouse目录位置: path_to_ca, 确保文件权限正确
  2. 获取华为云kafka对应用户名密码

修改Clickhouse配置,支持kafka认证

注意:这个全局只需要配置一次即可, 如果clickhouse已经配置完成,则直接跳到下一步重建input表, 否则按照指导开始配置。

在所有的ck节点,做如下配置操作:

  1. 修改配置文件:
# 编辑clickhouse配置文件
vi config.xml

原有的<yandex></yandex>标签内, 添加以下配置:

    <named_collections><kafka_new><kafka><bootstrap_servers>1</bootstrap_servers><security_protocol>sasl_ssl</security_protocol><sasl_mechanism>PLAIN</sasl_mechanism><sasl_username></sasl_username><sasl_password></sasl_password><debug>all</debug><auto_offset_reset>latest</auto_offset_reset><compression_type>snappy</compression_type><ssl_ca_location>path_to_ca</ssl_ca_location></kafka></kafka_new></named_collections>

配置说明:

<named_collections> : 内置定义, 表示定义命名空间, 该配置可以支持多套kafka集群。 
<kafka_new>: 自定义, 可以在创建input时,引用这个自定义的配置名称。
<kafka>: 内置定义, 表示配置的是kafka引擎参数
其他: 内置定位, kafka的具体参数配置

1.1 补充: 对于v21.12版本以前的clickhouse, 需要逐个对topic进行配置, 下面给出配置案例指导:

 <kafka_mytopic1><bootstrap_servers>kafka1</bootstrap_servers><security_protocol>sasl_ssl</security_protocol><sasl_mechanism>PLAIN</sasl_mechanism><sasl_username></sasl_username><sasl_password></sasl_password><debug>all</debug><auto_offset_reset>latest</auto_offset_reset><ssl_ca_location></ssl_ca_location></kafka_mytopic1><kafka_mytopic2><bootstrap_servers>kafka2</bootstrap_servers><security_protocol>plaintext</security_protocol><debug>all</debug><auto_offset_reset>latest</auto_offset_reset><compression_type>snappy</compression_type></kafka_mytopic2>

显然, 这种配置不是特别方面,迁移中,配置clickhouse的频率以及重启的频率会增多

  1. 重启clickhouse 进程:
systemctl restart clickhouse-server

重建input表

重建input表时, 注意:需要额外添加kafka自定义配置,ENGINE = Kafka(kafka_new), 其他配置与之前保持一致即可。
示例:

CREATE TABLE input_index.table (`log_time` String,  `detail` String) ENGINE = Kafka(kafka_new) SETTINGS kafka_broker_list = '', kafka_topic_list = '', kafka_group_name = '', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 1, kafka_num_consumers = 1

验证是否成功

有新增日志,查询local表, 确认ck与kafka是否对接正常。

回滚操作

若切换失败,需要进行回滚操作。

  1. 若clickhouse 配置后无法正常启动, 则删除新增的配置项, 执行systemctl restart clickhouse-server即可恢复。
  2. 若input表重建后无法正常工作,则重建回原有的input表。

参考资料

https://github.com/ClickHouse/ClickHouse/issues/48370

https://kb.altinity.com/altinity-kb-integrations/altinity-kb-kafka/altinity-kb-adjusting-librdkafka-settings/#different-configurations-for-different-tables