【发布时间】:2018-04-28 11:53:39
【问题描述】:
我在 python 中有一个伪代码,它从 Kafka 流中读取并在 Elasticsearch 中插入文档(如果文档已经存在,则增加一个计数器 view。
for message in consumer:
msg = json.loads(message.value)
print(msg)
index = INDEX_NAME
es_id = msg["id"]
script = {"script":"ctx._source.view+=1","upsert" : msg}
es.update(index=index, doc_type="test", id=es_id, body=script)
由于我想在分布式环境中使用它,所以我使用的是 Spark Structured Streaming
df.writeStream \
.format("org.elasticsearch.spark.sql")\
.queryName("ESquery")\
.option("es.resource","credentials/url") \
.option("checkpointLocation", "checkpoint").start()
或从 KafkaStream 读取的 scala 中的 SparkStreaming:
// Initializing Spark Streaming Context and kafka stream
sparkConf.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
[...]
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
)
[...]
val urls = messages.map(record => JsonParser.parse(record.value()).values.asInstanceOf[Map[String, Any]])
urls.saveToEs("credentials/credential")
.saveToEs(...) 是elastic-hadoop.jar 的API,记录在案的here。不幸的是,this repo 并没有很好的记录。所以我不明白我可以把脚本命令放在哪里。
有人可以帮助我吗?提前谢谢你
【问题讨论】:
标签: python scala elasticsearch spark-streaming spark-structured-streaming