【发布时间】:2019-06-06 06:57:13
【问题描述】:
我们有一个 spark Streaming 应用程序。 架构如下
Kinesis 到 Spark 到 Kafka。
Spark 应用程序正在使用 qubole/kinesis-sql 从 Kinesis 进行结构化流式传输。然后将数据聚合,然后推送到 Kafka。
我们的用例要求在推送到 Kafka 之前延迟 4 分钟。
开窗时间为 2 分钟,水印时间为 4 分钟
val windowedCountsDF = messageDS
.withWatermark("timestamp", "4 minutes")
.groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")
每两分钟触发一次写入 Kafka
val eventFilteredQuery = windowedCountsDF
.selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
.writeStream
.trigger(Trigger.ProcessingTime("2 minutes"))
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("checkpointLocation", checkPoint)
.outputMode("update")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.queryName("events_kafka_stream")
.start()
我可以更改触发时间以匹配窗口,但仍有一些事件会立即推送到 kafka。
有没有办法在窗口完成后延迟 x 分钟写入 Kafka。
谢谢
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming amazon-kinesis