【问题标题】:Is there any possible to achieve dynamic batch size in Spark Streaming?是否有可能在 Spark Streaming 中实现动态批量大小?
【发布时间】:2020-07-18 17:13:07
【问题描述】:
为了降低代码难度,我允许重启 Spark Streaming 系统以使用新的批大小,但需要保持之前的进度(允许丢失正在处理的批)。
如果我在 Spark Streaming 中使用checkpoint,它无法在应用程序重新启动时更改所有配置。
所以想通过修改源代码来实现这个功能,但是不知道从何下手。希望给点指导,告诉我困难。
【问题讨论】:
标签:
apache-spark
spark-streaming
batchsize
【解决方案1】:
由于您谈论的是批量大小,我假设您询问的是火花流而不是结构化流。
有一种方法可以以编程方式设置批处理间隔的值,请参阅this 链接以获取文档。
StreamingContext 的构造函数接受定义批处理间隔的duration 类的对象。
您可以通过在代码中硬编码来传递批处理间隔大小,这将要求您每次需要更改批处理间隔时都构建 jar 文件,相反,您可以从配置文件中获取它,这样您就不需要'不需要每次都构建代码。
注意:您必须在应用程序的配置文件中设置此属性,而不是在 spark 的配置文件中。
您可以更改批处理间隔的配置并重新启动应用程序,这不会对检查点造成任何问题。
val sparkConf: SparkConf = new SparkConf()
.setAppName("app-name")
.setMaster("app-master")
val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(config.getInt("batch-interval")))
干杯!!