【发布时间】:2021-01-17 12:54:33
【问题描述】:
我将 parquet 文件读入元组数据集:
val dataset = spark.read.parquet("some-path").as[Tuple2[KeyClass, ValueClass]](Encoders.kryo)
我可以看到它:
import spark.implicits._
dataset.map(x => s"$x._1 : $x._2").show(false)
+----------------------------+
|value |
+----------------------------+
|(1, 2) |
|(1, 3) |
|(2, 3) |
+----------------------------+
我的 KeyClass 和 ValueCLass 实际上是与其他嵌套类的复杂类(我不能在这里发布 excect 类和 show 方法的结果,因为代码是专有的,但下面是它们的结构):
密钥类:
public class KeyClass implements WritableComparable<KeyClass> {
private byte[] field2;
private byte[] field3;
...
public boolean equals(Object o) {...}
public int hashCode() {...}
}
值类:
public class ValueClass implements Writable {
private OtherClass1 field1;
private OtherClass2 field2;
private boolean field3;
private Long field4;
}
我需要按元组的元素之一对其进行分组:
+----------------------------+
|value |
+----------------------------+
|(1, [2, 3]) |
|(2, 3) |
+----------------------------+
我试过了:
val value1 = dataset.groupByKey(_._1)(Encoders.kryo)
val value2 = value1.mapValues(_._2)(Encoders.bean(classOf[ValueClass]))
val value3 = value2.mapGroups({case (key, value) => (key, value.toList)})
或
value1.mapGroups((a, b) => (a, b.map(_._2))).show(false)
我在使用 mapGroups 方法的行中收到以下异常的异常:
Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
我也尝试在该方法中添加其他编码器:
val value3 = value2.mapGroups((a, b) => (a, b.toArray))(Encoders.tuple(Encoders.bean(classOf[KeyClass]), Encoders.bean(classOf[Array[ValueClass]])))
然后我在同一方法上看到不同的异常:
Exception in thread "main" java.lang.AssertionError: assertion failed
【问题讨论】:
-
您是否有不想将其读入数据框并使用数据框转换/groupBy 的原因?
-
另外,您介意尝试显示每个步骤的结果吗?所以这样做 -> val value1 = dataset.groupByKey(_._1)(Encoders.kryo) 然后 value1.show(false) 因为我不认为它正在做你认为它正在做的事情。
-
@GamingFelix 我想了一夜之间,我会尝试读入数据框。回答你的第二条评论,我不能做 value1.show 因为 dataset.groupByKey 操作返回 KeyValueGroupedDataset 并且它没有 show 方法我尝试了以下方法: value1.mapGroups((a,b) => (a,b )).map(x => s"$x").show(false) 但它给出了与我的问题相同的错误。似乎 (a,b) => (a,b) 这会导致它,但我还不知道如何解决这个问题
-
这是一个关于如何显示它的示例stackoverflow.com/questions/43918836/…,但是如果可以的话,我会推荐 DataFrame。网上有很多很好的例子
-
我不能(或者至少我有麻烦)将它作为数据框读取,因为我的元组实际上由具有其他嵌套类的复杂类组成
标签: scala apache-spark