【问题标题】:Apache SPARK GroupByKey alternateApache SPARK GroupByKey 备用
【发布时间】:2015-02-08 17:28:15
【问题描述】:

我的表中有以下列 [col1,col2,key1,col3,txn_id,dw_last_updated]。在这些 txn_id 中, key1 是主键列。在我的数据集中,我可以有多个记录用于(txn_id,key)的组合。在这些记录中,我需要根据 dw_last_updated 选择最新的一条..

我正在使用这个逻辑。我一直遇到内存问题,我相信部分原因是 groupByKey()... 有更好的替代方案吗?

case class Fact(col1: Int,
  col2: Int,
  key1: String,
  col3: Int,
  txn_id: Double,
  dw_last_updated: Long)

sc.textFile(s3path).map { row =>
          val parts = row.split("\t")
          Fact(parts(0).toInt,
            parts(1).toInt,
            parts(2),
            parts(3).toInt,
            parts(4).toDouble,
            parts(5).toLong)
        }).map { t => ((t.txn_id, t.key1), t) }.groupByKey(512).map {
          case ((txn_id, key1), sequence) =>
            val newrecord = sequence.maxBy {
              case Fact_Cp(col1, col2, key1, col3, txn_id, dw_last_updated) => dw_last_updated.toLong
            }
           (newrecord.col1 + "\t" + newrecord.col2 + "\t" + newrecord.key1 +
              "\t" + newrecord.col3 + "\t" + newrecord.txn_id + "\t" + newrecord.dw_last_updated)
        }

感谢您的想法/建议...

【问题讨论】:

  • 恐怕我觉得不错。你可以尝试更多的分区。但也许你只是需要更多的机器。
  • 这还没有解决吗?

标签: scala apache-spark


【解决方案1】:

rdd.groupByKey 收集每个键的所有值,需要必要的内存来保存单个节点上键的值序列。不鼓励使用它。见[1]

鉴于我们只对每个键的 1 个值感兴趣:max(dw_last_updated),一种更节省内存的方法是使用 rdd.reduceByKey,这里的 reduce 函数是为相同的两条记录获取最大值使用该时间戳作为判别式的密钥。

rdd.reduceByKey{case (record1,record2) => max(record1, record2)}

应用于您的案例,应该如下所示:

case class Fact(...)
object Fact {
  def parse(s:String):Fact = ???
  def maxByTs(f1:Fact, f2:Fact):Fact = if (f1.dw_last_updated.toLong > f2.dw_last_updated.toLong) f1 else f2
}
val factById = sc.textFile(s3path).map{row => val fact = Fact.parse(row); ((fact.txn_id, fact.key1),fact)}
val maxFactById = factById.reduceByKey(Fact.maxByTs)

请注意,我在 Fact 伴随对象上定义了实用程序操作,以保持代码整洁。我还建议为每个转换步骤或逻辑步骤组提供命名变量。它使程序更具可读性。

【讨论】:

  • 你能详细说明你的答案吗?我如何到达 (record1,record2) ?
  • 看看添加的示例 - 但不要习惯它:-)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-08-31
  • 1970-01-01
  • 2015-09-10
  • 2017-07-06
  • 2015-02-16
  • 2017-04-25
相关资源
最近更新 更多