【发布时间】:2020-06-23 03:38:53
【问题描述】:
我正在尝试运行一个简单的 Kafka Connect 容器。我确实尝试了Confluent Connect Tutorial,但设置略有不同(没有 docker 机器,没有模式注册表)。
目前,我正在使用包含 Zookeeper 和 Kafka 的 Docker compose 设置。
version: '3.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
- ZOOKEEPER_SYNC_LIMIT=2
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092
- 9094:9094
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
# setup :9092 for access inside the docker network, 9094 for outside (ie host)
- KAFKA_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://kafka:9094
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_NUM_PARTITIONS=10
这适用于不同的用途,所以我不认为这是一个问题。
现在我正在启动一个可以很好地连接到 Kafka 的 Kafka Connect 容器。我使用以下改编自 Connect 教程的命令:
docker run -d \
--name=kafka-connect-test \
--net=kafka-connect_default \
--expose 28083 \
-p 28083:28083 \
-e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
-e CONNECT_REST_PORT=28083 \
-e CONNECT_GROUP_ID="quickstart-test" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-test-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-test-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-test-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
-e CONNECT_PLUGIN_PATH=/usr/share/java/kafka,/etc/kafka-connect/jars \
-v /tmp/quickstart/file:/tmp/quickstart \
-v /tmp/quickstart/jars:/etc/kafka-connect/jars \
confluentinc/cp-kafka-connect:latest
最显着的区别是我使用StringConverter,因为我想使用kafkacat 插入测试数据。
容器启动良好,并且在我尝试过的所有暴露端点上运行且可访问。 由于我没有添加任何连接器,因此我查询了可用的连接器:
localhost:28083/connector-plugins:
[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
所以现在我创建一个将数据从主题写入文件的文件接收器就足够了。我发帖到localhost:28083/connectors
{ "name": "file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/test.sink.txt",
"topics": "test-topic"
}
}
并接收201 - Created。
但是,当使用 GET 查询该端点时,我得到一个空数组作为响应。尝试一下,我也可以将connector.class 更改为FileStreamSinkConnector 或只是FileStreamSink,仍然会得到201(不添加连接器)。
我做错了什么?
当事情明显出错时,为什么我会得到“成功”的响应?
【问题讨论】:
-
顺便说一句,容器内的文件接收器似乎是个坏主意。如果您真的想要“生产之类”的东西,我建议您使用 Minio 和 S3 连接器
-
@cricket_007 是的,我知道,这些仅代表我使用 Connect 的第一步。没有从快速入门中获得 MySQL 示例,所以我想我会更基础。
-
是的,我很少使用 JDBC 的。 Landoop Connect 教程很好。这篇博文也是blog.minio.io/…
标签: apache-kafka apache-kafka-connect