【Title】: counting rows of a dataframe with condition in spark
【Posted】: 2015-11-11 11:10:52
【Question】:

I am trying this:

df=dfFromJson:
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"arts"}
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"law"}
{"class":"name 1","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}


df.groupBy("class").agg(count(col("stream")==="science") as "stream_science", count(col("stream")==="arts") as "stream_arts", count(col("stream")==="law") as "stream_law")

This does not give the expected output. How can I achieve this in the fastest way?

【Comments】:

    Tags: json scala apache-spark dataframe apache-spark-sql


    【Solution 1】:

    It is not entirely clear what the expected output is, but I am guessing you want something like the following. (Your original attempt does not work because count counts the non-null values of the boolean expression, true and false alike, so each of those aggregates just counts all the rows in the group.)

    import org.apache.spark.sql.functions.{count, col, when}
    import sqlContext.implicits._  // needed for the $"..." column syntax (Spark 1.x)
    
    // Build one conditional count expression per distinct stream value.
    val streams = df.select($"stream").distinct.collect.map(_.getString(0))
    val exprs = streams.map(s => count(when($"stream" === s, 1)).alias(s"stream_$s"))
    
    df
      .groupBy("class")
      .agg(exprs.head, exprs.tail: _*)
    
    // +------+--------------+----------+-----------+
    // | class|stream_science|stream_law|stream_arts|
    // +------+--------------+----------+-----------+
    // |name 1|             2|         2|          1|
    // |name 2|             2|         2|          2|
    // +------+--------------+----------+-----------+
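    This works because when without an otherwise branch yields null for non-matching rows, and count ignores nulls. As a minimal equivalent sketch (assuming the same df and streams as above), you can make that explicit by summing a 0/1 flag instead:

    import org.apache.spark.sql.functions.sum
    
    // sum of a 0/1 flag produces the same numbers as count(when(cond, 1)),
    // just with the null-skipping behaviour spelled out.
    val sumExprs = streams.map(s =>
      sum(when($"stream" === s, 1).otherwise(0)).alias(s"stream_$s"))
    
    df.groupBy("class").agg(sumExprs.head, sumExprs.tail: _*)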
    

    If you don't care about the names and have only a single grouping column, you can simply use DataFrameStatFunctions.crosstab:

    df.stat.crosstab("class", "stream")
    
    // +------------+---+----+-------+
    // |class_stream|law|arts|science|
    // +------------+---+----+-------+
    // |      name 1|  2|   1|      2|
    // |      name 2|  2|   2|      2|
    // +------------+---+----+-------+
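    A side note, since this answer predates it: on Spark 1.6 or later, the built-in pivot does the same reshaping in one pass. A sketch, assuming the same df:

    // Spark 1.6+: passing the value list explicitly skips the extra job
    // otherwise needed to discover the distinct stream values.
    df.groupBy("class")
      .pivot("stream", Seq("science", "arts", "law"))
      .count()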
    

    【Discussion】:

      【Solution 2】:

      You can simply group by both columns instead of grouping by a single column and then filtering. Since I am not fluent enough in Scala, below is the snippet in Python. Note that I changed your column names from "stream" and "class" to "dept" and "name" to avoid name conflicts with Spark's "stream" and "class" types.

      from pyspark.sql import HiveContext, Row
      
      hc = HiveContext(sc)  # sc is the SparkContext from the PySpark shell
      
      obj = [
          {"class":"name 1","stream":"science"},
          {"class":"name 1","stream":"arts"},
          {"class":"name 1","stream":"science"},
          {"class":"name 1","stream":"law"},
          {"class":"name 1","stream":"law"},
          {"class":"name 2","stream":"science"},
          {"class":"name 2","stream":"arts"},
          {"class":"name 2","stream":"law"},
          {"class":"name 2","stream":"science"},
          {"class":"name 2","stream":"arts"},
          {"class":"name 2","stream":"law"}
      ]
      rdd = sc.parallelize(obj).map(lambda i: Row(dept=i['stream'], name=i['class']))
      df = hc.createDataFrame(rdd)
      df.groupby(df.dept, df.name).count().collect()
      

      This will produce the following output:

      [
          Row(dept='science', name='name 1', count=2), 
          Row(dept='science', name='name 2', count=2), 
          Row(dept='arts', name='name 1', count=1), 
          Row(dept='arts', name='name 2', count=2), 
          Row(dept='law', name='name 1', count=2), 
          Row(dept='law', name='name 2', count=2)
      ]
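
      For completeness, a minimal Scala sketch of the same two-column groupBy (the original column names can stay here, since nothing clashes at the DataFrame API level):

      // Each result row is one (class, stream) pair with its count,
      // i.e. the long format shown above.
      df.groupBy($"class", $"stream").count().show()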
      

      【Discussion】:
