【Posted】: 2017-05-16 14:18:11
【Problem Description】:
I am having trouble grouping by a nested column.
My application's Scala version is 2.11.7, and these are my sbt dependencies:
libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe" % "config" % "1.3.0",
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion
  )
}
Here is my sample data (one row):
124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|
Here is my mapper (case class):
case class SMSMap(
  id: Long, //1
  a_id_1: Long, //2
  b_id_2: Long, //3
  date_time: String, //4
  subscriber_type: Int, //5
  x_type: Int, //6
  sub_id_2: String, //7
  account_type: Int, //11
  master_sub_id: String, //12
  application_id: Int, //13
  sup_type_id: Int, //14
  unit_type_id: Int, //15
  usage_amount: Long, //16
  type_of_charge: String, //17
  identity_id: Int, //18
  group_id: String, //19
  charge_code: String, //20
  content_type: Int, //21
  fund_usage_type: Int, //24
  msc_id: String, //28
  circle_id: Int, //29
  sp_id: Int, //30
  balance: List[(Int, Double, Double)], //31
  z_info: List[(Int, Double, Double)] //33
)
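For what it's worth, I believe the two list columns are encoded as arrays of structs (with fields _1, _2, _3) once this case class becomes a Dataset; a quick way to check, assuming any Dataset[SMSMap] named ds:

// print the inferred schema; balance and z_info are expected to show up as
// something like array<struct<_1: int, _2: double, _3: double>>
ds.printSchema()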
I have already written the filtering and mapping code:
private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SMSMap] = {
  import SparkFactory.spark.implicits._
  sparkRdd
    .map(_.split("\\|", -1))
    .filter(_.length == 33) // the trailing '|' yields a final empty field, so a valid row has 33 fields
    .map(
      data =>
        SMSMap(
          if (data(0).nonEmpty) data(0).toLong else 0L,
          if (data(1).nonEmpty) data(1).toLong else 0L,
          if (data(2).nonEmpty) data(2).toLong else 0L,
          data(3),
          if (data(4).nonEmpty) data(4).toInt else 0,
          if (data(5).nonEmpty) data(5).toInt else 0,
          data(6),
          if (data(10).nonEmpty) data(10).toInt else 0,
          data(11),
          if (data(12).nonEmpty) data(12).toInt else 0,
          if (data(13).nonEmpty) data(13).toInt else 0,
          if (data(14).nonEmpty) data(14).toInt else 0,
          if (data(15).nonEmpty) data(15).toLong else 0L,
          data(16),
          if (data(17).nonEmpty) data(17).toInt else 0,
          data(18),
          data(19),
          if (data(20).nonEmpty) data(20).toInt else 0,
          if (data(23).nonEmpty) data(23).toInt else 0,
          data(27),
          if (data(28).nonEmpty) data(28).toInt else 0,
          if (data(29).nonEmpty) data(29).toInt else 0,
          // field 31: "[bal_id~change_balance~cumulative][...]" parsed into a list of tuples
          data(30)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(parts => parts.length > 2 && parts(2).nonEmpty && parts(2).toDouble != 0)
            .map(parts => (parts(0).toInt, parts(1).toDouble, parts(2).toDouble))
            .toList,
          // field 33: same "[id~value~value]" layout for z_info
          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(parts => parts.length > 2 && parts(2).nonEmpty && parts(2).toDouble != 0)
            .map(parts => (parts(0).toInt, parts(1).toDouble, parts(2).toDouble))
            .toList
        )
    )
}
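For completeness, the formattedRDD used below is produced roughly like this (the file path is only an example, and this runs in the same class since the mapper is private):

// read the raw pipe-delimited file as a Dataset[String] and run it through the mapper above
val rawLines: Dataset[String] = SparkFactory.spark.read.textFile("/path/to/sms_data.txt") // example path
val formattedRDD: Dataset[SMSMap] = mappingSparkLoadedSMSData(rawLines)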
Then I create a temp view and try to query it like this:
formattedRDD.createOrReplaceTempView("temp_table") // formattedRDD holds the mapped Dataset[SMSMap] from above

spark.sql(
  "select balance from temp_table group by balance"
).collectAsList()
As you can see from balance: List[(Int, Double, Double)], //31
the first element is bal_id (Int), the second is change_balance (Double) and the third is the cumulative value (Double), and each row carries several of these tuples.
Now I want to group by bal_id and get the sum of change_balance, but I cannot do that directly (of course not, since the whole list is a single value in each row).
My idea is to split the balance column (balance: List[(Int, Double, Double)], //31) out into a separate dataset/table and do the mapping and grouping there, but to split it out we would need to add an auto_increment_id or some other unique row identifier to the dataset/table for mapping purposes (note that id can be repeated), roughly along the lines of the sketch below.
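This is roughly the kind of thing I have in mind (untested sketch: explode and monotonically_increasing_id come from org.apache.spark.sql.functions, and the names row_id, bal_id, change_balance, cumulative and total_change_balance are just ones I made up for the exploded columns):

import org.apache.spark.sql.functions._
import SparkFactory.spark.implicits._

// give every parent row its own key, since id can repeat
val withRowId = formattedRDD.withColumn("row_id", monotonically_increasing_id())

// one row per (bal_id, change_balance, cumulative) tuple of the balance list
val balanceTable = withRowId
  .select($"row_id", explode($"balance").as("bal"))
  .select(
    $"row_id",
    $"bal._1".as("bal_id"),
    $"bal._2".as("change_balance"),
    $"bal._3".as("cumulative"))

// the aggregation I am actually after: sum of change_balance per bal_id
balanceTable
  .groupBy($"bal_id")
  .agg(sum($"change_balance").as("total_change_balance"))
  .show()

In SQL form I guess this would be a LATERAL VIEW explode over temp_table, but I have not tried that either.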
I am really confused about the right way to do this. Can anyone please help? Thanks in advance.
Tags: scala apache-spark apache-spark-sql