【问题标题】:Split Spark Dataset into equal number of datasets in Spark 3.1 with java在 Spark 3.1 中使用 java 将 Spark 数据集拆分为相等数量的数据集
【发布时间】:2021-08-30 15:45:05
【问题描述】:

我有一个没有很多行的数据集,我正在该数据集中执行 collect_list 操作。我得到了像 Cannot grow BufferHolder; exceeds size limitation 这样的错误。 这是因为我的 collect_list 结果列大小超过 2GB。因此,我希望将此数据集拆分为多个数据集,并尝试对其执行相同的 collect_list 操作(以减小 col 大小)。尝试this 建议修复。我怎样才能做到这一点。

这是我的示例数据集和示例代码。

+----+----+
|col1|col2|
+----+----+
| abc|   A|
| abc|   B|
| cde|   B|
| cde|   C|
| efg|   A|
+----+----+

public static Dataset<Row> getData(){
      Dataset<Row> = myDataset;
      return myDataset.groupBy(col("col1")).agg(collect_list(col("col2")));
}

结果是

+----+-------+
|col1|col2   |
+----+-------+
| abc|[A,B]  |
| cde|[B,C]  |
| efg|[A]    |
+----+-------+

如何通过将其拆分为多个数据集来执行相同的逻辑?我在 java 中使用 spark 3.1。

谢谢

【问题讨论】:

  • 即使使用collect_set(),您是否也遇到同样的问题?

标签: java apache-spark


【解决方案1】:

您可以使用randomSplit()randomSplitAsList() 方法将一个数据集拆分为多个数据集。您可以详细阅读此方法here

上述方法将返回数据集的数组/列表,您可以迭代并执行groupByunion 以获得所需的结果。

   public static Dataset<Row> getData(Dataset<Row>[] myDataset) {

       // Start Empty dataframe with col1 as string and col2 as array to hold union result
        Dataset<Row> tempDS = SparkSession.active().emptyDataFrame().selectExpr("'' col1", "array() col2");
        
        for (Dataset<Row> ds : myDataset) {

            tempDS = tempDS.union(ds.groupBy("col1").agg(collect_list("col2").alias("col2")));

        }
        return  tempDS;
    }

【讨论】:

    猜你喜欢
    • 2017-07-27
    • 2017-11-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多