【问题标题】:scala spark reducebykey use custom fuctionscala spark reducebykey 使用自定义函数
【发布时间】:2021-08-20 08:01:07
【问题描述】:

我想使用reducebykey,但是当我尝试使用它时,它显示错误:

type miss match required Nothing

问题:如何为reducebykey 创建自定义函数?

{(键,值)}

键:字符串 值:地图

示例:

rdd = {("a", "weight"->1), ("a", "weight"->2)}
expect{("a"->3)}

def combine(x: mutable.map[string,Int],y:mutable.map[string,Int]):mutable.map[String,Int]={
    x.weight = x.weithg+y.weight
    x
}
    
rdd.reducebykey((x,y)=>combine(x,y))

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    假设您有一个RDD[(K, V)](或者更准确地说是PairRDD[K, V]),并且您想以某种方式将值与相同的key 组合起来,那么您可以使用reduceByKey,它需要一个函数(V, V) => V 并为您提供修改后的RDD[(K, V)](或PairRDD[K, V]

    在这里,您的 rdd = {("a", "weight"->1), ("a", "weight"->2)} 不是真正的 Scala,类似地,整个 combine 函数在语法和逻辑上都是错误的(它不会编译)。但我猜你所拥有的是类似于以下的东西,

    val rdd = sc.parallelize(List(
      ("a", "weight"->1),
      ("a", "weight"->2)
    ))
    

    这意味着您的 rdd 是 RDD[(String, (String, Int))]PairRDD[String, (String, Int)] 类型,这意味着 reduceByKey 想要 ((String, Int), (String, Int)) => (String, Int) 类型的函数。

    def combine(x: (String, Int), y: (String, Int])): (String, Int) =
      (x._1, x._2 + y._2)
    
    val rdd2 = rdd.reducebykey(combine)
    

    如果您的问题是其他问题,请更新问题以使用真实代码分享您的问题,以便其他人能够真正理解它。

    【讨论】:

    • 如果value是map呢,因为value不只是weight->1,是一个复杂的map
    • 您能否提供准确描述您的问题的实际 scala 代码?为什么你甚至需要在 RDD 中使用 mutable map
    • 谢谢认为我已经解决了问题,我的问题是当我使用 row.getAs("something"),但没有声明类型时,应该是 row.getAs[Map]("something ")
    • 现在你说的是Row,这意味着你实际上是在处理DataFrame而不是RDD。
    • 我的意思是当我将 DF 转移到 rdd[k,v] 对时,我没有声明 v 的类型,这就是我的 reducebykey 不能工作的原因
    猜你喜欢
    • 1970-01-01
    • 2023-03-11
    • 2016-08-26
    • 1970-01-01
    • 2016-02-25
    • 2015-12-17
    • 2023-03-23
    • 1970-01-01
    • 2014-07-19
    相关资源
    最近更新 更多