【问题标题】:Old Kafka Offset consuming by Spark Structured Streaming after clearing Checkpointing location清除检查点位置后 Spark Structured Streaming 消耗的旧 Kafka 偏移量
【发布时间】:2019-06-11 11:59:41
【问题描述】:

我已经使用 Apache Kafka 和 Apache Spark 结构化流构建了一个应用程序。我面临以下问题。

场景

  • 我使用 Kafka 主题源设置了 Spark 结构化流,并 sink 作为 Kafka 主题。
  • 我们在 Kafka 上运行流并生成大量消息 主题。
  • 我们通过清除检查点停止了流并重新启动流 流的位置。运行 5 到 6 小时后,流是 随机消费旧的 Kafka 消息。

清除检查点位置后,我期待的只有新消息。
星火版本:2.4.0, 卡夫卡客户端版本:2.0.0, 卡夫卡版本:2.0.0, 集群管理器:Kubernetes。

我已经通过更改检查点位置尝试了这种情况,但问题仍然存在。

{
SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> stream = spark
        .readStream()
        .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option(subscribeType, "REQUEST_TOPIC")
            .option("failOnDataLoss",false)
            .option("maxOffsetsPerTrigger","50")
            .option("startingOffsets","latest")
            .load()
            .selectExpr(
                  "CAST(value AS STRING) as payload",
                  "CAST(key AS STRING)",
                  "CAST(topic AS STRING)",
                  "CAST(partition AS STRING)",
                  "CAST(offset AS STRING)",
                  "CAST(timestamp AS STRING)",
                  "CAST(timestampType AS STRING)");

DataStreamWriter<String>  dataWriterStream = stream
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("kafka.max.request.size", "35000000")
            .option("kafka.retries", "5")
            .option("kafka.batch.size", "35000000")
            .option("kafka.receive.buffer.bytes", "200000000")
            .option("kafka.acks","0")
            .option("kafka.compression.type", "snappy")
            .option("kafka.linger.ms", "0")
            .option("kafka.buffer.memory", "50000000")
            .option("topic", "RESPONSE_TOPIC")
            .outputMode("append")
            .option("checkpointLocation", checkPointDirectory);
spark.streams().awaitAnyTermination();

}

【问题讨论】:

标签: java apache-spark apache-kafka spark-structured-streaming


【解决方案1】:

查看以下链接,

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-checkpointing.html

您调用 SparkContext.setCheckpointDir(directory: String) 来设置检查点目录 - RDD 被检查点的目录。如果在集群上运行,该目录必须是 HDFS 路径。原因是驱动可能会尝试从自己的本地文件系统中重建checkpointed RDD,这是不正确的,因为checkpoint文件实际上是在executor机器上

【讨论】:

    猜你喜欢
    • 2021-05-22
    • 2020-09-03
    • 2019-07-30
    • 2018-04-26
    • 2022-01-14
    • 2017-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多