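【Question title】: Rename index in Elasticsearch with Kafka sink
【Posted】: 2018-10-24 15:49:09
【Question】:

I am using the sink configuration below. The problem is that it sets the Elasticsearch index name to the same name as the topic, and I want a different Elasticsearch index name. How can I achieve that? I am using Confluent 4.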

{
  "name": "es-sink-mysql-foobar-02",
  "config": {
    "_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",


    "_comment": "--- Elasticsearch-specific config ---",
    "_comment": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",

    "_comment": "Elasticsearch mapping name. Gets created automatically if it doesn't exist",
    "type.name": "kafka-connect",
    "index.name": "asimtest",
    "_comment": "Which topic to stream data from into Elasticsearch",
    "topics": "mysql-foobar",

    "_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source) you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.'",
    "key.ignore": "true"
  }
}

【Question comments】:

    Tags: elasticsearch apache-kafka kafka-consumer-api apache-kafka-connect confluent-platform


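    【Solution 1】:

    Use Kafka Connect's Single Message Transform (SMT) feature for this. The Elasticsearch sink derives the index name from the record's topic, so rewriting the topic name in the sink's transform chain changes the index that gets written.

    For example, to drop the mysql- prefix: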

    "_comment": "Drop the mysql- prefix from the topic name and thus Elasticsearch index name",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex":"mysql-(.*)",
    "transforms.dropPrefix.replacement":"$1"
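
To see which index name a given regex/replacement pair would produce, the rewrite can be approximated outside of Kafka Connect. The following is only a rough sketch: `route_topic` is a hypothetical helper, not part of any Kafka library, and it assumes RegexRouter rewrites a topic only when the regex matches the entire topic name. It also hand-translates Java-style `$1` group references into Python's syntax, so regexes that rely on Java-only features may behave differently.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate a RegexRouter rewrite for a single topic name (hypothetical helper)."""
    match = re.fullmatch(regex, topic)
    if match is None:
        # Assumption: topics that do not fully match are passed through unchanged.
        return topic
    # Convert Java-style group references ($1) into Python's \g<1> form.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


if __name__ == "__main__":
    # The settings from the answer: drop the mysql- prefix.
    print(route_topic("mysql-foobar", "mysql-(.*)", "$1"))
    # A fixed target name, such as the index name the question asks for.
    print(route_topic("mysql-foobar", "mysql-foobar", "asimtest"))
```

Under these assumptions, a replacement without any group reference gives the sink a fixed index name, so each sink configuration can carry its own regex/replacement pair.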
    

    Or drop the prefix and route messages to a time-based Elasticsearch index:

     "transforms":"dropPrefix,routeTS",  
     "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",  
     "transforms.dropPrefix.regex":"mysql-(.*)",  
     "transforms.dropPrefix.replacement":"$1",  
     "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",  
     "transforms.routeTS.topic.format":"kafka-${topic}-${timestamp}",  
     "transforms.routeTS.timestamp.format":"yyyyMM"
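
The index name this second chain ends up with can be sketched the same way. This is an illustration under assumptions, not the connector's code: `route_by_timestamp` is a hypothetical helper, the record timestamp is assumed to be formatted in UTC, and the year-month pattern from the config is hand-translated to strftime's `%Y%m`.

```python
from datetime import datetime, timezone


def route_by_timestamp(
    topic: str,
    timestamp_ms: int,
    topic_format: str = "kafka-${topic}-${timestamp}",
    strftime_format: str = "%Y%m",  # assumed equivalent of a year-month pattern
) -> str:
    """Approximate a TimestampRouter rewrite for one record (hypothetical helper)."""
    # Assumption: the record timestamp is epoch milliseconds, formatted in UTC.
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    formatted = ts.strftime(strftime_format)
    # Fill in the two placeholders used by topic.format.
    return topic_format.replace("${topic}", topic).replace("${timestamp}", formatted)


if __name__ == "__main__":
    # A record produced at the time the question was posted, on the already
    # prefix-stripped topic "foobar".
    posted = datetime(2018, 10, 24, 15, 49, 9, tzinfo=timezone.utc)
    print(route_by_timestamp("foobar", int(posted.timestamp() * 1000)))
```

Transforms run in the order listed in `transforms`, so the timestamp router sees the topic name after the prefix has already been dropped.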
    

    For more details, see https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

    【Discussion】:

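    • I have tried SMT. The problem is that I want a separate index depending on which sink config is running. With the above, I would still end up with the same index.
    • Then use a different SMT for each sink? Could you explain more clearly why you can't use SMT?
    • I guess I can... I'm just too new to this. I'll accept the answer, thank you.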