【发布时间】:2017-07-29 06:10:06
【问题描述】:
我使用的是 Spark 1.6.1
我有一个 DataFrame,我需要遍历它并将每一行写入 Kafka。截至目前,我正在做这样的事情:
Producer<String><String> message;
for(Row x: my_df.collect()){
kafka_message = new Producer<String><String>(topic, String.valueOf(x))
my_kafka_producer.send(kafka_message);
}
这里的问题是收集器将数据发送给驱动程序,然后推送到 kafka。鉴于我有大约 250 个执行者,我的 1 个驱动程序无法有效地处理工作量。所以,我想知道如何在我的执行程序上迭代数据框。这将需要避免执行 collect()。我发现一篇文章大致解释了如何做到这一点,但不幸的是他们的 GitHub 链接实际上已经过期,所以我找不到如何实现它。
【问题讨论】:
标签: java loops apache-spark dataframe apache-kafka