【问题标题】:Spark Structured Streaming Batch Read CheckpointingSpark结构化流批量读取检查点
【发布时间】:2021-12-03 03:47:53
【问题描述】:

我对 Spark 还很陌生,仍在学习。我遇到的比较困难的概念之一是检查点以及 Spark 如何使用它从故障中恢复。我正在使用结构化流从 Kafka 进行批量读取,并将它们作为 Parquet 文件写入 S3:

dataset
    .write()
    .mode(SaveMode.Append)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("date_hour")
    .parquet(getS3PathForTopic(topicName));

检查点位置是 S3 文件系统路径。但是,随着作业的运行,我看不到检查点文件。在随后的运行中,我看到以下日志:

21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1

这表明前一次运行没有检查点任何偏移量,以便从本次运行中获取它们。所以它从最早的偏移量开始一直消耗。

如何让我的工作获得新的补偿?请注意,这是一个批处理查询,如 here 所述。

这就是我的阅读方式:

             sparkSession
                .read()
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
                .option("subscribe", topic)
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
                .option("kakfa.ssl.truststore.password", sslConfig.truststorePassword())
                .option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
                .option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("failOnDataLoss", "true");

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    我不确定为什么 batch 使用 Kafka 的 Spark Structured Streaming 现在仍然存在。如果您想使用它,那么您必须编写自己的代码自己的 Offset management。请参阅指南,但解释得很糟糕。

    我会说Trigger.Once 对你来说是一个更好的用例; Offset management 由 Spark 提供,因此它不是批处理模式。

    【讨论】:

    • 谢谢,我选择了触发一次选项。
    猜你喜欢
    • 2021-02-06
    • 2018-06-22
    • 2023-03-10
    • 2017-06-19
    • 1970-01-01
    • 1970-01-01
    • 2018-03-18
    • 2019-09-13
    • 2019-01-14
    相关资源
    最近更新 更多