【发布时间】:2019-06-11 11:59:41
【问题描述】:
我已经使用 Apache Kafka 和 Apache Spark 结构化流构建了一个应用程序。我面临以下问题。
场景:
- 我使用 Kafka 主题源设置了 Spark 结构化流,并 sink 作为 Kafka 主题。
- 我们在 Kafka 上运行流并生成大量消息 主题。
- 我们通过清除检查点停止了流并重新启动流 流的位置。运行 5 到 6 小时后,流是 随机消费旧的 Kafka 消息。
清除检查点位置后,我期待的只有新消息。
星火版本:2.4.0,
卡夫卡客户端版本:2.0.0,
卡夫卡版本:2.0.0,
集群管理器:Kubernetes。
我已经通过更改检查点位置尝试了这种情况,但问题仍然存在。
{
SparkConf sparkConf = new SparkConf().setAppName("SparkKafkaConsumer");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> stream = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option(subscribeType, "REQUEST_TOPIC")
.option("failOnDataLoss",false)
.option("maxOffsetsPerTrigger","50")
.option("startingOffsets","latest")
.load()
.selectExpr(
"CAST(value AS STRING) as payload",
"CAST(key AS STRING)",
"CAST(topic AS STRING)",
"CAST(partition AS STRING)",
"CAST(offset AS STRING)",
"CAST(timestamp AS STRING)",
"CAST(timestampType AS STRING)");
DataStreamWriter<String> dataWriterStream = stream
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.max.request.size", "35000000")
.option("kafka.retries", "5")
.option("kafka.batch.size", "35000000")
.option("kafka.receive.buffer.bytes", "200000000")
.option("kafka.acks","0")
.option("kafka.compression.type", "snappy")
.option("kafka.linger.ms", "0")
.option("kafka.buffer.memory", "50000000")
.option("topic", "RESPONSE_TOPIC")
.outputMode("append")
.option("checkpointLocation", checkPointDirectory);
spark.streams().awaitAnyTermination();
}
【问题讨论】:
-
能否附上您在 Spark 作业中使用的带有 Kafka 配置的代码?
-
由于issues.apache.org/jira/browse/SPARK-26267 而出现此问题。它被解析为 Spark 2.4.1
标签: java apache-spark apache-kafka spark-structured-streaming