【问题标题】:Spark Streaming - Kinesis - JavaSpark Streaming - Kinesis - Java
【发布时间】:2020-09-09 00:52:07
【问题描述】:

是否可以在 Spark Streaming 中每批记录只处理或触发一次方法/动作?

我的用例是每个 DStream 批次调用一次 loadConfigurations(),即使有 1 到 n 条记录。加载的配置应在驱动程序处可用以进行进一步处理。

例如:

batch-1: 运动流中的 0 条记录 - 没有触发 加载配置()

batch-2: kinesis 流中的 1 条记录 - loadConfiguration() 调用一次 以及在驱动程序级别更新的变量

batch-3:运动流中的 100 条记录 - 调用 loadConfiguration() 一次并在驱动程序级别更新变量

提前致谢。

【问题讨论】:

  • 不使用结构化流的任何具体原因?另外,为什么不在驱动程序中预先加载配置,然后使用广播变量进行广播?每个批次的配置是否不断变化?
  • 用例具有复杂的计算而不是常规的 ETL。当前配置在驱动程序级别加载并广播一次。但是需要在有新配置更新时动态更改它。注意:处理记录是不同的流输入,配置触发事件会监听不同的流。因此,对于配置输入流,即使给定批次中有 100 个事件,我也只想更新一次配置。 (希望每批更新一次的原因是可以理解的)。

标签: java apache-spark lazy-loading spark-streaming amazon-kinesis


【解决方案1】:

不太确定我是否理解了确切的要求。但是,根据问题描述和您在 cmets 中的解释,这可能会起作用:

dstream.foreachRDD { rdd =>
  val config = loadConfiguration() //  executed at the driver
  rdd.foreach { record =>
   // do stuff here. e.g. config.get(). This code is executed at the worker.
  }
}

这里要注意的重要一点是Config 类必须是可序列化的,因为它将从驱动程序发送给工作人员。

另外,请注意,根据您的用例,这可能是一种反模式。例如对于每个批次,配置对象将被序列化并发送给工作人员,这将根据配置对象的大小增加网络开销。

我强烈建议检查forEachRDD 构造的推荐设计模式,并明智地选择您的方法。这是相同的链接:https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

【讨论】:

  • 我不需要每次处理 dstream 时都发生 loadConfiguration()。这应该在侦听来自其他流的事件时按需发生。要求:input stream => process records ;&; config stream => load configuration once per batch if any records (events come to this stream represents a configurations refresh needed). 但是,如果每批有超过 1 条记录进入此配置流,则执行 dstream.foreachRDD 或 foreach 记录会使配置重新加载多次,这不需要。所以每批配置流记录只需要更新一次配置。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-13
  • 1970-01-01
  • 1970-01-01
  • 2017-03-18
  • 1970-01-01
相关资源
最近更新 更多