【发布时间】:2020-02-18 13:35:16
【问题描述】:
我有一个用例,我想处理来自 kafka 的特定偏移集并存储在 cassandra 中并维护检查点,以便在发生故障时,我可以从检查点重新启动应用程序。因为它不是流式应用程序 -
val startingOffsets = """{"topic_name": { "0": 33190, "1": 557900, "2": -2} }"""
val endingOffsets = """{"topic_name": { "0": 33495, "1": 559905, "2": -1} }"""
val df = sparkSession
.read
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "kafka.brokers".getConfigValue)
.option("subscribe", "kafka.devicelocationdatatopic".getConfigValue)
.option("startingOffsets", "kafka.startingOffsets".getConfigValue)
.option("endingOffsets", "kafka.endingOffsets".getConfigValue)
.option("failOnDataLoss", "false") // any failure regarding data loss in topic or else, not supposed to fail, it has to continue...
.option("maxOffsetsPerTrigger", "3") // any change please remove the checkpoint folder
.load()
而写方法是-
df
.write
.cassandraFormat(
"tbl_name",
"cassandra.keyspace".getConfigValue,
"cassandra.clustername".getConfigValue )
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointDirectory)
.save()
我试过这个选项不起作用-
.option("checkpointLocation", checkpointDirectory)
在检查时我发现了一些将检查点目录创建为的博客 -
sc.setCheckpointDir("/batchProcessKafka")
&写作保持设置为-
df.checkpoint(true)
但这保存了整个 RDD,而我只想跟踪我的 kafka 偏移量。有什么建议吗?
【问题讨论】:
-
.option("checkpointLocation", checkpointDirectory)向 checkpointDirectory 中写入了什么? -
它根本不工作。在像 writestream 这样的流式传输的情况下,它曾经像魅力一样工作。我的 kafka 偏移量被保存并正常工作,但是通过批量查询,我无法使用此选项维护偏移量。我不知道为什么
-
哦,如果您没有使用火花流处理信息,您正在尝试恢复?是的检查点不起作用。您应该在火花流中依赖 kafka。
-
另外,在你的代码中,不应该像
ssc.checkpoint(checkpointDirectory)吗?在我们的用例中,我们依靠 Kafka 和/或数据库系统来管理偏移量。 -
嗨 Piyush,我的代码看起来像流式传输示例吗?可能我的解释有些滞后。所以,实际上它不是流媒体应用程序。它是一个批处理,其中我有特定的偏移范围仅用于处理而不是常规流,因此,我不认为在这种情况下我们需要 ssc,它只是一个流上下文 & 只是为了更加确定我们想要将偏移保存在HDFS,无论如何,即使我按照你的方法依赖。每当我启动应用程序时,它都会从起始偏移量开始消耗。
标签: apache-spark cassandra apache-kafka spark-streaming