【问题标题】:How to have Nested Map RDD's in Spark如何在 Spark 中拥有嵌套的 Map RDD
【发布时间】:2018-04-28 13:09:54
【问题描述】:

我有一个文本文件,例如:-

ID,Hour,Ratio
100775,0.0,1.0
100775,1.0,1.0560344797302321
100775,2.0,1.1333317975785973
100775,3.0,1.1886133302168074
100776,4.0,1.2824427440125867

我想要一个像MAP{Hour,MAP{ID,Ratio}} 这样的结构来存储为RDD。我能找到的最接近的结构是 JavaPairRDD。我尝试实现像JavaPairRDD{Hour,MAP{ID,Ratio}} 这样的结构,但是,这个结构提供lookup() 功能,它返回LIST{MAP{ID,RATIO}},这并不能解决我的用例,因为我基本上想要这样做

ratio = MAP.get(Hour).get(ID)

关于如何最好地完成这项工作的任何指示。

更新:-

在拉梅什的回答之后,我尝试了以下方法:-

JavaRDD<Map<String,Map<String,String>>> mapRDD =  data.map(line -> line.split(",")).map(array-> Collections
              .singletonMap(array[0],
                Collections
                .singletonMap
                (array[1],array[2])));

但是,这里没有类似 lookup() 的功能,对吗?

【问题讨论】:

    标签: java scala apache-spark collections rdd


    【解决方案1】:

    在 spark 中使用数据集是一个常见问题。通常有一个数据集,其中的每一行都包含一些样本,每一列代表每个样本的一个特征。 但是针对常见问题的常见解决方案是定义一个实体来支持每列作为其属性,并且每个样本都是一个 RDD 对象。 要访问 rdd 中的每个对象,可以使用 javapairrdd 并设置例如在这个例子中 HOUR 作为它的键,结果会是这样的:

       Javapairrdd<INTEGER,Entity>
    

    【讨论】:

      【解决方案2】:

      对于我的用例,我决定采用以下方法:-

      我创建了一个 JavaPairRDD{Hour,MAP{ID,Ratio}}。在任务运行的任何时候,我只需要对应于那个小时的地图。 所以我做了以下事情:-

      Map<String, Double> result = new HashMap<>();
       javaRDDPair.lookup(HOUR).stream().forEach(map ->{
                  result.putAll(map.entrySet().stream().collect(Collectors.toMap(entry-> entry.getKey(), entry-> entry.getValue())));
              });
      

      这现在可以进一步用作广播变量。

      【讨论】:

        【解决方案3】:

        你可以这样做

        scala> val rdd = sc.textFile("path to the csv file")
        rdd: org.apache.spark.rdd.RDD[String] = path to csv file MapPartitionsRDD[7] at textFile at <console>:24
        
        scala> val maps = rdd.map(line => line.split(",")).map(array => (array(1), Map(array(0) -> array(2)))).collectAsMap()
        maps: scala.collection.Map[String,scala.collection.immutable.Map[String,String]] = Map(1.0 -> Map(100775 -> 1.0560344797302321), 4.0 -> Map(100776 -> 1.2824427440125867), 0.0 -> Map(100775 -> 1.0), 3.0 -> Map(100775 -> 1.1886133302168074), 2.0 -> Map(100775 -> 1.1333317975785973))
        

        如果您需要RDD[Map[String, Map[String, String]]],那么您可以执行以下操作。

        scala> val rddMaps = rdd.map(line => line.split(",")).map(array => Map(array(1) -> Map(array(0) -> array(2)))).collect
        rddMaps: Array[scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,String]]] = Array(Map(0.0 -> Map(100775 -> 1.0)), Map(1.0 -> Map(100775 -> 1.0560344797302321)), Map(2.0 -> Map(100775 -> 1.1333317975785973)), Map(3.0 -> Map(100775 -> 1.1886133302168074)), Map(4.0 -> Map(100776 -> 1.2824427440125867)))
        

        希望回答对你有帮助

        【讨论】:

        • 这有帮助,但这会返回一个地图,但是我想返回一个 RDD。
        • 我也包含了 rdd 答案。 :)
        • 你很困惑,因为我使用了收集。如果你不使用 collect 那么你会得到 rdd 为scala&gt; val rddMaps = rdd.map(line =&gt; line.split(",")).map(array =&gt; Map(array(1) -&gt; Map(array(0) -&gt; array(2)))) rddMaps: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,String]]] = MapPartitionsRDD[17] at map at &lt;console&gt;:26
        • 是的,我只这样做了(您可以在问题中找到代码。),但是我想知道是否有类似于 JavaPairRDD 中可用的lookup() 功能。跨度>
        • 查找功能是什么意思?
        猜你喜欢
        • 2014-07-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-12-29
        • 2018-04-08
        • 2020-06-24
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多