【问题标题】:Convert iterable to RDD将可迭代转换为 RDD
【发布时间】:2016-12-13 20:56:45
【问题描述】:

我想从 spark-streaming 保存到几个弹性搜索索引。 我创建了一对<key(index), value>,当我执行groupByKey时,结果是<key(index), Iterable<value>>的元组,但是为了使用elasticsearch-spark插件保存到elasticsearch,我需要JavaRDD<value>的值。

我知道 sparkContext.parallelize(list) 有一个选项可以从列表中创建 JavaRDD,但这只能在驱动程序上执行。

是否有其他选项可以创建可以在执行器上执行的 JavaRDD?或者我可以实现Tuple2<key(index), JavaRDD<value>> 的另一种方式,它适用于执行程序? 如果不是,我怎样才能只在驱动程序上将 Iterator 切换到 JavaRDD,并在执行程序上将插件写入 elasticsearch?

谢谢,

丹妮拉

【问题讨论】:

  • 嗯,AFAIK,groupByKey 导致JavaPairRDD<K,Iterable<V>> 仍然是rdd。因此,rdd 的任何进一步处理都在执行程序上执行,而不是在驱动程序上执行。

标签: java elasticsearch apache-spark spark-streaming elasticsearch-plugin


【解决方案1】:

我会说它必须有可能像下面这样

JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2());
JavaRDD<Value> onlyValues = values.flatMap(it -> it);

替代方法是

JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1);
JavaRDD<Value> values = keyValues.map(t2 -> t2._2());

【讨论】:

  • 感谢 evgenii,因为我需要从 JavaPairRDD> 到 JavaRDD 在 foreachRDD 中获取 JavaRDD values = rdd.flatMap((FlatMapFunction>, String>) tuple2 -> { final List l = Lists.newArrayList(); tuple2._2().forEach(l::add); return l; });是否与同一个键有关?
  • 我可能误解了你的问题。我会编辑我的答案,希望这次会更好。
猜你喜欢
  • 2015-12-22
  • 1970-01-01
  • 2021-04-21
  • 2012-05-07
  • 2017-12-30
  • 2016-01-07
  • 2021-02-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多