【问题标题】:How to map fields in RDD[String] to broad cast?如何将 RDD[String] 中的字段映射到广播?
【发布时间】:2017-01-15 16:17:58
【问题描述】:

如何将特定字段从RDD[String] 获取到具有特定字段的映射List。我有一个RDD[String]org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] 每个条目都是这种格式的JSON:

{
  count: 1,
  itemId: "1122334",
  country: {
    code: {
      preferred: "USA"
    },
    name: {
      preferred: "America"
    }
  },
  states: "50",
  self: {
    otherInfo: [

    ],
    preferred: "National Parks"
  },
  Rating: 4  
}

如何获取只有itemId 作为键和self.preferred 作为值的映射列表({itemid , self.preferred}):

itemId : 1122334 self.preferred : "National Parks"
itemId : 3444444 self.preferred : "State Parks"
...

在所有节点上广播生成的地图是否有效?我需要通过进一步的计算来共享/引用这张地图。

【问题讨论】:

  • 是否有效取决于地图的大小。如果您真的希望它是一个列表(或 HashMap,这里更适合),您需要 .collect() RDD 到驱动程序,如果 RDD 太大而无法放入驱动程序的内存中,这可能无法正常工作。在这种情况下,您需要使用 RDD[(String, String)] 来保存映射,然后使用 .join() 将项目 ID 转换为首选值。

标签: scala apache-spark apache-spark-sql rdd broadcast


【解决方案1】:

你可以试试:

    val filteredMappingsList = countryMapping.filter(x=> {
    val jsonObj = new JSONObject(x)
    jsonObj.has("itemId") 

})

val finalMapping = filteredMappingsList.map(x=>{
    val jsonObj = new JSONObject(x);
    val itemId = jsonObj.get("itemId").toString()
    val preferred = jsonObj.getJSONObject("self").get("preferred").toString()
    (itemId, preferred)
}).collectAsMap

广播:

val broadcastedAsins = sc.broadcast(finalMapping)

【讨论】:

    猜你喜欢
    • 2016-04-11
    • 1970-01-01
    • 2011-02-10
    • 2016-04-18
    • 2018-07-29
    • 1970-01-01
    • 2014-03-09
    • 1970-01-01
    • 2023-04-02
    相关资源
    最近更新 更多