[Posted]: 2020-03-11 14:32:13
[Problem description]:
I am reading a CSV file into a Dataset and then sending it to Kafka. The spark-submit job runs fine, but when the program sends the file to Kafka it throws the following exception -
File stream source
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
    at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
Here is my code:
System.setProperty("hadoop.home.dir", "C:\\hadoop-2.7.3\\");
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkSession spark = SparkSession
        .builder()
        .config("spark.sql.session.timeZone", "UTC")
        .config("spark.sql.streaming.checkpointLocation", "D:\\Workspac\\checkpoint")
        .appName("StructuredStreamingAverage")
        .master("local")
        .getOrCreate();
StructType userSchema = new StructType()
        .add("startdate", "string")
        .add("accountname", "string")
        .add("eventdate", "string")
        /*.add("u_lastlogin", "string")*/
        /*.add("u_firstName", "string")*/;
Dataset<Row> dataset = spark
        .readStream()
        .option("header", true)
        .option("sep", ",")
        .schema(userSchema)
        .csv("D:\\Workspac\\sophos");
Dataset<Row> df_DateConverted = dataset.withColumn("eventdate", from_unixtime(col("eventdate").divide(1000)).cast(DataTypes.TimestampType));
if (df_DateConverted.isStreaming()) {
    try {
        df_DateConverted
                .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
                .writeStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "rawEventTopic")
                .start().awaitTermination();
    } catch (StreamingQueryException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
As the exception clearly states,

Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;

so the problem is probably with the .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value") line, but I don't know what I should write here. Thanks.
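For reference, a minimal sketch of how such a projection is usually written, as an assumption rather than a confirmed fix: Dataset.select(String...) treats each argument as a bare column name, so SQL expressions like CAST(key AS STRING) must go through selectExpr instead; and since the CSV data has no key column, the key can simply be dropped, because the Kafka sink only requires a value column:

// Sketch, assuming every CSV column should be serialized as JSON into
// Kafka's value column; the key column is optional for the Kafka sink.
df_DateConverted
        .selectExpr("to_json(struct(*)) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "rawEventTopic")
        .start()
        .awaitTermination();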
I tried
df_DateConverted
        .select(col("key").cast("string"), from_json(col("value").cast("string"), userSchema))
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "rawEventTopic")
        .start().awaitTermination();
but ran into the following exception -
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;
'Project [unresolvedalias(cast('key as string), None), jsontostructs(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true), cast('value as string), Some(UTC)) AS jsontostructs(CAST(value AS STRING))#10]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5f3ddc86,csv,List(),Some(StructType(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true))),List(),None,Map(sep -> ,, header -> true, path -> D:\cybernetizWorkspace\sophos),None), FileSource[D:\cybernetizWorkspace\sophos], [startdate#0, accountname#1, eventdate#2]
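The plan above shows why this fails: the streaming Dataset exposes only [startdate, accountname, eventdate], so there is no key or value column to cast, and from_json is the deserialization step used when reading from Kafka rather than when writing to it. Purely for comparison, a minimal sketch of the consuming side, where those columns do exist (broker and topic assumed to match the question; this is not part of the original code):

// Hypothetical read side: the Kafka source always provides key, value,
// topic, partition, offset and timestamp columns, so from_json applies here.
Dataset<Row> fromKafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "rawEventTopic")
        .load()
        .select(from_json(col("value").cast("string"), userSchema).as("data"))
        .select("data.*");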
The output of df_DateConverted.printSchema(); is as follows -
root
|-- startdate: string (nullable = true)
|-- accountname: string (nullable = true)
|-- eventdate: timestamp (nullable = true)
[Question discussion]:
- What is the purpose of df_DateConverted? Since it is a streaming Dataset, why check whether it is streaming? Could you run dataset.printSchema and add the output to the question? Thanks.
- @JacekLaskowski I updated my question with the output of df_DateConverted.printSchema(). The csv file actually has a column holding the date as a long value, which is why df_DateConverted casts that column to the timestamp data type.
Tags: java apache-spark spark-structured-streaming