【问题标题】:groupBykey in sparkspark中的groupBykey
【发布时间】:2015-08-14 22:38:05
【问题描述】:

这里是 spark 的新手,我正在尝试读取 spark 中的管道分隔文件。我的文件如下所示:

user1|acct01|A|Fairfax|VA
user1|acct02|B|Gettysburg|PA
user1|acct03|C|York|PA
user2|acct21|A|Reston|VA
user2|acct42|C|Fairfax|VA
user3|acct66|A|Reston|VA

我在 scala 中执行以下操作:

scala> case class Accounts (usr: String, acct: String, prodCd: String, city: String, state: String)
defined class Accounts

scala> val accts = sc.textFile("accts.csv").map(_.split("|")).map(
     | a => (a(0), Accounts(a(0), a(1), a(2), a(3), a(4)))
     | )

然后我尝试按键对键值对进行分组,这不确定我这样做是否正确......我是这样做的吗?

scala> accts.groupByKey(2)
res0: org.apache.spark.rdd.RDD[(String, Iterable[Accounts])] = ShuffledRDD[4] at groupByKey at <console>:26

我认为 (2) 是为了将前两个结果返回给我,但我似乎在控制台上没有得到任何返回...

如果我运行一个独特的......我也会得到这个......

scala> accts.distinct(1).collect(1)
<console>:26: error: type mismatch;
 found   : Int(1)
 required: PartialFunction[(String, Accounts),?]
              accts.distinct(1).collect(1)

编辑: 本质上,我试图获得一个键值对嵌套映射。例如,user1 看起来像这样:

user1 | {'acct01': {prdCd: 'A', city: 'Fairfax', state: 'VA'}, 'acct02': {prdCd: 'B', city: 'Gettysburg', state: 'PA'}, 'acct03': {prdCd: 'C', city: 'York', state: 'PA'}}

试图一步一步地学习这个,所以我想我会把它分解成块来理解......

【问题讨论】:

  • 为了限制结果的数量,你会想使用take(int)

标签: scala apache-spark


【解决方案1】:

如果您已经完成了定义模式的过程,我认为如果您将数据放入 DataFrame 中,您可能会有更好的运气。首先,您需要修改拆分注释以使用单引号。 (见this question)。此外,您可以在一开始就摆脱a(0)。然后,转换为 DataFrame 是微不足道的。 (请注意,DataFrames 在 spark 1.3+ 上可用。)

val accts = sc.textFile("/tmp/accts.csv").map(_.split('|')).map(a => Accounts(a(0), a(1), a(2), a(3), a(4)))
val df = accts.toDF()

现在df.show 产生:

+-----+------+------+----------+-----+
|  usr|  acct|prodCd|      city|state|
+-----+------+------+----------+-----+
|user1|acct01|     A|   Fairfax|   VA|
|user1|acct02|     B|Gettysburg|   PA|
|user1|acct03|     C|      York|   PA|
|user2|acct21|     A|    Reston|   VA|
|user2|acct42|     C|   Fairfax|   VA|
|user3|acct66|     A|    Reston|   VA|
+-----+------+------+----------+-----+

您应该更容易处理数据。例如,要获取唯一用户列表:

df.select("usr").distinct.collect()

生产

res42: Array[org.apache.spark.sql.Row] = Array([user1], [user2], [user3])

更多详情,请查看docs

【讨论】:

  • 所以我浏览了这些文档,但如何让 groupBy 工作?我尝试了 df.groupBy("usr").collect()、df.select()、groupby().collect 和其他各种方法,但无法让它工作......
  • 你想做什么?我认为groupBy 运算符的作用可能有些混乱。您需要应用某种聚合,以便 Spark 知道如何聚合与给定键对应的所有记录。例如,df.groupBy("usr").count().collect() 将为您提供与每个不同用户对应的记录数。您可以查看API docs,了解可以使用哪些函数来聚合记录。
  • 刚刚进行了编辑以显示最终目标...基本上是尝试获取 csv 文件并将其转换为键值对映射...
【解决方案2】:

3 个观察可以帮助您理解问题:

1) groupByKey(2) 不返回前 2 个结果,参数 2 用作结果 RDD 的分区数。见docs

2) collect 不带 Int 参数。见docs

3) split 有两种类型的参数,CharString。字符串版本使用正则表达式,所以 "|" 需要转义,如果打算作为文字。

【讨论】:

  • 我想我仍然对此感到困惑......我看到了 groupByKey(K, V) 和 groupByKey(Partition)。我如何简单地将结果拉回列表中。我想我必须使用 groupByKey,对吗?但是我是使用分区还是 K、V?
猜你喜欢
  • 1970-01-01
  • 2017-07-06
  • 2015-02-08
  • 2015-09-10
  • 2015-02-16
  • 2017-04-25
  • 1970-01-01
  • 2016-03-18
  • 1970-01-01
相关资源
最近更新 更多