【问题标题】:Order by value in spark pair RDD after join加入后在火花对 RDD 中按值排序
【发布时间】:2015-06-19 16:49:39
【问题描述】:

我有 2 个配对的 RDD,我使用相同的键将它们连接在一起,现在我想使用其中一个值对结果进行排序。 新加入的RDD类型为:RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]

其中第一部分是配对的 RDD 键,可迭代部分是我加入的两个 RDD 中的值。我现在想按第二个 RDD 的时间字段对它们进行排序。我尝试使用 sortBy 函数,但出现错误。

有什么想法吗?

谢谢

【问题讨论】:

  • 改进您的问题以获得快速而好的答案。
  • 显示您的代码和错误。

标签: scala apache-spark rdd


【解决方案1】:

Spark 对 RDD 有一个 mapValues 方法。我想它会对你有所帮助。

    def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
    Pass each value in the key-value pair RDD through a map function 
without changing the keys; this also retains the original RDD's partitioning.

Spark Documentation 有更多详细信息。

【讨论】:

  • 不确定我是否理解它对我的帮助。你能解释更多吗?谢谢
【解决方案2】:

你是对的,你可以使用sortBy函数:

val yourRdd: RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])] = ...(your cogroup operation here)

val result = yourRdd.sortBy({
  case ((str, i), iter) if iter.nonEmpty => iter.head._2._
  }, true)

iter.head 的类型为((String, DateTime, Int,Int), (String, DateTime, String, String))

iter.head._2 的类型为 (String, DateTime, String, String)

iter.head._2._2 确实是DateTime 的类型。

也许您应该为日期时间提供隐式排序对象,例如this。 顺便问一下,迭代器可能是空的吗?然后您应该将此案例添加到sortBy 函数中。如果这个迭代器中有很多项,那么选择哪一项进行排序?

【讨论】:

  • 感谢@ipoteka,我仍然遇到错误。这是我正在使用的代码: val mappedDF = firstRDD.join(secondRDD).groupByKey() val res = mappedDF.sortBy( {case ((str, i), iter) if iter.nonEmpty => iter(0) ._2._2} ,真);当站在 iter(0) 上时,错误是:Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))] does not take parameters 在涉及默认参数的应用程序中发生错误。跨度>
  • 啊,对不起。它确实可能没有这种方法。但它必须支持head操作:scala-lang.org/api/2.11.4/index.html#scala.collection.Iterable所以我编辑我的答案是准确的。
【解决方案3】:

如果RDD的Iterable需要排序:

val rdd: RDD[((String, Int), 
             Iterable[((String, DateTime, Int,Int), 
                       (String, DateTime, String, String))])] = ???

val dateOrdering = new Ordering[org.joda.time.DateTime]{ 
    override def compare(a: org.joda.time.DateTime,
                         b: org.joda.time.DateTime) = 
        if (a.isBefore(b)) -1 else 1
}

rdd.mapValues(v => v.toArray
                    .sortBy(x => x._2._2)(dateOrdering))

【讨论】:

    【解决方案4】:

    使用python:

    sortedRDD = unsortedRDD.sortBy(lambda x:x[1][1], False)
    

    这将按降序排序

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多