> 文章列表 > Kafka系列——详解Kafka Connect以及其和客户端API之间的区别

Kafka系列——详解Kafka Connect以及其和客户端API之间的区别

Kafka系列——详解Kafka Connect以及其和客户端API之间的区别

如何选择Connect API和客户端API,它们的区别是什么

在向 Kafka 写入数据或从 Kafka 读取数据时,要么使用传统的生产者和消费者客户端,要么使用后面即将介绍的 Connect API 和连接器

在具体介绍 Connect API 之前,思考这样一个问题:“什么时候适合用哪一个?
我们知道,Kafka 客户端是要被内嵌到应用程序里的,应用程序使用它们向 Kafka 写入数据或从 Kafka 读取数据。
如果你是应用的开发人员,你会修改应用程序的代码,使用 Kafka 客户端将应用程序连接到Kafka,将数据推送到 Kafka 或者从 Kafka 读取数据。

如果要将 Kafka 连接到数据存储系统,则需要使用 Connect,因为这些系统不是你开发的。你无法或者也不想修改它们的代码。
Connect 可以用于从外部数据存储系统读取数据,或者将数据推送到外部存储系统。如果数据存储系统提供了相应的连接器,那么非开发人员就可以通过配置连接器的方式来使用 Connect。

如果你要连接的数据存储系统没有相应的连接器,那么可以考虑使用客户端 API 或Connect API 开发一个应用程序。但建议首选 Connect,因为它提供了一些开箱即用的特性,比如配置管理、偏移量存储、并行处理、错误处理,而且支持多种数据类型和标准 的 REST 管理 API
开发一个连接 Kafka 和外部数据存储系统的小应用程序看起来很简单, 但其实还有很多细节需要处理,比如数据类型和配置选项,这些无疑加大了开发的复杂性——Connect 处理了大部分细节,让你可以专注于数据的传输。

Kafka Connect

Connect 是 Kafka 的一部分,它为在 Kafka 和外部数据存储系统之间移动数据提供了一种可靠且可伸缩的方式。它为连接器插件提供了一组 API 和一个运行时——Connect 负责运行这些插件,它们则负责移动数据。
Connect 以 worker 进程集群的方式运行,我们基于worker 进程安装连接器插件,然后使用 REST API 来管理和配置 connector,这些 worker进程都是长时间持续运行的作业
连接器启动额外的 task,有效地利用工作节点的资源,以并行的方式移动大量的数据。
数据源的连接器负责从源系统读取数据,并把数据对象提供给 worker 进程
数据池的连接器负责从 worker 进程获取数据,并把它们写入目标系统
Connect 通过 connector 在 Kafka 里存储不同格式的数据。Kafka 支持 JSON,而且Confluent Schema Registry 提供了 Avro 转换器。开发人员可以选择数据的存储格式,这些完全独立于他们所使用的连接器。

运行Connect

Connect 随着 Kafka 一起发布,无需单独安装。如果你打算在生产环境使用 Connect 来移动大量的数据,或者打算运行多个连接器,那么最好把 Connect 部署在独立于 broker 的服务器上

在所有的机器上安装 Kafka,并在部分服务器上启动 broker,然后在其他服务器上启动 Connect

启动 Connect 进程与启动 broker 差不多,在调用脚本时传入一个属性文件即可

bin/connect-distributed.sh config/connect-distributed.properties

Connect 进程有以下几个重要的配置参数:

  • bootstrap.servers:该参数列出了将要与 Connect 协同工作的 broker 服务器连接器将会向这些 broker 写入数据或者从它们那里读取数据。不需要指定集群所有的 broker, 不过建议至少指定3个。
  • group.id:具有相同 group id 的 worker 属于同一个 Connect 集群。集群的连接器和它们的任务可以运行在任意一个 worker 上。
  • **key.converter 和 value.converter:**Connect 可以处理存储在 Kafka 里的不同格式的数据。这两个参数分别指定了消息的键和值所使用的转换器。默认使用 Kafka 提供的JSONConverter,当然也可以配置成 Conflfluent Schema Registry 提供的 AvroConverter。

有些转换器还包含了特定的配置参数。将 key.converter.schema.enable 设置成true或者false来指定 JSON 消息是否可以包含 schema。值转换器也有类似配置,但它的参数名是value.converter.schema.enable。Avro 消息也包含了 schema,但需要通过key.converter.schema.registry.url 和 value.converter.schema.registry.url 来指定Schema Registry 的位置。

一般通过 Connect 的 REST API 来配置和监控 rest.host.name 和 rest.port 连接器可以给 REST API 指定特定的端口

在启动 worker 集群之后,可以通过 REST API 来验证它们是否运行正常。

gwen$ curl http://localhost:8083/
{“version”:“0.10.1.0-SNAPSHOT”,“commit”:“561f45d747cd2a8c”}

