【问题标题】:How do I read from same starting offset in each micro batch in spark structured streaming job?如何在 spark 结构化流式作业中从每个微批次中的相同起始偏移量读取?
【发布时间】:2020-08-06 17:35:28
【问题描述】:

我正在使用 spark 结构化流。是否可以在每次批处理执行后重置 Kafka 偏移量,以便每个批处理从相同的起始偏移量读取,而不是仅读取新发现的事件?

引用 spark Kafka 集成文档herestartingOffsets 的描述

对于流式查询,这仅适用于启动新查询时, 并且恢复将始终从查询停止的地方开始。 查询期间新发现的分区将最早开始。

现在,我正在为每个批处理循环从 Kafka 内部创建一个静态数据帧,并使用格式为“速率”的虚拟流数据集。想知道是否有更好的方法来做到这一点

【问题讨论】:

  • 这种处理的用例是什么?
  • 我想从 Kafka 读取的数据是在 Kafka 中不经常更新的主数据。我想丰富这一点,并每 1 分钟将此主数据的完整快照写入持久存储。
  • 奇怪的方法恕我直言
  • 你得到这个@conetfun 的答案了吗?
  • @Akshay - 我最终使用了我在问题描述中提到的方法。使用使用 rate 和 insde 的虚拟流数据集,始终从相同的偏移量读取。

标签: apache-spark apache-kafka spark-streaming kafka-consumer-api spark-structured-streaming


【解决方案1】:

对于结构化流,可以将startingOffsets 设置为earliest,这样每次消费时都可以从最早的可用偏移量开始。以下将解决问题

.option("startingOffsets", "earliest")

但是请注意,这仅对新创建的查询有效:

startingOffsets

查询开始时的起点,"earliest" 是 从最早的偏移量,"latest",它只是从最新的 偏移量,或一个 json 字符串,指定每个偏移量的起始偏移量 主题分区。在json中,-2作为偏移量可以用来指代 最早,-1 到最新。注意:对于批量查询,latest(或者 不允许隐式地或在 json 中使用 -1)。 用于流式传输 查询,这仅适用于启动新查询时,并且 恢复将始终从查询停止的地方开始。新 查询期间发现的分区最早会开始。


或者,您也可以选择每次更改消费者组:

.option("kafka.group.id", "newGroupID")

【讨论】:

  • 我的要求是在每个微批处理执行中从相同的偏移量读取。现在在所有情况下(最早、最新、特定偏移量),第一批将从该偏移量读取,但后续批次将从前一批停止的位置读取。我在这里尝试做的是在所有后续批次中读取相同的偏移量。我在评论中提到了一种方法(使用虚拟流数据集并将实际数据作为批处理循环内的静态数据帧读取),但想知道是否有更好的方法来完成它。
猜你喜欢
  • 2019-12-25
  • 1970-01-01
  • 1970-01-01
  • 2021-02-22
  • 2020-09-19
  • 2020-07-22
  • 2019-01-14
  • 2021-12-03
  • 1970-01-01
相关资源
最近更新 更多