【问题标题】:Issue in Spark + kafka IntegrationSpark + kafka 集成中的问题
【发布时间】:2020-03-11 14:32:13
【问题描述】:

我正在通过 DataSet 读取 CSV 文件,然后将该文件发送到 Kafka。 spark-submit 工作运行良好,但是当程序将文件发送到 Kafka 时,它给了我一个异常。以下是例外 -

文件流源 在 org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297) 在 org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) 引起:org.apache.spark.sql.AnalysisException:未找到必需的属性“值”; 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72) 在 scala.Option.getOrElse(Option.scala:121) 在 org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71) 在 org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87) 在 org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)

下面是我的代码:

System.setProperty("hadoop.home.dir", "C:\\hadoop-2.7.3\\");
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);

 SparkSession spark = SparkSession
          .builder()
          .config("spark.sql.session.timeZone", "UTC")
          .config("spark.sql.streaming.checkpointLocation", "D:\\Workspac\\checkpoint")
          .appName("StructuredStreamingAverage")
          .master("local")
          .getOrCreate();



StructType userSchema = new StructType().add("startdate", "string").add("accountname", "string").add("eventdate", "string")/*.add("u_lastlogin", "string")*//*.add("u_firstName", "string")*/;

Dataset<Row> dataset = spark.
        readStream()
        .option("header",true)
        .option("sep",",")
        .schema(userSchema)
        .csv("D:\\Workspac\\sophos");

Dataset<Row> df_DateConverted = dataset.withColumn("eventdate", from_unixtime(col("eventdate").divide(1000)).cast(DataTypes.TimestampType));



if(df_DateConverted.isStreaming()) {
    try {
        df_DateConverted
          .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
          .writeStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "rawEventTopic")
          .start().awaitTermination();
    } catch (StreamingQueryException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

正如异常明确所说的那样

原因:org.apache.spark.sql.AnalysisException:未找到必需的属性“值”;

所以可能是.select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value") 代码的问题,但我不知道我应该在这里写什么。谢谢。

我试过了

    df_DateConverted
           .select(col("key").cast("string"), from_json(col("value").cast("string"),userSchema))
          .writeStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "rawEventTopic")
          .start().awaitTermination();

但面临以下异常 -

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;
'Project [unresolvedalias(cast('key as string), None), jsontostructs(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true), cast('value as string), Some(UTC)) AS jsontostructs(CAST(value AS STRING))#10]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5f3ddc86,csv,List(),Some(StructType(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true))),List(),None,Map(sep -> ,, header -> true, path -> D:\cybernetizWorkspace\sophos),None), FileSource[D:\cybernetizWorkspace\sophos], [startdate#0, accountname#1, eventdate#2]

df_DateConverted.printSchema(); 的输出如下 -

root
 |-- startdate: string (nullable = true)
 |-- accountname: string (nullable = true)
 |-- eventdate: timestamp (nullable = true)

【问题讨论】:

  • df_DateConverted 的目的是什么?既然它是流数据集,为什么要检查它是否在流?您可以dataset.printSchema 并将输出添加到问题中吗?谢谢。
  • @JacekLaskowski 我用 df_DateConverted.printSchema() 的输出更新了我的问题。实际上在 csv 文件中有一个包含长格式日期的列。所以我们将 df_DateConverted 数据集转换为时间戳数据类型。

标签: java apache-spark spark-structured-streaming


【解决方案1】:

正如您从 df_DateConverted 架构中看到的那样,您没有键列,因此在执行 col("key").cast("string") 时会出错

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;

您可以在将数据写入 kafka 时简单地删除密钥,因为key 在将数据写入 kafka 时是可选的。参考:here

在您的上述实现中,语法 "to_json(struct(*)) AS value" 是错误的,因此您在 value 上遇到错误。

你应该这样做:

df_DateConverted
          .select(to_json(struct($"startdate", $"accountname", $"eventdate")).alias("value"))
          .writeStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "rawEventTopic")
          .start().awaitTermination();

【讨论】:

    猜你喜欢
    • 2017-09-20
    • 2022-11-20
    • 1970-01-01
    • 2017-01-01
    • 2019-01-15
    • 2021-02-28
    • 2022-01-05
    • 2021-01-11
    • 1970-01-01
    相关资源
    最近更新 更多