【Question title】: How to map an input document field to the Elasticsearch _id field?
【Posted】: 2018-03-24 02:23:35
【Question】:

I have a fairly simple pipeline that takes JSON messages from Kafka and sends them to Elasticsearch:

input {
    kafka {
        bootstrap_servers =>  "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093"
        topics => [ "transactions_load" ]
    }
}
filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => ["kafka", "@version", "@timestamp", "message"]
    remove_tag => ["multiline"]
  }
}
output {
    elasticsearch {
        hosts => [
                "xxxxx.ibm-343.composedb.com:16915",
                "xxxxx.ibm-343.composedb.com:16915"
            ]
        ssl => true
        user => "logstash_kafka"
        password => "*****"
        index => "pos_transactions"
    }
}

The JSON records have a TransactionID field that uniquely identifies each record:

{"TransactionID": "5440772161", "InvoiceNo": 5440772, "StockCode": 22294, "Description": "HEART FILIGREE DOVE  SMALL", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 1.25, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 16, "InvoiceTime": "03:04:00", "StoreID": 1}
{"TransactionID": "5440772191", "InvoiceNo": 5440772, "StockCode": 21733, "Description": "RED HANGING HEART T-LIGHT HOLDER", "Quantity": 4, "InvoiceDate": 1507777440000, "UnitPrice": 2.95, "CustomerID": 14825, "Country": "United Kingdom", "LineNo": 19, "InvoiceTime": "03:04:00", "StoreID": 1}

Can I configure Logstash to use TransactionID as the _id field, so that when I process duplicate records for the same transaction, the updates are idempotent?

【Comments】:

    Tags: elasticsearch logstash


    【Solution 1】:

    I figured out the answer myself. Posting it here because it may be useful to others:

    output {
        elasticsearch {
            hosts => [
                    "xxxxx.ibm-343.composedb.com:16915",
                    "xxxxx.ibm-343.composedb.com:16915"
                ]
            ssl => true
            user => "logstash_kafka"
            password => "*****"
            index => "pos_transactions"
            document_id => "%{TransactionID}"
        }
    }
    

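    The document_id => "%{TransactionID}" setting tells the elasticsearch output to use each incoming document's TransactionID field as the Elasticsearch _id. A record that arrives again with the same TransactionID then overwrites the existing document instead of creating a duplicate.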
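
    One caveat worth guarding against: as far as I know, when a field referenced as %{...} is missing from an event, Logstash leaves the literal text in place, so every record without a TransactionID would share the single _id "%{TransactionID}". Below is a minimal sketch of a guard, assuming the same host and credentials as above. The fallback index name pos_transactions_unkeyed is hypothetical and not part of the original setup:

    ```
    output {
        if [TransactionID] {
            # Field present: reuse it as _id so a replayed record
            # overwrites the earlier copy instead of duplicating it.
            elasticsearch {
                hosts => ["xxxxx.ibm-343.composedb.com:16915"]
                ssl => true
                user => "logstash_kafka"
                password => "*****"
                index => "pos_transactions"
                document_id => "%{TransactionID}"
            }
        } else {
            # Field missing: omit document_id and let Elasticsearch generate the _id.
            # "pos_transactions_unkeyed" is a hypothetical index name for illustration.
            elasticsearch {
                hosts => ["xxxxx.ibm-343.composedb.com:16915"]
                ssl => true
                user => "logstash_kafka"
                password => "*****"
                index => "pos_transactions_unkeyed"
            }
        }
    }
    ```

    With the output's default action of "index", sending the same TransactionID again replaces the stored document and increments its _version rather than adding a second document.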

    【Comments】:
