【问题标题】:Kafka Connect Elasticsearch sink no documents are indexedKafka Connect Elasticsearch sink 没有文档被索引
【发布时间】:2019-06-21 10:55:21
【问题描述】:

我正在尝试设置一个测试以将数据从 MySQL 移动到 Elasticsearch。

我有一个带有代理、zookeeper、connect、ksql 服务器和 cli、模式注册表和 Elasticsearch 的 dockerized 设置。我正在使用融合版本 5.1.0 中的 docker 图像,而对于 Elasticsearch,我正在使用 elasticsearch:6.5.4

我配置了一个 JDBC 连接器 以将数据从 MySQL 获取到 Kafka,这是有效的MySQL。

我还配置了一个Elasticsearch sink 连接器,连接器创建成功,Elasticsearch 中的索引也在那里,但是我看到我的Elasticsearch 索引中没有任何文档。 p>

这是 ES sink 连接器配置:

{
    "name": "es-connector",
    "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "connection.url": "http://es:9200",
            "type.name": "_doc",
            "topics": "test_topic",
            "drop.invalid.message": true,
            "behavior.on.null.values": "ignore",
            "behavior.on.malformed.documents": "ignore",
            "schema.ignore": true
    }
}

这是我查询接收器连接器状态时看到的:curl -X GET http://connect:8083/connectors/es-connector

{
    "name": "es-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "state": "RUNNING",
            "id": 0,
            "worker_id": "connect:8083"
        }
    ],
    "type": "sink"
}

在 Elasticsearch 中我可以看到索引 http://es:9200/test_topic/_search

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

我一直在 MySQL 中进行更新和插入,我使用 ksql-cli 在流中看到消息,但在 Elasticsearch 中没有创建任何文档。我什至使用kafka-avro-console-producer 手动创建了一个主题并发布了消息,然后为该主题创建了第二个接收器连接器,结果相同,我看到了索引但没有文档。

我在 kafka-connect 中没有看到任何错误,所以我不明白为什么不起作用。连接器配置有问题吗?我错过了什么吗?

编辑:

对于 Elasticsearch 接收器配置,我尝试了使用和不使用这些行:

"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true

结果是一样的,没有文件。

编辑

我发现了错误:

key作为文档id,不能为空

【问题讨论】:

  • 您能否添加示例消息,用作 Elasticsearch sink 连接器的源?
  • @wardziniak 我不明白为什么这很重要,但在这里,对于 jdbc 连接器,消息看起来像:{"email", "some@some.com"}而对于临时主题,我创建的消息看起来像 {"f1":"value1"}
  • 我建议提高 Kafka Connect 的日志级别并检查日志记录的内容,可能它们是空值

标签: elasticsearch apache-kafka apache-kafka-connect


【解决方案1】:

"key.ignore": true

Elasticsearch sink 将使用 topic+partition+offset 作为 Elasticsearch 文档 ID。正如您所发现的,您将为每条 消息获得一个新文档。

"key.ignore": false

Elasticsearch 接收器将使用 Kafka 消息的密钥 作为 Elasticsearch 文档 ID。如果您的 Kafka 消息中没有密钥,您将收到错误 Key is used as document id and cannot be null,这是可以理解的。您可以使用各种方法在 Kafka 消息中设置密钥,如果您通过 Kafka Connect detailed here 摄取,则可以使用单消息转换来设置 Kafka 消息密钥。

【讨论】:

    猜你喜欢
    • 2021-07-15
    • 2020-01-07
    • 1970-01-01
    • 2017-03-26
    • 1970-01-01
    • 2018-10-24
    • 2022-12-19
    • 2019-04-18
    • 1970-01-01
    相关资源
    最近更新 更多