【发布时间】:2021-12-03 03:47:53
【问题描述】:
我对 Spark 还很陌生,仍在学习。我遇到的比较困难的概念之一是检查点以及 Spark 如何使用它从故障中恢复。我正在使用结构化流从 Kafka 进行批量读取,并将它们作为 Parquet 文件写入 S3:
dataset
.write()
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointLocation)
.partitionBy("date_hour")
.parquet(getS3PathForTopic(topicName));
检查点位置是 S3 文件系统路径。但是,随着作业的运行,我看不到检查点文件。在随后的运行中,我看到以下日志:
21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1
这表明前一次运行没有检查点任何偏移量,以便从本次运行中获取它们。所以它从最早的偏移量开始一直消耗。
如何让我的工作获得新的补偿?请注意,这是一个批处理查询,如 here 所述。
这就是我的阅读方式:
sparkSession
.read()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
.option("subscribe", topic)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
.option("kakfa.ssl.truststore.password", sslConfig.truststorePassword())
.option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
.option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("failOnDataLoss", "true");
【问题讨论】:
标签: apache-spark spark-structured-streaming