【问题标题】:How to match an RDD[ParentClass] with RDD[Subclass] in apache spark?如何在apache spark中将RDD [ParentClass]与RDD [Subclass]匹配?
【发布时间】:2020-08-10 04:29:30
【问题描述】:

我必须将 rdd 与其类型相匹配。

trait Fruit

case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends  Fruit

现在DStream[Fruit] 类型的 dstream 即将到来。它是AppleMango

如何根据子类进行操作?类似下面的东西(不起作用):

dStream.foreachRDD{rdd:RDD[Fruit] =>
     rdd match {
       case rdd: RDD[Apple] =>
         //do something

       case rdd: RDD[Mango] =>
         //do something

       case _ =>
         println(rdd.count() + "<<<< not matched anything")
     }
    }

【问题讨论】:

  • 您如何使用数据我的意思是发送 Apple 或 Mango 类型数据的源是什么?
  • 目前我使用的是端口(nc -lk 12345)
  • 输入以 json 字符串形式给出并进行相应解析,我得到 dStream : DStream[Payload]
  • 模式匹配每个 RDD 中的每一行的解决方案是否可行?即,对于 Apple => 的每一行执行此操作,对于每个 Mango 行 => 执行此操作,忽略并过滤掉其他类型。
  • trait SentientBeing trait Animal extends SentientBeing case class Dog(name: String) extends Animal case class Person(name: String, age: Int) extends SentientBeing // later in the code ... def printInfo(x: SentientBeing) = x match { case Person(name, age) =&gt; // handle the Person case Dog(name) =&gt; // handle the Dog } 应该可以工作......它不工作吗?简单的方法是@Shaido-ReinstateMonica 建议...您可以过滤有效载荷类型的值并根据过滤值进行处理

标签: scala apache-spark spark-streaming subclass rdd


【解决方案1】:

由于我们有RDD[Fruit],因此任何行都可以是AppleMango。使用foreachRDD 时,每个RDD 将包含这些(以及可能的其他)类型的混合。

为了区分不同的类型,我们可以使用collect[U](f: PartialFunction[T, U]): RDD[U](不要与collect(): Array[T] 混淆,collect(): Array[T] 返回包含来自 RDD 的元素的列表)。 该函数将通过应用函数f返回一个包含所有匹配值的RDD(在这种情况下,我们可以在这里使用模式匹配)。

下面是一个小的说明性示例(也将Orange 添加到水果中)。

设置:

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val dStream: InputDStream[Fruit] = ssc.queueStream(inputData)

inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

这会创建一个 RDD[Fruit] 流,其中包含两个单独的 RDDs。

dStream.foreachRDD{rdd: RDD[Fruit] =>
  val mix = rdd.collect{
    case row: Apple => ("APPLE", row.price) // do any computation on apple rows
    case row: Mango => ("MANGO", row.price) // do any computation on mango rows
    //case _@row => do something with other rows (will be removed by default).
  }
  mix foreach println
}

在上面的collect 中,我们稍微更改每一行(删除类),然后打印结果RDD。结果:

// First RDD
(MANGO,11)
(APPLE,5)
(APPLE,5)

// Second RDD
(MANGO,10)

可以看出,模式匹配保留并更改了包含AppleMango 的行,同时删除了所有Orange 类。


单独的 RDD

如果需要,也可以将两个子类分成自己的RDDs,如下所示。然后可以对这两个RDDs 执行任何计算。

val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}

完整示例代码

trait Fruit
case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends  Fruit
case class Orange(price:Int) extends  Fruit

object Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    val inputData: Queue[RDD[Fruit]] = Queue()
    val inputStream: InputDStream[Fruit] = ssc.queueStream(inputData)

    inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
    inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

    inputStream.foreachRDD{rdd:RDD[Fruit] =>
      val mix = rdd.collect{
        case row: Apple => ("APPLE", row.price) // do any computation on apple rows
        case row: Mango => ("MANGO", row.price) // do any computation on mango rows
        //case _@row => do something with other rows (will be removed by default).
      }
      mix foreach println
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

【讨论】:

  • @potterpod 似乎这解决了你的问题,如果没有,请提问/respond.....如果你想关闭线程,如果你没问题,请注意接受the answer as owner 和@ 987654323@
  • 是的,它有帮助,只是一件事......试图获得单独的 rdd,现在我得到 RDD[DataFrame] 说苹果,现在我想写入数据库。现在在 RDD[Dataframe] 中一个接一个地迭代似乎效率不高。那么有没有办法将 RDD[Dataframe] 转换为 Dataframe。转换为单个数据帧并将其作为批量写入数据库
  • convert RDD[Apple] in to RDD[Row]((not RDD[DataFrame])...see here RDD[Row] 只不过是一个数据框。您可以使用将数据框插入数据库写方法。请不要忘记接受关闭线程的答案。我觉得影子给出的精彩解释。
猜你喜欢
  • 2014-05-13
  • 2016-03-06
  • 1970-01-01
  • 2015-06-15
  • 2014-12-18
  • 2017-03-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多