【问题标题】:Kafka Connect in Docker container: Connector not addedDocker 容器中的 Kafka Connect:未添加连接器
【发布时间】:2020-06-23 03:38:53
【问题描述】:

我正在尝试运行一个简单的 Kafka Connect 容器。我确实尝试了Confluent Connect Tutorial,但设置略有不同(没有 docker 机器,没有模式注册表)。

目前,我正在使用包含 Zookeeper 和 Kafka 的 Docker compose 设置。

version: '3.1'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092
      - 9094:9094
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # setup :9092 for access inside the docker network, 9094 for outside (ie host)
      - KAFKA_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_NUM_PARTITIONS=10

这适用于不同的用途,所以我不认为这是一个问题。

现在我正在启动一个可以很好地连接到 Kafka 的 Kafka Connect 容器。我使用以下改编自 Connect 教程的命令:

docker run -d \
  --name=kafka-connect-test \
  --net=kafka-connect_default \
  --expose 28083 \
  -p 28083:28083 \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_PORT=28083 \
  -e CONNECT_GROUP_ID="quickstart-test" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-test-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-test-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-test-status" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
  -e CONNECT_PLUGIN_PATH=/usr/share/java/kafka,/etc/kafka-connect/jars \
  -v /tmp/quickstart/file:/tmp/quickstart \
  -v /tmp/quickstart/jars:/etc/kafka-connect/jars \
  confluentinc/cp-kafka-connect:latest

最显着的区别是我使用StringConverter,因为我想使用kafkacat 插入测试数据。

容器启动良好,并且在我尝试过的所有暴露端点上运行且可访问。 由于我没有添加任何连接器,因此我查询了可用的连接器:

localhost:28083/connector-plugins:

[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "5.4.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "5.4.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

所以现在我创建一个将数据从主题写入文件的文件接收器就足够了。我发帖到localhost:28083/connectors

{ "name": "file-sink", 
 "config": { 
     "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
     "tasks.max": 1, 
     "file": "/test.sink.txt",
     "topics": "test-topic"
 }
}

并接收201 - Created

但是,当使用 GET 查询该端点时,我得到一个空数组作为响应。尝试一下,我也可以将connector.class 更改为FileStreamSinkConnector 或只是FileStreamSink,仍然会得到201(不添加连接器)。

我做错了什么?

当事情明显出错时,为什么我会得到“成功”的响应?

【问题讨论】:

  • 顺便说一句,容器内的文件接收器似乎是个坏主意。如果您真的想要“生产之类”的东西,我建议您使用 Minio 和 S3 连接器
  • @cricket_007 是的,我知道,这些仅代表我使用 Connect 的第一步。没有从快速入门中获得 MySQL 示例,所以我想我会更基础。
  • 是的,我很少使用 JDBC 的。 Landoop Connect 教程很好。这篇博文也是blog.minio.io/…

标签: apache-kafka apache-kafka-connect


【解决方案1】:

问题是这样的:

-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \

internal 转换器就是这样 - 内部的,自 Apache Kafka 2.0 版起,deprecated。如果您在创建连接器后检查 Kafka Connect 工作器日志,您会看到:

ERROR Found configuration for connector 'connector-file-sink' in wrong format: class java.lang.String (org.apache.kafka.connect.storage.KafkaConfigBackingStore)

这是因为 Kafka Connect 将 Kafka 本身用作状态存储,当您创建连接器时,它会将其存储在 Kafka 主题 (CONNECT_CONFIG_STORAGE_TOPIC) 中。这默认为 JSON,看起来 Kafka Connect 不喜欢被改变(事实上,没有理由改变它)。

如果您像以前一样运行相同的 Docker 命令,但没有两条 CONNECT_INTERNAL_ 转换器行,您会发现一切正常。

这是正在创建的连接器(我使用PUT 而不是POST,因为它是幂等的并且更容易重新运行):

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:28083/connectors/file-sink/config \
    -d '{
     "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
     "tasks.max": 1,
     "file": "/test.sink.txt",
     "topics": "test-topic"
}'
HTTP/1.1 201 Created
Date: Wed, 11 Mar 2020 09:16:04 GMT
Location: http://localhost:28083/connectors/file-sink
Content-Type: application/json
Content-Length: 211
Server: Jetty(9.4.20.v20190813)

{"name":"file-sink","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max":"1","file":"/test.sink.txt","topics":"test-topic","name":"file-sink"},"tasks":[],"type":"sink"}%

现在检查它是否正在运行(使用一些 bash 的东西来很好地重新格式化它):

curl -s "http://localhost:28083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink  |  file-sink  |  RUNNING  |  RUNNING  | org.apache.kafka.connect.file.FileStreamSinkConnector

向主题发送一些数据:

➜ kafkacat -b localhost:9094 -t test-topic -P -K:
1:foo
2:bar

观察Kafka Connect写入的文件中的数据:

➜ docker exec -t kafka-connect-test bash -c 'tail -f /test.sink.txt'
foo
bar

顺便说一句:

最显着的区别是我使用StringConverter,因为我想使用kafkacat插入测试数据。

请注意,您可以在配置中为每个连接器设置转换器;在 worker (即全局)级别设置 StringConverter 可能不是一个好主意,因为您很少使用它,当然对于值。


有关 Kafka Connect 的更多信息,请查看:

【讨论】:

  • 你能在http://localhost:28083/connectors/<connector-name>/config(使用PUT)“创建”一个连接器吗?你不需要先发帖到/connectors吗?
  • 你可以单独使用PUT
猜你喜欢
  • 2018-08-27
  • 2021-07-06
  • 1970-01-01
  • 2018-12-14
  • 2020-02-09
  • 2021-01-13
  • 2020-05-01
  • 2019-06-17
  • 2021-07-18
相关资源
最近更新 更多