【问题标题】:Spark store kafka offset checkpoint in non-streaming(batch read write) approach非流式(批量读写)方法中的 Spark 存储 kafka 偏移检查点
【发布时间】:2020-02-18 13:35:16
【问题描述】:

我有一个用例,我想处理来自 kafka 的特定偏移集并存储在 cassandra 中并维护检查点,以便在发生故障时,我可以从检查点重新启动应用程序。因为它不是流式应用程序 -

val startingOffsets = """{"topic_name": { "0": 33190, "1": 557900, "2": -2} }"""
val endingOffsets =  """{"topic_name": { "0": 33495, "1": 559905, "2": -1} }"""

val df = sparkSession
        .read
        .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
        .option("kafka.bootstrap.servers", "kafka.brokers".getConfigValue) 
        .option("subscribe", "kafka.devicelocationdatatopic".getConfigValue) 
        .option("startingOffsets", "kafka.startingOffsets".getConfigValue)
        .option("endingOffsets", "kafka.endingOffsets".getConfigValue)
        .option("failOnDataLoss", "false") // any failure regarding data loss in topic or else, not supposed to fail, it has to continue...
        .option("maxOffsetsPerTrigger", "3") // any change please remove the checkpoint folder
        .load()

而写方法是-

df
.write
.cassandraFormat(
"tbl_name",
"cassandra.keyspace".getConfigValue,
"cassandra.clustername".getConfigValue )
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointDirectory)
.save()

我试过这个选项不起作用-

.option("checkpointLocation", checkpointDirectory)

在检查时我发现了一些将检查点目录创建为的博客 -

 sc.setCheckpointDir("/batchProcessKafka")

&写作保持设置为-

df.checkpoint(true)

但这保存了整个 RDD,而我只想跟踪我的 kafka 偏移量。有什么建议吗?

【问题讨论】:

  • .option("checkpointLocation", checkpointDirectory) 向 checkpointDirectory 中写入了什么?
  • 它根本不工作。在像 writestream 这样的流式传输的情况下,它曾经像魅力一样工作。我的 kafka 偏移量被保存并正常工作,但是通过批量查询,我无法使用此选项维护偏移量。我不知道为什么
  • 哦,如果您没有使用火花流处理信息,您正在尝试恢复?是的检查点不起作用。您应该在火花流中依赖 kafka。
  • 另外,在你的代码中,不应该像ssc.checkpoint(checkpointDirectory)吗?在我们的用例中,我们依靠 Kafka 和/或数据库系统来管理偏移量。
  • 嗨 Piyush,我的代码看起来像流式传输示例吗?可能我的解释有些滞后。所以,实际上它不是流媒体应用程序。它是一个批处理,其中我有特定的偏移范围仅用于处理而不是常规流,因此,我不认为在这种情况下我们需要 ssc,它只是一个流上下文 & 只是为了更加确定我们想要将偏移保存在HDFS,无论如何,即使我按照你的方法依赖。每当我启动应用程序时,它都会从起始偏移量开始消耗。

标签: apache-spark cassandra apache-kafka spark-streaming


【解决方案1】:

现在不确定这是否有帮助。我一直在为 Kafka 寻找类似的方法,并且认为 spark 可以选择触发一次流,就像批处理一样运行,让 spark 为您管理检查点/偏移量。 这是一个 PySpark 示例 -

df.writeStream.trigger(once=True).format("parquet").option("checkpointLocation", <your checkpoint location>).foreachBatch(<your func here>)

更多信息 - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

【讨论】:

    猜你喜欢
    • 2021-05-01
    • 2021-12-03
    • 1970-01-01
    • 2017-09-25
    • 2016-03-05
    • 2019-09-13
    • 2016-06-01
    • 2020-02-15
    • 2019-06-11
    相关资源
    最近更新 更多