【问题标题】:Trying to index kafka topic in Elasticsearch with Kafka Connect尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题
【发布时间】:2019-04-18 18:54:21
【问题描述】:

我想将一个主题从 avro 中的 kafka 索引到 elasticsearch 格式,但是 我的时间戳字段有问题需要被识别 elasticsearch 作为日期格式字段。

我已经为连接器使用了以下配置。

   {
          "name": "es-sink-barchart-10",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081",

        "connection.url": "http://localhost:9200",

        "type.name":"type.name=kafka-connect",

        "topics": "exchange_avro_01",

        "topic.index.map": "exchange_avro_01:exchange_barchart",

        "key.ignore": "true"
     }
    }

原始字段是 bigint 类型,我希望目标字段是具有弹性搜索的任何有效格式的日期类型。我已经定义了一个动态模板来尝试通过以下方式解决它:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
  "index_patterns": "exchange*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "kafka-connect": {
      "dynamic_templates": [
    {
          "dates": {
        "match_mapping_type": "long",
            "match": "TIME",
            "mapping": {
              "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
            }
          }
        }
      ]
     ,
      "properties": {
          "CLOSE": {
            "type": "double"
          },
         .
         .
         .
        }
      }

    }
  }
}'

当我加载上述连接器时,没有任何内容被索引到 elasticsearch。

有什么帮助吗?

【问题讨论】:

    标签: elasticsearch apache-kafka apache-kafka-connect


    【解决方案1】:

    如果您的来源是 bigint,那么大概是一个时代。如果它是一个时代,那么这将不起作用:

    "mapping": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
            }
    

    因为您告诉 Elasticsearch 日期格式是 yyyy-MM-dd HH:mm:ss(它不是)。

    所以,试试这个(暂时忽略你的自定义映射;先让它工作,然后再添加):

    {
      "index_patterns": "exchange*",
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
      },
      "mappings": {
        "kafka-connect": {
          "dynamic_templates": [
            {
              "dates": {
                "match": "TIME",
                "mapping": {
                  "type": "date"
                } } } ] } } }
    

    另请参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection

    弹性搜索没有任何索引。

    检查 Kafka Connect 工作器日志和 Elasticsearch 日志是否有任何错误。

    【讨论】:

    • 如何查看 kafka connect worker 日志和 elasticsearch 日志?
    • 关于如何检查连接日志的信息here。如果您从命令行启动,Elasticsearch 的日志将在标准输出中,如果不是,则在相应的日志文件夹中。
    • 解决方案似乎运行良好,但性能受到了巨大影响。有没有其他方法可以改造它?更新模式注册表中的模式怎么样?
    猜你喜欢
    • 2019-06-21
    • 2017-01-08
    • 2017-11-07
    • 1970-01-01
    • 1970-01-01
    • 2021-10-27
    • 2020-01-19
    • 2019-07-08
    • 2019-07-02
    相关资源
    最近更新 更多