【发布时间】:2018-10-27 20:15:30
【问题描述】:
只是为了学习新的Spark结构数据流,我尝试过这样的实验,但不确定我是否对流功能做错了。
首先,我从静态开始,只使用 Spark 2.1.0 附带的简单文本 (csv) 文件:
val df = spark.read.format("csv").load(".../spark2/examples/src/main/resources/people.txt")
df.show()
而且我可以得到如此合理的输出(在 Zepplin 下)。
+-------+---+
| _c0|_c1|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
并按照示例,我只是修改了代码以读取相同的文件并提供架构
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.schema(userSchema) // Specify schema of the csv files
.format("csv")
.load(".../spark2/examples/src/main/resources/people.csv")
并且没有错误消息,所以我想将数据写入内存并使用以下代码查看结果:
val outStream = csvDF.writeStream
.format("memory")
.queryName("logs")
.start()
sql("select * from logs").show(truncate = false)
但是,没有错误消息,我一直得到“空输出”
+----+---+
|name|age|
+----+---+
+----+---+
这些代码是在 Zeppelin 0.7 下测试的,我不确定我是否在这里遗漏了什么。同时,我用 Apache Spark 2.1.0 官方网站$nc -lk 9999 尝试了这个例子,它运行得很好。
如果我做错了什么,我可以知道吗?
[修改和测试]
- 我尝试将相同的文件 people.txt 复制到 people1.csv peopele2.csv people3.csv 在一个 .../csv/ 文件夹下
-
val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv") csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()
我得到了这个:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
| name|count|
+-------+-----+
|Michael| 3|
| Andy| 3|
| Justin| 3|
+-------+-----+
因此,我可能认为这不是数据 readstream() 问题...
【问题讨论】:
标签: scala csv apache-spark spark-structured-streaming