【问题标题】:Split Map type column with huge values into multiple rows using Scala and Spark使用 Scala 和 Spark 将具有巨大值的 Map 类型列拆分为多行
【发布时间】:2020-12-30 12:36:53
【问题描述】:

这是问题的延续:Combine value part of Tuple2 which is a map, into single map grouping by the key of Tuple2

我现在可以使用 reduceByKey 减少行数。

但是现在,在最后的DataFrame...

例如

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900, 6-> 111, 7-> 222, 8-> 333, 12-> 444, 13->555, 19->666})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

...某些行的地图列具有非常大的地图。例如对于上面的B

我正在尝试将 DF 写入 Azure Cosmos DB Core SQL。在这里,来自上述 DF 的每一行都变成了 Cosmos DB 的 1 个文档。问题是如果行大小超过 2MB,那么 Cosmos DB 会拒绝该请求。

问题:我想将包含大量地图列的行拆分为多行(使它们的大小小于 2MB)。重复的键列不是问题。

最终的结果可以是(如果我每次都超过5个元素,我就划分地图):

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(B, {6-> 111, 7-> 222, 8-> 333, 12-> 444, 13->555})
(B, {19->666})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

你可能会问,上一个问题,已经分裂了,那我为什么要合并呢?原因在上一个问题中,对于 B,没有 reduceByKey,我可能有 1000 行。但是,最后我只需要 20 行,例如上面的例子。 1 行本来是理想的,但由于 Cosmos 限制,我必须创建多个文档(每个小于 2MB)。

希望我清楚。如需任何说明,请告诉我。

【问题讨论】:

  • @Lamanus - 请提出建议
  • @werner - 请提出建议

标签: scala dataframe apache-spark azure-cosmosdb


【解决方案1】:

我可以通过编写自己的自定义代码来解决这个问题,如下所示:

originalDF.rdd.reduceByKey((a, b) => a ++ b).map(row => {
  val indexedMapEntries: Map[Int, (String, String)] = row._2.zipWithIndex.map(mapWithIndex => (mapWithIndex._2, mapWithIndex._1))
  var min = 0
  var max = Math.min(indexedMapEntries.size - 1, 9999)
  var proceed = true

  var rowKeyIdToAttributesMapList: ListBuffer[(String, Map[String, String])] = new ListBuffer[(String, Map[String, String])]()

  while (proceed) {

    var tempMapToHoldEntries = Map[String, String]()
    var i = min
    while (i <= max) {
      var entry: (String, String) = indexedMapEntries.get(i).get
      tempMapToHoldEntries += entry
      i = i + 1
    }
    rowKeyIdToAttributesMapList += ((row._1, tempMapToHoldEntries))
    min = max + 1
    max = Math.min(indexedMapEntries.size - 1, max + 9999)
    if (min > (indexedMapEntries.size - 1))
      proceed = false

  }

  rowKeyIdToAttributesMapList.toList

}).flatMap(x => x).toDF("rowKeyId", "attributes")

这里,originalDF 是我上一个问题中的那个(检查 OP)。 10000rowKeyId 的每个映射的最大大小。如果地图大小超过 10000,那么我会在循环中创建一个具有相同 rowKeyId 和剩余属性的新行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-04-24
    • 2021-10-02
    • 2019-08-10
    • 1970-01-01
    • 1970-01-01
    • 2017-07-14
    • 2019-03-11
    • 1970-01-01
    相关资源
    最近更新 更多