【问题标题】:Spark 2.1.0 structure streaming with local CSV file带有本地 CSV 文件的 Spark 2.1.0 结构流式传输
【发布时间】:2018-10-27 20:15:30
【问题描述】:

只是为了学习新的Spark结构数据流,我尝试过这样的实验,但不确定我是否对流功能做错了。

首先,我从静态开始,只使用 Spark 2.1.0 附带的简单文本 (csv) 文件:

val df = spark.read.format("csv").load(".../spark2/examples/src/main/resources/people.txt")
df.show()

而且我可以得到如此合理的输出(在 Zepplin 下)。

+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

并按照示例,我只是修改了代码以读取相同的文件并提供架构

val userSchema = new StructType().add("name", "string").add("age", "integer")

val csvDF = spark
  .readStream
  .schema(userSchema)      // Specify schema of the csv files
  .format("csv")
  .load(".../spark2/examples/src/main/resources/people.csv") 

并且没有错误消息,所以我想将数据写入内存并使用以下代码查看结果:

val outStream = csvDF.writeStream
  .format("memory")
  .queryName("logs")
  .start()

sql("select * from logs").show(truncate = false)

但是,没有错误消息,我一直得到“空输出”

+----+---+
|name|age|
+----+---+
+----+---+

这些代码是在 Zeppelin 0.7 下测试的,我不确定我是否在这里遗漏了什么。同时,我用 Apache Spark 2.1.0 官方网站$nc -lk 9999 尝试了这个例子,它运行得很好。

如果我做错了什么,我可以知道吗?

[修改和测试]

  1. 我尝试将相同的文件 people.txt 复制到 people1.csv peopele2.csv people3.csv 在一个 .../csv/ 文件夹下
  2. val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
  3. csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()

我得到了这个:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    3|
|   Andy|    3|
| Justin|    3|
+-------+-----+

因此,我可能认为这不是数据 readstream() 问题...

【问题讨论】:

    标签: scala csv apache-spark spark-structured-streaming


    【解决方案1】:
    1. 文件名是people.txt,而不是people.csv。 Spark 会抛出一个错误,提示“路径不存在”。我刚刚用 Spark Shell 验证了一下。

    2. 输入路径应该是一个目录。使用文件没有意义,因为这是一个流式查询。

    【讨论】:

    • 欣赏。我也尝试了 people.csv 没有错误消息。但在最终输出时仍然是空的。
    • 我的意思是正确的文件名应该是people.txt。您是否碰巧创建了一个名为 people.csv 的文件夹?
    【解决方案2】:

    您的代码有 2 个不同之处: 1. 非工作的输出模式为“追加”(默认),工作的输出模式为“完成”。 2. non-working 选择没有聚合的记录,但 working 有 groupBy 聚合。

    我建议您切换到完整输出模式并进行 groupBy 计数以查看是否可以解决问题。

    【讨论】:

      猜你喜欢
      • 2018-05-27
      • 2020-07-17
      • 1970-01-01
      • 2018-01-29
      • 1970-01-01
      • 2021-05-21
      • 1970-01-01
      • 1970-01-01
      • 2020-08-11
      相关资源
      最近更新 更多