【问题标题】:Iterate Through Spark DataFrame in Java without Collect在没有收集的情况下在 Java 中迭代 Spark DataFrame
【发布时间】:2017-07-29 06:10:06
【问题描述】:

我使用的是 Spark 1.6.1

我有一个 DataFrame,我需要遍历它并将每一行写入 Kafka。截至目前,我正在做这样的事情:

Producer<String><String> message;
for(Row x: my_df.collect()){
    kafka_message = new Producer<String><String>(topic, String.valueOf(x))
    my_kafka_producer.send(kafka_message);
}

这里的问题是收集器将数据发送给驱动程序,然后推送到 kafka。鉴于我有大约 250 个执行者,我的 1 个驱动程序无法有效地处理工作量。所以,我想知道如何在我的执行程序上迭代数据框。这将需要避免执行 collect()。我发现一篇文章大致解释了如何做到这一点,但不幸的是他们的 GitHub 链接实际上已经过期,所以我找不到如何实现它。

参考文章: https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/

【问题讨论】:

    标签: java loops apache-spark dataframe apache-kafka


    【解决方案1】:

    在 Java 中,您可以尝试以下方法。扩展AbstractFunction1

    import scala.runtime.AbstractFunction1;
    
    abstract class MyFunction1<T,R> extends AbstractFunction1<T, R> implements Serializable {
    }
    

    现在为您的 Dataframe 调用 foreachPartition,如下所示。

    import scala.collection.Iterator;
    import scala.runtime.BoxedUnit;
    
    df.foreachPartition(new MyFunction1<Iterator<Row>,BoxedUnit>(){
            @Override
            public BoxedUnit apply(Iterator<Row> rows) {
                while(rows.hasNext()){
                    //get the Row
                    Row row = rows.next();
                }
                return BoxedUnit.UNIT;
            }
        });
    

    【讨论】:

      猜你喜欢
      • 2018-03-21
      • 2021-01-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-10-31
      • 2013-02-28
      • 1970-01-01
      • 2019-03-16
      相关资源
      最近更新 更多