【问题标题】:How to upsert into elasticsearch in spark?如何在火花中插入弹性搜索?
【发布时间】:2015-12-12 21:06:51
【问题描述】:

使用 HTTP POST,以下脚本可以插入新字段 createtime 或更新 lastupdatetime

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
    "createtime": "2015-09-16T18:00:00"
    "lastupdatetime": "2015-09-16T18:00",
}
}'

但在 spark 脚本中,设置 "es.write.operation": "upsert" 后,我根本不知道如何插入 createtimeofficial document中只有es.update.script.*...那么,谁能举个例子?

更新:在我的例子中,我想将android设备的信息从log保存为one elasticsearch类型,并将其首次出现时间设置为createtime。如果设备再次出现,我只更新lastupdatetime,但保持createtime 不变。

所以文档id是android ID,如果id存在,更新lastupdatetime,否则插入createtimelastupdatetime。所以这里的设置是(在python中):

conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id"
    # ???
}

rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)

如果id 不存在,我只是不知道如何插入 字段。

【问题讨论】:

    标签: hadoop elasticsearch apache-spark pyspark


    【解决方案1】:

    没有看到您的 Spark 脚本,很难给出详细的答案。但一般来说,您会想要使用elasticsearch-hadoop(因此您需要将该依赖项添加到您的 Build.sbt 文件中,例如),然后在您的脚本中您可以:

    import org.elasticsearch.spark._ 
    val documents = sc.parallelize(Seq(Map(
                                       "id" -> 1, 
                                       "createtime" -> "2015-09-16T18:00:00"
                                       "lastupdatetime" -> "2015-09-16T18:00"),
                                      Map(<next document>), ...)
                       .saveToEs("test/type1", Map("es.mapping.id" -> "id"))
    

    根据official docs。 saveToES 的第二个参数指定 Maps 的 RDD 中的哪个键用作 ElasticSearch 文档 ID。

    当然,如果您使用 Spark 执行此操作,则意味着您的行数比您想要手动输入的要多,因此对于您的情况,您需要将数据转换为 Maps 的 RDD从脚本中的键-> 值。但在不知道数据源的情况下,我无法更详细地介绍。

    【讨论】:

    • 然后将设置 es.write.operation 设置为 upsert 就像你所做的那样,然后使用 rdd.saveToEs() 应该做你想做的事。
    • 不,createtime 永远不应该被更新。直接使用upsert会覆盖这个字段。
    • 我明白了。所以你想做一个部分文档的 upsert。例如。 elastic.co/guide/en/elasticsearch/reference/master/…
    【解决方案2】:

    最后,我得到了一个不完美的解决方案:

    1. 在所有源文档中添加createtime
    2. 使用create方法保存到es并忽略已经创建的错误;
    3. 删除createtime字段;
    4. update方法再次保存到es;

    目前(2015-09-27),第二步可以通过this patch实现。

    【讨论】:

    • 如果我可以问,409 是什么?
    • 你能发布你得到的整个错误吗?似乎您将 409 与其他东西混淆了。因为 Elasticsearch 默认更新一个已经存在的文档!
    • 请注意,我首先使用create,而不是update。如果 Elasticsearch 中已经存在id,则会抛出异常,但不会更改文档。这里的异常已经创建
    猜你喜欢
    • 1970-01-01
    • 2019-12-03
    • 2015-11-06
    • 1970-01-01
    • 2018-02-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-20
    相关资源
    最近更新 更多