这个 REST URI 应该要返回当前 Connect 的版本号。比如运行的是 Kafka 0.10.1.0(预发行)快照版本。还可以检查已经安装好的连接器插件

gwen$ curl http://localhost:8083/connector-plugins
[{“class”:“org.apache.kafka.connect.file.FileStreamSourceConnector”},
{“class”:“org.apache.kafka.connect.file.FileStreamSinkConnector”}]

单机模式

Connect也支持单机模式。单机模式与分布式模式类似,只是在启动时用 bin/connect-standalone.sh 代替 bin/connect-distributed.sh,也可以通过命令行传入连接器的配置文件,这样就不需要使用 REST API 了。
在单机模式下,所有的连接器和任务都运行在单独的 worker 进程上。单机 模式使用起来更简单,特别是在开发和诊断问题的时候,或者是在需要让连接器和任务运行在某台特定机器上的时候(比如 Syslog 连接器会监听某个端口,所以你需要知道它运行在哪台机器上)。

连接器示例——文件数据源和文件数据池

这个例子使用了文件连接器和 JSON 转换器,它们都是 Kafka 自带的。首先要确保Zookeeper 和 Kafka 都处于运行状态。

接下来首先启动一个分布式的 worker 进程。为了实现高可用性,真实的生产环境一般需要至少2~3 个 worker 集群。为了简单起见,在本例中只启动 1 个。

bin/connect-distributed.sh config/connect-distributed.properties &

现在开始启动一个文件数据源。为了方便,直接让它读取 Kafka 的配置文件——把 Kafka的配置文件内容发送到主题上

echo ‘{“name”:“load-kafka-config”, “config”:{“connector.class”:“FileStream
Source”,“file”:“config/server.properties”,“topic”:“kafka-config-topic”}}’ |
curl -X POST -d @- http://localhost:8083/connectors --header “contentType:application/json”

{"name": "load-kafka-config","config": {"connector.class": "FileStreamSource","file": "config/server.properties","topic": "kafka-config-topic","name": "load-kafka-config"},"tasks": [{"connector": "load-kafka-config","task": 0}],"type": "source"
}

为了创建connector,这里写了一个JSON片段,里面包含了连接器的名字 load-kafka-config 和连接器的配置信 息,配置信息包含了连接器的类名、需要加载的文件名和主题的名字。

下面通过 Kafka 的控制台消费者来验证配置文件是否已经被加载到主题上了:

gwen$ bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092
--topic kafka-config-topic --from-beginning

如果一切正常,可以看到如下的输出:

{"schema":{"type":"string","optional":false},"payload":"# Licensed to the 
Apache Software Foundation (ASF) under one or more"} 
<more stuff here> 
{"schema":{"type":"string","optional":false},"pay
load":"############################# Server Basics 
#############################"} 
{"schema":{"type":"string","optional":false},"payload":""} 
{"schema":{"type":"string","optional":false},"payload":"# The id of the broker. 
This must be set to a unique integer for each broker."} 
{"schema":{"type":"string","optional":false},"payload":"broker.id=0"} 
{"schema":{"type":"string","optional":false},"payload":""} 
<more stuff here>

以上输出的是 config/server.properties 文件的内容,这些内容被一行一行地转成 JSON记录,并被连接器发送到 kafka-config-topic 主题上。
默认情况下,JSON 转换器会在每个记录里附带上 schema。这里的 schema 非常简单——只有一个 payload 列,它是字符串类型,并且包含了文件里的一行内容。

现在使用文件数据池的转换器把主题里的内容导到文件里导出的文件内容应该与原始server.properties 文件的内容完全一样,JSON 转化器将会把每个 JSON 记录转成单行文本:

echo '{"name":"dump-kafka-config", "config":
{"connector.class":"FileStreamSink","file":"copy-of-server-properties","topics":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"{"name":"dump-kafka-config","config":
{"connector.class":"FileStreamSink","file":"copy-of-server-properties",
"topics":"kafka-config-topic","name":"dump-kafka-config"},"tasks":
[]}

这次的配置发生了变化:这里使用了类名 FileStreamSink,而不是 FileStreamSource文件属性指向目标文件,而不是原先的文件;指定了 topics,而不是 topic。
可以使用数据池将多个主题写入一个文件,而一个数据源只允许被写入一个主题

如果一切正常,你会得到一个叫作 copy-of-server-properties 的文件,该文件的内容与config/server.properties 完全一样。 如果要删除一个连接器,可以运行下面的命令:

curl -X DELETE http://localhost:8083/connectors/dump-kafka-config

在删除连接器之后,如果查看 Connect 的日志,你会发现其他的连接器会重启它们的任务。
这是为了在 worker 进程间平衡剩余的任务,确保删除连接器之后可以保持负载的均衡。

参考这里哦