【Posted】: 2020-09-26 11:10:41
【Question】:
My end goal is to write the aggregated data out to a new Kafka topic, and to read it back, in the batch in which it is processed. I followed the official documentation and a few other posts, but had no luck. I first read the topic, perform the aggregation, save the result to another Kafka topic, then read that topic again and print it to the console. Below is my code:
package com.sparkKafka

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._

import scala.concurrent.duration._

object SparkKafkaTopic3 {
  def main(ar: Array[String]) {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()

    // Read the source topic as a stream.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songDemo5")
      .option("startingOffsets", "earliest")
      .load()

    import spark.implicits._
    df.printSchema()

    // Split the comma-separated value into userName and songName columns.
    val newDf = df.select($"value".cast("string"), $"timestamp")
      .select(
        split(col("value"), ",")(0).as("userName"),
        split(col("value"), ",")(1).as("songName"),
        col("timestamp"))

    // Count plays per song over 20-second windows, with a 40-second watermark.
    val windowedCount = newDf
      .withWatermark("timestamp", "40000 milliseconds")
      .groupBy(window(col("timestamp"), "20 seconds"), col("songName"))
      .agg(count(col("songName")).alias("numberOfTimes"))

    // Write the aggregate out to a second topic.
    val outputTopic = windowedCount
      .select(struct("*").cast("string").as("value")) // Added this line.
      .writeStream
      .format("kafka")
      .option("topic", "songDemo6")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/spark_ss/")
      .start()

    // Read the second topic back and print it to the console.
    val finalOutput = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songDemo6")
      .option("startingOffsets", "earliest")
      .load()
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()
  }
}
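As a side note, the struct("*").cast("string") line is what produces the bracketed [[start, end], song, count] rows shown further down. A possible alternative (just a sketch, not what I actually ran) is to serialize each row as JSON with to_json, so a downstream reader could parse it back with from_json:

// Sketch: JSON serialization instead of casting the struct to a raw string.
// to_json and struct are standard functions from org.apache.spark.sql.functions.
val jsonOutput = windowedCount
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("topic", "songDemo6")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/spark_ss/")
  .start()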
When I run it, initially there is an exception in the console:
java.lang.IllegalStateException: Cannot find earliest offsets of Set(songDemo4-0). Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
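For reference, the option named in the message goes on the Kafka source. A minimal sketch of my reader with only that option added (this just stops the query from failing; it does not recover the missed data):

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemo5")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false") // suppresses the IllegalStateException above
  .load()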
Also, if I run this code without the part that writes to the topic and reads it back again, everything works fine.
I tried reading the topic from the shell with the console consumer command, but it showed no records. Is there something I'm missing here?
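For completeness, the same check can also be done from Spark itself with a one-off batch read of the topic (a sketch, using the same broker and topic names as above):

// Batch (non-streaming) read: prints whatever is currently in the topic.
val check = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemo6")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")

check.show(false)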
Below is my dataset:
>sid,Believer
>sid,Thunder
>sid,Stairway to heaven
>sid,Heaven
>sid,Heaven
>sid,thunder
>sid,Believer
After running @Srinivas's code and reading the new topic, I get data like this:
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Stairway to heaven, 1]
[[2020-06-07 18:40:40, 2020-06-07 18:41:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Thunder, 1]
Here you can see that the window frame for Believer is the same, yet the entries are separate. Why is that? It should be a single entry with a count of 2, since the window frame is identical.
【Comments】:
-
Can you post some sample Kafka input?
-
sid,Believer is the input data. I'm able to perform the aggregation and display it on the console, but when I try to write it out and read it back again, it fails..
-
Voting to close this question because it does not provide the complete error trace.
-
@GiorgosMyrianthous There is no error. Please read my question again.
-
Well, it might help if you read your own question again. "When I run it, initially there is an exception in the console" — it says it cannot find the value attribute, yet the code keeps running.
Tags: apache-spark apache-kafka spark-structured-streaming