【问题标题】:How to read the data from a dataframe without using collect method in Spak/Scala如何在不使用 Spark/Scala 中的 collect 方法的情况下从数据帧中读取数据
【发布时间】:2017-06-21 07:42:37
【问题描述】:

我有一个包含 200 万条记录的数据框。我想读取每条记录以进行分析。

但是当我使用dataframe.collect() 方法时,它将从本地运行驱动程序的所有节点获取数据,这会影响实现并行性。有什么解决办法吗?

我的配置是:

Cloudera:CDH 5.9.1
Cluster Nodes:5 ->each 8GB RAM
Spark:1.6
Scala:10.5

【问题讨论】:

  • 你想对每条记录做什么?
  • 我需要过滤每个唯一 ID 的数据,并对每个 ID 的 X 天数据进行分析。
  • 你需要更清楚。有很多可用的功能,您可以根据需要使用UDF和UDAF

标签: scala hadoop apache-spark hive apache-spark-sql


【解决方案1】:

如果您只需要读取该数据,则必须执行某种操作:collect 是您需要驱动程序访问它时的常用选择。但是,如果您需要将其存储在其他地方,您可以利用可用于 HDFS、JDBC 等的并行编写器。

如果您需要访问该数据以计算进一步的结果,您可以将数据保留在原处,并使用常用的组合器(mapflatMapfilter 等)对其应用函数。

但如果你需要将结果保存在本地,你别无选择,只能收集。当然,它会影响计算的并行性,但是你必须得到你想要的输出:本地存储,本地操作;分布式存储,分布式动作。

【讨论】:

    【解决方案2】:

    .collect() 是一个动作,正如您所说,它将结果作为Row 的本地集合返回给您的驱动程序。如果结果数据集的大小相对于您的系统配置来说很大,这可能是一个瓶颈。

    也就是说,您的问题缺乏意义,因为您没有提到您想对您阅读的这些数据做什么。如果只是将数据作为数据帧读取,您可以在 Spark 1.x.x 中执行类似操作

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLContext
    
    val conf = new SparkConf().setAppName("test").setMaster("local[2]")
    val sc = new SparkContext(conf)
    
    val sqlContext = new SQLContext(sc)  
    
    import sqlContext.implicits._
    
    val df = sqlContext.read.csv("file:///path/to/input/")
    

    使用 df 引用您的数据框。

    你需要对这个数据框做点什么。例如,代替collect(),您可以将其另存为csv,如下所示,

    df.write.csv("file:///path/to/output")
    

    这将在没有您之前面临的驱动程序开销的情况下工作。 让我知道这是否有帮助。

    【讨论】:

      猜你喜欢
      • 2020-07-22
      • 2022-12-09
      • 1970-01-01
      • 2018-04-22
      • 1970-01-01
      • 1970-01-01
      • 2019-01-24
      • 2017-10-03
      • 1970-01-01
      相关资源
      最近更新 更多