【问题标题】:How to upsert or partial updates with script documents in ElasticSearch with Spark?如何使用 Spark 对 ElasticSearch 中的脚本文档进行更新插入或部分更新?
【发布时间】:2018-04-28 11:53:39
【问题描述】:

我在 python 中有一个伪代码,它从 Kafka 流中读取并在 Elasticsearch 中插入文档(如果文档已经存在,则增加一个计数器 view

for message in consumer:

    msg = json.loads(message.value)
    print(msg)
    index = INDEX_NAME
    es_id = msg["id"]
    script = {"script":"ctx._source.view+=1","upsert" : msg}
    es.update(index=index, doc_type="test", id=es_id, body=script)

由于我想在分布式环境中使用它,所以我使用的是 Spark Structured Streaming

df.writeStream \
.format("org.elasticsearch.spark.sql")\
.queryName("ESquery")\
.option("es.resource","credentials/url") \
.option("checkpointLocation", "checkpoint").start()

或从 KafkaStream 读取的 scala 中的 SparkStreaming:

// Initializing Spark Streaming Context and kafka stream
sparkConf.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
[...] 
val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaParams)
    )

[...]
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]])
urls.saveToEs("credentials/credential")

.saveToEs(...)elastic-hadoop.jar 的API,记录在案的here。不幸的是,this repo 并没有很好的记录。所以我不明白我可以把脚本命令放在哪里。

有人可以帮助我吗?提前谢谢你

【问题讨论】:

    标签: python scala elasticsearch spark-streaming spark-structured-streaming


    【解决方案1】:

    您应该能够通过设置写入模式“update”(或 upsert)并将您的脚本作为“script”(取决于 ES 版本)传递来做到这一点。

    EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id", "es.write.operation" -> "update","es.update.script.inline" -> "your script" , ))
    

    可能你想使用“upsert”

    在同一个库中有一些不错的unit tests in cascading integration;这些设置应该对 spark 有好处,因为两者都使用相同的 writer。

    我建议阅读单元测试来为您的 ES 版本选择正确的设置。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-04
      • 1970-01-01
      • 2016-05-21
      • 2021-12-14
      • 1970-01-01
      • 2016-08-11
      相关资源
      最近更新 更多