【问题标题】:Spark RDD.forEach re-initializing external objectSpark RDD.forEach 重新初始化外部对象
【发布时间】:2017-09-11 08:33:12
【问题描述】:

我可能错过了一些基本的 Spark 概念。我正在尝试将整数的 RDD 转换为逗号分隔的字符串。目前我通过将 RDD 收集为 List 并使用它的 Iterator 来做到这一点。但是,在分析 JVM 时,似乎它将所有工作都集中在一个看起来效率不高的线程中。因此,我试图在 RDD 本身上调用 forEach 方法,但它的行为很奇怪。下面是我的单元测试

JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3));

        StringBuilder sb = new StringBuilder("");

        rdd.foreach(t->{

            System.out.println(String.valueOf(t));

            if(sb.length() > 0)
                sb.append(",");         
            sb.append(String.valueOf(t));

            System.out.println(sb); 

        });

        System.out.println(sb.length());

输出:

1
3
2
2
3
1
0

显然 StringBuilder 在每次调用时都会重新实例化。还有其他方法吗?

【问题讨论】:

标签: apache-spark rdd


【解决方案1】:

您也可以使用 mapPartitions 来做到这一点。因此,对于每个分区,您将并行工作,然后在最后将它们收集在一起。

val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7),5) // This will have six number of partitions

val rdd3 = rdd1.mapPartitions(x => {
   val str = x.mkString(",")
  List(str).iterator
}) // Here we are creating a comma separated string for each partitions only if it has some elements in it

val test1 = rdd3.collect.filterNot(x => {
  x.equals("")
}) // filterNot is required as the number of partitions can be more than the number of elements in the sequence( based on spark.default.parallelism property). So some partitions with no elements will generate "" strings. 

对于 Java,您可以尝试以下代码 -

JavaRDD<Integer> rdd1 = jsc.parallelize(list);  

JavaRDD<String> collection = rdd1.mapPartitions((Iterator<Integer> iter) -> {
        ArrayList<String> out = new ArrayList<String>();
        StringBuffer strbf = new StringBuffer("");

        while(iter.hasNext()) {
            Integer current = iter.next();
            strbf.append(current);
        }

        out.add(strbf.toString());
        return out.iterator();
    });

StringBuffer strbfFinal = new StringBuffer("");

    collection.collect().forEach(item -> {
        if(!"".equals(item)){
            strbfFinal.append(item);
        }
    });

StringBuffer 有你附加的数字列表。

【讨论】:

    【解决方案2】:

    由于 forEach 确实在 spark 中返回 Unit/void,因此您需要中继一些集中的东西。在这种情况下,我们可以想到accumulators。累加器用于数值,因此我们需要构建自己的 String 累加器。

    import org.apache.spark.AccumulatorParam
        object StringAccumulator extends AccumulatorParam[String] {
    
          def addInPlace(accum: String, current: String): String = {
            s"accum $current"
          }
    
          def zero(initialValue: String): String = {
            ""
          }
        }
    

    然后使用累加器来收集你的价值。

    val sc = prepareConfig()
    
    val acc = sc.accumulator("")(StringAccumulator)
    
    val baseRDD = sc.parallelize(Seq(1, 2, 3))
    
    baseRDD.foreach { x => acc.++=(x.toString()) }
    println(acc.value)
    

    结果: 1 2 3

    Scala 中的解决方案。

    【讨论】:

      猜你喜欢
      • 2020-10-13
      • 1970-01-01
      • 2019-10-14
      • 1970-01-01
      • 1970-01-01
      • 2012-01-17
      • 1970-01-01
      • 1970-01-01
      • 2013-04-05
      相关资源
      最近更新 更多