【问题标题】:Spark Streaming write to Kafka with delay - after x minutesSpark Streaming 延迟写入 Kafka - x 分钟后
【发布时间】:2019-06-06 06:57:13
【问题描述】:

我们有一个 spark Streaming 应用程序。 架构如下

Kinesis 到 Spark 到 Kafka。

Spark 应用程序正在使用 qubole/kinesis-sql 从 Kinesis 进行结构化流式传输。然后将数据聚合,然后推送到 Kafka。

我们的用例要求在推送到 Kafka 之前延迟 4 分钟。

开窗时间为 2 分钟,水印时间为 4 分钟

val windowedCountsDF = messageDS
   .withWatermark("timestamp", "4 minutes")
   .groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")

每两分钟触发一次写入 Kafka

val eventFilteredQuery = windowedCountsDF
  .selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("checkpointLocation", checkPoint)
  .outputMode("update")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .queryName("events_kafka_stream")
  .start()

我可以更改触发时间以匹配窗口,但仍有一些事件会立即推送到 kafka。

有没有办法在窗口完成后延迟 x 分钟写入 Kafka。

谢谢

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming amazon-kinesis


    【解决方案1】:

    将您的输出模式从update 更改为append(默认选项)。 output 模式会将所有更新的行写入接收器,因此,是否使用水印无关紧要。

    但是,在 append 模式下,任何写入都需要等到水印被越过 - 这正是您想要的:

    追加模式使用水印删除旧的聚合状态。但是窗口聚合的输出延迟了 withWatermark() 中指定的延迟阈值,因为模式语义,行在最终确定后(即越过水印后)只能添加到结果表一次。

    【讨论】:

    • @JeensonEphraim 乐于助人:)
    猜你喜欢
    • 2017-11-04
    • 2016-01-16
    • 2019-07-12
    • 1970-01-01
    • 2019-07-26
    • 1970-01-01
    • 2018-09-26
    • 1970-01-01
    • 2019-08-08
    相关资源
    最近更新 更多