【问题标题】:spark scala type mismatch with zipwithIndex in groupbykeyspark scala 类型与 groupbykey 中的 zipwithIndex 不匹配
【发布时间】:2019-06-26 22:41:53
【问题描述】:

我正在尝试测试 groupByKey 以找到主题的第 n 高分

我的数据是这样的

scala> a
res176: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[263] at map at <console>:51

scala> a.take(10).foreach{println}
(data science,DN,US,28,98,SMITH,data science)
(maths,DN,US,28,92,SMITH,maths)
(chemistry,DN,US,28,94,SMITH,chemistry)
(physics,DN,US,28,88,SMITH,physics)
(data science,DN,UK,25,93,JOHN,data science)
(maths,DN,UK,25,91,JOHN,maths)
(chemistry,DN,UK,25,95,JOHN,chemistry)
(physics,DN,UK,25,90,JOHN,physics)
(data science,DN,CA,29,67,MARK,data science)
(maths,DN,CA,29,68,MARK,maths)

scala> 

所以对于第一行“数据科学”作为字符串是键,而“DN,US,28,98,SMITH,data science”是作为字符串的值

现在我想使用 group by 找到第二高

scala> a.groupByKey().flatMap(rec=>{ val max = rec._2.toList.map(x=>x.split(',')(3).toFloat).distinct.sortBy(x=>(-x)).zipWithIndex.filter(x=>x._2==2).toMap.keys
     | rec._2.toList.filter{x=>x.split(',')(3).toFloat==max}
     | }).take(15).foreach{println}

scala> 

这里什么都没有

如果我运行这个硬编码,我会得到价值

scala> a.groupByKey().flatMap(rec=>{ val max = "98"
     | rec._2.toList.sortBy(x=>(-x.split(',')(3).toFloat)).takeWhile(rec=> max.contains(rec.split(',')(3)))}).take(15).foreach{println}
DN,IND,26,98,XMAN,maths
DPS,US,28,98,XOMAN,chemistry
DN,US,28,98,SMITH,data science

这也给了我价值

scala> a.groupByKey().flatMap(rec=>{ rec._2.toList.map(x=>x.split(',')(3).toFloat).distinct.sortBy(x=>(-x)).zipWithIndex.filter(x=>x._2==2).map(_._1)}).take(15).foreach{println}
94.0
92.0
95.0
93.0

一些更复杂的代码给我输出

scala> a.groupByKey().flatMap(rec=>{ val max = rec._2.toList.map(x=>x.split(',')(3).toFloat).distinct.sortBy(x=>(-x)).take(1)
     | rec._2.toList.sortBy(x=>(-x.split(',')(3).toFloat)).takeWhile(rec=> max.contains(rec.split(',')(3).toFloat))}).take(15).foreach{println}
DN,IND,26,98,XMAN,maths
DPS,UK,25,96,SOMK,physics
DPS,US,28,98,XOMAN,chemistry
DN,US,28,98,SMITH,data science

当我使用 zipwithindex 时,似乎有一些数据类型不匹配。 有人可以帮我吗

【问题讨论】:

    标签: scala apache-spark flatmap


    【解决方案1】:

    由于.toMap.keys 导致类型不匹配。结果,val max 的类型为 Iterable[Float],因为方法 keys 返回 Iterable[A]。

    解决方案之一是在max 计算结束时添加head

      val max = rec._2.toList
        .map(x => x.split(',')(3).toFloat)
        .distinct
        .sortBy(x => (-x))
        .zipWithIndex
        .filter(x => x._2 == 2)
        .toMap
        .keys
        .head
    

    基本上,head 将返回 Float 类型的值。那么这段代码至少应该比较相等的类型x.split(',')(3).toFloat == max

    虽然,调用head 不是安全的方法。如果在您的情况下 filter 函数可以返回空列表,它可能会引发异常。那么就会抛出这样的异常:

    java.util.NoSuchElementException: next on empty iterator

    一旦它适用于具体的数据样本,您可以考虑重构此代码以与 Set 一起使用。而不是head.keys.toSet 并像使用max.contains(rec.split(',')(3)) 的其他示例一样进行比较

    【讨论】:

    • 谢谢 Alexey,你的回答有帮助,我在 .head 之后得到了结果。只是需要更多说明,当我再次使用 toSet 时,我没有得到任何值 a.groupByKey().flatMap(rec=>{ val max = rec._2.toList.map(x=>x.split(',' )(3).toFloat).distinct.sortBy(x=>(-x)).zipWithIndex.filter(x=>x._2==2).toMap.keys.toSet rec._2.toList.sortBy(x =>(-x.split(',')(3).toFloat)).takeWhile(rec=> max.contains(rec.split(',')(3).toFloat))}).take(15 ).foreach{println}
    • 不客气。 toSet 也应该可以工作。我有一个填充物,takeWhile 并不是你真正想要的。基本上,一旦其谓词函数第一次返回false.takeWhile 将停止过滤集合。事件虽然集合可能有一些元素,根据takeWhile 谓词,这将是true。有一个示例仅适用于 Scala 集合,而不是 Spark:List(3f,2,1).zipWithIndex.toMap.keys.toSet.takeWhile(x =&gt; x &lt; 3) res4: Set[Float] = Set()。尝试使用filter 而不是takeWhile
    • 是的,你是对的,.takeWhile 将停止过滤集合,这就是为什么发生这种情况的原因,因为第一条记录是 != max 所以它没有给我结果。你的回答真的帮助了我。感谢您抽出时间帮助我。请继续加油!!希望这是我应该做的逃避 java.util.NoSuchElementException: next on empty iterator 。 val max1 = (if (max.isEmpty) 0.0 else max.head) rec._2.toList.sortBy(x=>(-x.split(',')(3).toFloat)).filter{rec=> max1==(rec.split(',')(3).toFloat)}}).take(15).foreach{println}
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-04-09
    • 1970-01-01
    • 2013-09-03
    • 2014-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多