【问题标题】:Spark Structured Streaming Batch QuerySpark 结构化流式批量查询
【发布时间】:2021-02-06 23:38:23
【问题描述】:

我是 kafka 和 spark 结构化流媒体的新手。我想知道批处理模式下的 spark 如何知道要从哪个偏移量读取?如果我将“startingOffsets”指定为“最早的”,我只会得到最新的记录,而不是分区中的所有记录。我在 2 个不同的集群中运行相同的代码。集群 A(本地机器)获取了 6 条记录,集群 B(TST 集群 - 第一次运行)获取了 1 条记录。

 df = spark \
     .read \
     .format("kafka") \
     .option("kafka.bootstrap.servers", broker) \
     .option("subscribe", topic) \
     .option("startingOffsets", "earliest") \
     .option("endingOffsets", "latest" ) \
     .load()

我计划每天运行一次批次,我会得到从昨天到当前运行的所有记录吗?我在哪里可以看到批量查询的偏移量和提交?

【问题讨论】:

    标签: apache-spark pyspark spark-structured-streaming


    【解决方案1】:

    根据Structured Streaming + Kafka Integration Guide,您的偏移量存储在您在批处理作业的write 部分中设置的提供的检查点位置。

    如果您不删除检查点文件,作业将继续从 Kafka 中断的地方读取。如果您删除检查点文件,或者如果您是第一次运行该作业,该作业将使用基于选项 startingOffsets 的消息。

    【讨论】:

    • 谢谢迈克,我尝试添加检查点文件选项,但仍然得到不同的结果。考虑以下运行: 运行 1 -(无检查点位置)id 1、2、3、4、5 运行 2 -(连续运行,无检查点位置)id 6 运行 3 -(新检查点位置)id 7、8、9对于运行 3,因为我更改了检查点位置;我不应该看到 id 的 1 到 9 吗?
    • 这是我的代码查询的写入部分 = data.write.mode("append").option("checkpointLocation","/foo/bar/ckpt_dir").format("text" ).save("/foo/bar/output") 仔细一看,我认为它忽略了批处理模式的检查点位置
    猜你喜欢
    • 2021-12-03
    • 2018-08-11
    • 2019-01-14
    • 1970-01-01
    • 1970-01-01
    • 2018-01-18
    • 2017-05-04
    • 2019-12-25
    • 1970-01-01
    相关资源
    最近更新 更多