【发布时间】:2021-03-14 12:27:37
【问题描述】:
我们需要在 Kafka 源中使用 maxOffsetsPerTrigger 和结构化流中的 Trigger.Once(),但基于此 issue 似乎在 spark 3 中读取 allAvailable。在这种情况下有没有办法实现速率限制?
这是 spark 3 中的示例代码:
def options: Map[String, String] = Map(
"kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
"subscribe" -> conf.getString("topic")
) ++
Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
.load
.writeStream
.trigger(Trigger.Once)
.start()
【问题讨论】:
标签: apache-spark apache-kafka spark-structured-streaming spark-kafka-integration