【问题标题】:How to group tuple dataset by one of its elements in Scala Spark?如何通过 Scala Spark 中的元素之一对元组数据集进行分组?
【发布时间】:2021-01-17 12:54:33
【问题描述】:

我将 parquet 文件读入元组数据集:

val dataset = spark.read.parquet("some-path").as[Tuple2[KeyClass, ValueClass]](Encoders.kryo)

我可以看到它:

import spark.implicits._
dataset.map(x => s"$x._1 : $x._2").show(false)
+----------------------------+
|value                       |
+----------------------------+
|(1, 2)                      |
|(1, 3)                      |
|(2, 3)                      |
+----------------------------+

我的 KeyClass 和 ValueCLass 实际上是与其他嵌套类的复杂类(我不能在这里发布 excect 类和 show 方法的结果,因为代码是专有的,但下面是它们的结构):

密钥类:

public class KeyClass implements WritableComparable<KeyClass> {
    private byte[] field2;
    private byte[] field3;

    ...

    public boolean equals(Object o) {...}
    public int hashCode() {...}
}

值类:

public class ValueClass implements Writable {
    private OtherClass1 field1;
    private OtherClass2 field2;
    private boolean field3;
    private Long field4;
}

我需要按元组的元素之一对其进行分组:

+----------------------------+
|value                       |
+----------------------------+
|(1, [2, 3])                 |
|(2, 3)                      |
+----------------------------+

我试过了:

val value1 = dataset.groupByKey(_._1)(Encoders.kryo)
val value2 = value1.mapValues(_._2)(Encoders.bean(classOf[ValueClass]))
val value3 = value2.mapGroups({case (key, value) => (key, value.toList)})

value1.mapGroups((a, b) => (a, b.map(_._2))).show(false)

我在使用 mapGroups 方法的行中收到以下异常的异常:

Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience

我也尝试在该方法中添加其他编码器:

 val value3 = value2.mapGroups((a, b) => (a, b.toArray))(Encoders.tuple(Encoders.bean(classOf[KeyClass]), Encoders.bean(classOf[Array[ValueClass]])))

然后我在同一方法上看到不同的异常:

Exception in thread "main" java.lang.AssertionError: assertion failed

【问题讨论】:

  • 您是否有不想将其读入数据框并使用数据框转换/groupBy 的原因?
  • 另外,您介意尝试显示每个步骤的结果吗?所以这样做 -> val value1 = dataset.groupByKey(_._1)(Encoders.kryo) 然后 value1.show(false) 因为我不认为它正在做你认为它正在做的事情。
  • @GamingFelix 我想​​了一夜之间,我会尝试读入数据框。回答你的第二条评论,我不能做 value1.show 因为 dataset.groupByKey 操作返回 KeyValueGroupedDataset 并且它没有 show 方法我尝试了以下方法: value1.mapGroups((a,b) => (a,b )).map(x => s"$x").show(false) 但它给出了与我的问题相同的错误。似乎 (a,b) => (a,b) 这会导致它,但我还不知道如何解决这个问题
  • 这是一个关于如何显示它的示例stackoverflow.com/questions/43918836/…,但是如果可以的话,我会推荐 DataFrame。网上有很多很好的例子
  • 我不能(或者至少我有麻烦)将它作为数据框读取,因为我的元组实际上由具有其他嵌套类的复杂类组成

标签: scala apache-spark


【解决方案1】:

GroupBy 已正确完成,问题在于编码器的使用。 我的自定义类是从另一个项目的遗留代码中导入的,它们没有正确的编码器,也不遵守为它们创建 bean 编码器的规则。所以我只需要为每个操作明确指定正确的编码器:

dataset
  .groupByKey(_._1)(Encoders.kryo)
  .mapGroups((a,b) => (a, b.map(_._2).toList))(Encoders.kryo)

如果我对编码器的解释有些模糊或不正确,请指出。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-02
    • 1970-01-01
    • 2018-07-19
    • 2017-05-24
    相关资源
    最近更新 更多