【发布时间】:2019-06-21 10:55:21
【问题描述】:
我正在尝试设置一个测试以将数据从 MySQL 移动到 Elasticsearch。
我有一个带有代理、zookeeper、connect、ksql 服务器和 cli、模式注册表和 Elasticsearch 的 dockerized 设置。我正在使用融合版本 5.1.0 中的 docker 图像,而对于 Elasticsearch,我正在使用 elasticsearch:6.5.4
我配置了一个 JDBC 连接器 以将数据从 MySQL 获取到 Kafka,这是有效的MySQL。
我还配置了一个Elasticsearch sink 连接器,连接器创建成功,Elasticsearch 中的索引也在那里,但是我看到我的Elasticsearch 索引中没有任何文档。 p>
这是 ES sink 连接器配置:
{
"name": "es-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "http://es:9200",
"type.name": "_doc",
"topics": "test_topic",
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
}
}
这是我查询接收器连接器状态时看到的:curl -X GET http://connect:8083/connectors/es-connector
{
"name": "es-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "connect:8083"
}
],
"type": "sink"
}
在 Elasticsearch 中我可以看到索引 http://es:9200/test_topic/_search
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
我一直在 MySQL 中进行更新和插入,我使用 ksql-cli 在流中看到消息,但在 Elasticsearch 中没有创建任何文档。我什至使用kafka-avro-console-producer 手动创建了一个主题并发布了消息,然后为该主题创建了第二个接收器连接器,结果相同,我看到了索引但没有文档。
我在 kafka-connect 中没有看到任何错误,所以我不明白为什么不起作用。连接器配置有问题吗?我错过了什么吗?
编辑:
对于 Elasticsearch 接收器配置,我尝试了使用和不使用这些行:
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
结果是一样的,没有文件。
编辑
我发现了错误:
key作为文档id,不能为空
。
【问题讨论】:
-
您能否添加示例消息,用作 Elasticsearch sink 连接器的源?
-
@wardziniak 我不明白为什么这很重要,但在这里,对于 jdbc 连接器,消息看起来像:
{"email", "some@some.com"}而对于临时主题,我创建的消息看起来像{"f1":"value1"} -
我建议提高 Kafka Connect 的日志级别并检查日志记录的内容,可能它们是空值
标签: elasticsearch apache-kafka apache-kafka-connect