【Question Title】: Best way to have nested tuples or nested columns in Spark and filter by or group by a nested column
【Posted】: 2017-05-16 14:18:11
【Question Description】:

I am having trouble grouping by a nested column.

My application uses Scala 2.11.7, and these are my sbt dependencies:

libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"

  Seq(
    "com.typesafe.akka" %% "akka-actor"                           % akkaVersion,
    "com.typesafe"      %  "config"                               % "1.3.0" ,
    "org.apache.spark"  %%  "spark-core"                          % sparkVersion,
    "org.apache.spark"  %%  "spark-sql"                           % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j"                           % akkaVersion,
    "org.apache.spark"  %% "spark-streaming"                      % sparkVersion
  )
}

Here is my sample data (one row):

124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|

Here is my mapper (case class):

case class SampleMap(
                   id: Long, //1
                   a_id_1: Long, //2
                   b_id_2: Long, //3
                   date_time: String, //4
                   subscriber_type: Int, //5
                   x_type: Int, //6
                   sub_id_2: String, //7
                   account_type: Int, //11
                   master_sub_id: String, //12
                   application_id: Int, //13
                   sup_type_id: Int, //14
                   unit_type_id: Int, //15
                   usage_amount: Long, //16
                   type_of_charge: String, //17
                   identity_id: Int, //18
                   group_id: String, //19
                   charge_code: String, //20
                   content_type: Int, //21
                   fund_usage_type: Int, //24
                   msc_id: String, //28
                   circle_id: Int, //29
                   sp_id: Int, //30
                   balance: List[(Int, Double, Double)], //31
                   z_info: List[(Int, Double, Double)] //33

                 )

I have written the following code for filtering and mapping:

 private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SampleMap] = {

    import SparkFactory.spark.implicits._
    sparkRdd
      .map(_.split("\\|",-1))
      .filter(_.length==33)       //33 fields, including the trailing empty string from the final '|'
      .map(
      data =>
        SampleMap(

          {if(data(0).nonEmpty) data(0).toLong else 0 },
          {if(data(1).nonEmpty) data(1).toLong else 0 },
          {if(data(2).nonEmpty) data(2).toLong else 0 },
          data(3),
          {if(data(4).nonEmpty) data(4).toInt else 0 },
          {if(data(5).nonEmpty) data(5).toInt else 0 },
          data(6),
          {if(data(10).nonEmpty) data(10).toInt else 0 },
          data(11),
          {if(data(12).nonEmpty) data(12).toInt else 0 },
          {if(data(13).nonEmpty) data(13).toInt else 0 },
          {if(data(14).nonEmpty) data(14).toInt else 0 },
          {if(data(15).nonEmpty) data(15).toLong else 0 },
          data(16),
          {if(data(17).nonEmpty) data(17).toInt else 0 },
          data(18),
          data(19),
          {if(data(20).nonEmpty) data(20).toInt else 0 },
          {if(data(23).nonEmpty) data(23).toInt else 0 },
          data(27),
          {if(data(28).nonEmpty) data(28).toInt else 0 },
          {if(data(29).nonEmpty) data(29).toInt else 0 },

          data(30)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data => data.length > 2 && data(1).nonEmpty && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble)) // (bal_id, change_balance, accumulated)
            .toList,

          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data => data.length > 2 && data(1).nonEmpty && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble)) // (id, change, accumulated)
            .toList


        )
    )
  }

Then I create a temp view and try to query it like this:

formattedRDD.createOrReplaceTempView("temp_table")  //formattedRDD is the val holding the mapped Dataset

spark.sql(
      s" select balance from temp_table group by balance"
    ).collectAsList()

Looking at the //31 column, balance: List[(Int, Double, Double)]:

The first element of each tuple is bal_id (Int), the second is change_balance (Double) and the third is accumulated (Double), and each row holds a list of many such tuples.
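
Since these are lists of Scala tuples, Spark encodes each of them as an array of structs whose fields are named _1, _2 and _3; roughly (a sketch, using the formattedRDD val mentioned above):

// Rough shape of the nested columns (verify with printSchema):
//   balance: array<struct<_1:int, _2:double, _3:double>>   -> (bal_id, change_balance, accumulated)
//   z_info:  array<struct<_1:int, _2:double, _3:double>>
formattedRDD.printSchema()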

Now I want to group by bal_id and get the sum of change_balance, but I cannot do that (of course I can't, because the whole list sits inside a single column value).

My idea is to split balance ( balance: List[(Int, Double, Double)], //31 ) out into a separate dataset/table, with its own mapping and grouping, but to split it out we would need to add an auto_increment_id or some other unique row identifier to the dataset/table so the rows can be mapped back (note that id can repeat).
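
Roughly what I have in mind is something like the sketch below (untested; BalanceRow and balanceDs are just illustrative names), flattening the balance list into its own typed dataset and carrying the parent id along:

import SparkFactory.spark.implicits._

// Illustrative only: one row per balance entry, keyed by the parent id.
// Since id can repeat, a generated surrogate key (for example
// org.apache.spark.sql.functions.monotonically_increasing_id) would be needed
// for an exact one-to-one mapping back to the parent rows.
case class BalanceRow(parentId: Long, balId: Int, changeBalance: Double, accumulated: Double)

val balanceDs = formattedRDD.flatMap { row =>
  row.balance.map { case (balId, change, acc) =>
    BalanceRow(row.id, balId, change, acc)
  }
}

balanceDs.createOrReplaceTempView("balance_table")
// spark.sql("select balId, sum(changeBalance) from balance_table group by balId")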

I am really confused about whether this is the right approach. Could anyone please help me? Thanks in advance.

【Question Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    If you split the balance column into three separate columns, it becomes easy to use groupBy on bal_id and sum change_balance.
    You can create those three separate columns at the initial stage.
    Here is a solution based on what I understood from your question:

    You need to add the three column names to your case class:

    case class SampleMap(
                          id: Long, //1
                          a_id_1: Long, //2
                          b_id_2: Long, //3
                          date_time: String, //4
                          subscriber_type: Int, //5
                          x_type: Int, //6
                          sub_id_2: String, //7
                          account_type: Int, //11
                          master_sub_id: String, //12
                          application_id: Int, //13
                          sup_type_id: Int, //14
                          unit_type_id: Int, //15
                          usage_amount: Long, //16
                          type_of_charge: String, //17
                          identity_id: Int, //18
                          group_id: String, //19
                          charge_code: String, //20
                          content_type: Int, //21
                          fund_usage_type: Int, //24
                          msc_id: String, //28
                          circle_id: Int, //29
                          sp_id: Int, //30
                          balance: List[(Int, Double, Double)], //31
                          bal_id: Int,              //added by Ramesh
                          change_balance: Double,   //added by Ramesh
                          accumulated: Double,      //added by Ramesh
                          z_info: List[(Int, Double, Double)] //33
                        )
    

    While creating the dataframe/dataset, you have to split those three values out into separate columns. Here is an improved version of your code:

    import scala.util.Try
    import SparkFactory.spark.implicits._   // needed for .toDS() below

    val formattedRDD = sparkRdd.map(_.split("\\|",-1))
          .filter(_.length==33)       //33 fields, including the trailing empty string from the final '|'
          .map( data => {
            val balance = Try(data(30)
              .drop(1)
              .dropRight(1)
              .split("\\]\\[")
              .map(_.split('~'))
          .filter(data => data.length > 2 && data(1).nonEmpty && data(2).nonEmpty && data(2).toDouble != 0)
          .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble)) // (bal_id, change_balance, accumulated)
              .toList) getOrElse List((0, 0.0, 0.0))
    
            SampleMap(
              Try(data(0).toLong) getOrElse 0,
              Try(data(1).toLong) getOrElse 0,
              Try(data(2).toLong) getOrElse 0,
              Try(data(3).toString) getOrElse "",
              Try(data(4).toInt) getOrElse 0,
              Try(data(5).toInt) getOrElse 0,
              Try(data(6).toString) getOrElse "",
          Try(data(10).toInt) getOrElse 0,   // account_type (field 11)
              Try(data(11).toString) getOrElse "",
              Try(data(12).toInt) getOrElse 0,
              Try(data(13).toInt) getOrElse 0,
              Try(data(14).toInt) getOrElse 0,
              Try(data(15).toLong) getOrElse 0,
              Try(data(16).toString) getOrElse "",
              Try(data(17).toInt) getOrElse 0,
              Try(data(18).toString) getOrElse "",
              Try(data(19).toString) getOrElse "",
              Try(data(20).toInt) getOrElse 0,
              Try(data(23).toInt) getOrElse 0,
              Try(data(27).toString) getOrElse "",
              Try(data(28).toInt) getOrElse 0,
              Try(data(29).toInt) getOrElse 0,
          balance,               //this is field 31 (index 30), i.e. balance
              balance(0)._1,         //this is bal_id
              balance(0)._2,         //this is change_balance
              balance(0)._3,         //this is accumulator
    
              Try(data(31)
                .drop(1)
                .dropRight(1)
                .split("\\]\\[")
                .map(_.split('~'))
            .filter(data => data.length > 2 && data(1).nonEmpty && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble)) // (id, change, accumulated)
                .toList) getOrElse List.empty
            )
          }
        )
      .toDS()
    

    Now all you need to do is call an aggregation:

    import org.apache.spark.sql.functions.sum
    formattedRDD.groupBy("bal_id").agg(sum("change_balance")).show
    

    I hope this is the solution you needed.

    【Comments】:

    • First of all, sorry for the late reply. Regarding balance(0)._1, //this is bal_id balance(0)._2, //this is change_balance balance(0)._3, //this is accumulator: you have only picked the first tuple, but the list holds multiple tuples.
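
    One way to aggregate over all the tuples instead of only the first one (a sketch, not part of the original answer) is to explode the balance column before grouping:

    import org.apache.spark.sql.functions.{explode, sum}
    import SparkFactory.spark.implicits._

    // Each element of balance is a struct (_1 = bal_id, _2 = change_balance,
    // _3 = accumulated); explode turns the list into one row per element.
    formattedRDD
      .select(explode($"balance").as("bal"))
      .select($"bal._1".as("bal_id"), $"bal._2".as("change_balance"))
      .groupBy("bal_id")
      .agg(sum("change_balance").as("total_change_balance"))
      .show()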