【Question Title】: Unable to write results to Kafka topic using Spark
【发布时间】:2020-09-26 11:10:41
【Question Description】:

My end goal is to write the aggregated data read from a topic out to a new Kafka topic, in the batch in which it was processed. I followed the official documentation and several other posts, but no luck. I first read the topic, perform the aggregation, save the result to another Kafka topic, then read that topic again and print it to the console. Below is my code:

package com.sparkKafka
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import scala.concurrent.duration._
object SparkKafkaTopic3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songDemo5")
      .option("startingOffsets", "earliest")
      .load()

    import spark.implicits._
    df.printSchema()
    val newDf = df
      .select($"value".cast("string"), $"timestamp")
      .select(
        split(col("value"), ",")(0).as("userName"),
        split(col("value"), ",")(1).as("songName"),
        col("timestamp"))
    val windowedCount = newDf
      .withWatermark("timestamp", "40000 milliseconds")
      .groupBy(
        window(col("timestamp"), "20 seconds"), col("songName"))
      .agg(count(col("songName")).alias("numberOfTimes"))


    val outputTopic = windowedCount
      .select(struct("*").cast("string").as("value")) // Added this line.
      .writeStream
      .format("kafka")
      .option("topic", "songDemo6")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/spark_ss/")
      .start()

    val finalOutput = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songDemo6").option("startingOffsets", "earliest")
      .load()
      .writeStream.format("console")
      .outputMode("append").start()

    spark.streams.awaitAnyTermination()

  }
}

When I run it, I initially get this exception in the console:

java.lang.IllegalStateException: Cannot find earliest offsets of Set(songDemo4-0). Some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
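(The message itself suggests one workaround: set the `failOnDataLoss` source option on the reader. Note that the error names `songDemo4-0` even though the code subscribes to `songDemo5`, which usually points to a stale checkpoint directory left over from an earlier run against an old topic. A minimal sketch, assuming the same local broker; not a definitive fix:)

```scala
// Sketch: tolerate missing offsets instead of failing the query.
// Alternatively, clear the old checkpoint directory (/tmp/spark_ss/)
// so the query does not try to resume from offsets of a deleted topic.
val tolerantDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "songDemo5")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false") // do not fail when offsets are gone
  .load()
```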

Also, if I run this code without the write-to-topic-and-read-again part, everything works fine.

I tried reading the topic from the shell with the console consumer command, but no records show up. Is there something I'm missing here?
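(For reference, a console consumer invocation for checking the output topic looks like this; a sketch assuming a local broker and the standard Kafka distribution scripts, and it requires the broker to be running:)

```shell
# Read the output topic from the beginning to verify records were written.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic songDemo6 \
  --from-beginning
```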

Below is my dataset:

>sid,Believer
>sid,Thunder
>sid,Stairway to heaven
>sid,Heaven
>sid,Heaven
>sid,thunder
>sid,Believer    

After running @Srinivas's code and reading the new topic, I get data like this:

[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:18:40, 2020-06-07 18:19:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Believer, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Stairway to heaven, 1]
[[2020-06-07 18:40:40, 2020-06-07 18:41:00], Heaven, 1]
[[2020-06-07 18:17:00, 2020-06-07 18:17:20], Thunder, 1]

Here you can see that the window frame for Believer is the same, yet the entries are separate. Why is that? It should be a single entry with a count of 2, since the window frame is identical.

【Question Discussion】:

  • Can you post some sample Kafka input?
  • sid,Believer is the input data. I am able to perform the aggregation and show it on the console, but when I try to write it out and read it back, it fails.
  • Voting to close this question because it does not provide the complete error trace.
  • @GiorgosMyrianthous There is no error. Please read my question again.
  • Well, it might help if you read your own question again: When I run it, I initially get an exception in the console saying the value attribute cannot be found, but the code keeps running.

标签: apache-spark apache-kafka spark-structured-streaming


【Solution 1】:

Check the code below.

Add this line: windowedCount.select(struct("*").cast("string").as("value")). Before you write anything to Kafka, you must convert all columns into a single string-typed column aliased as value.

 val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "songDemo")
    .option("startingOffsets", "earliest")
    .load()

  import spark.implicits._
  df.printSchema()
  val newDf = df
    .select($"value".cast("string"), $"timestamp")
    .select(
      split(col("value"), ",")(0).as("userName"),
      split(col("value"), ",")(1).as("songName"),
      col("timestamp"))
  val windowedCount = newDf
    .withWatermark("timestamp", "40000 milliseconds")
    .groupBy(
      window(col("timestamp"), "20 seconds"), col("songName"))
    .agg(count(col("songName")).alias("numberOfTimes"))


  val outputTopic = windowedCount
    .select(struct("*").cast("string").as("value")) // Added this line.
    .writeStream
    .format("kafka")
    .option("topic", "songDemoA")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("checkpointLocation", "/tmp/spark_ss/")
    .start()


  val finalOutput = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "songDemoA").option("startingOffsets", "earliest")
    .load()
    .writeStream.format("console")
    .outputMode("append").start()

  spark.streams.awaitAnyTermination()
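(As an aside, not part of the original answer: casting the struct straight to string produces the bracketed `[[start, end], song, count]` form shown in the question. If a JSON payload is preferred, `to_json` from `org.apache.spark.sql.functions` is a common alternative; a sketch under the same setup, with a separate checkpoint path so it does not collide with the other query:)

```scala
// Sketch: serialize each aggregated row as a JSON string instead of a raw struct cast.
val jsonOutput = windowedCount
  .select(to_json(struct("*")).as("value")) // e.g. {"window":{...},"songName":"...","numberOfTimes":1}
  .writeStream
  .format("kafka")
  .option("topic", "songDemoA")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/spark_ss_json/") // hypothetical separate checkpoint dir
  .start()
```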

Update - ordered output


val windowedCount = newDf
    .withWatermark("timestamp", "40000 milliseconds")
    .groupBy(
      window(col("timestamp"), "20 seconds"), col("songName"))
    .agg(count(col("songName")).alias("numberOfTimes"))
    .orderBy($"window.start".asc) // Add this line if you want order.

Sorting or ordering the result works only when the output mode is complete; any other output mode will throw an error.

For example, check the code below.

val outputTopic = windowedCount
    .writeStream
    .format("console")
    .option("truncate","false")
    .outputMode("complete")
    .start()

【Discussion】:
