【发布时间】:2020-08-10 04:29:30
【问题描述】:
我必须将 rdd 与其类型相匹配。
trait Fruit
case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends Fruit
现在DStream[Fruit] 类型的 dstream 即将到来。它是Apple 或Mango。
如何根据子类进行操作?类似下面的东西(不起作用):
dStream.foreachRDD{rdd:RDD[Fruit] =>
rdd match {
case rdd: RDD[Apple] =>
//do something
case rdd: RDD[Mango] =>
//do something
case _ =>
println(rdd.count() + "<<<< not matched anything")
}
}
【问题讨论】:
-
您如何使用数据我的意思是发送 Apple 或 Mango 类型数据的源是什么?
-
目前我使用的是端口(
nc -lk 12345) -
输入以 json 字符串形式给出并进行相应解析,我得到
dStream : DStream[Payload] -
模式匹配每个 RDD 中的每一行的解决方案是否可行?即,对于 Apple => 的每一行执行此操作,对于每个 Mango 行 => 执行此操作,忽略并过滤掉其他类型。
-
trait SentientBeing trait Animal extends SentientBeing case class Dog(name: String) extends Animal case class Person(name: String, age: Int) extends SentientBeing // later in the code ... def printInfo(x: SentientBeing) = x match { case Person(name, age) => // handle the Person case Dog(name) => // handle the Dog }应该可以工作......它不工作吗?简单的方法是@Shaido-ReinstateMonica 建议...您可以过滤有效载荷类型的值并根据过滤值进行处理
标签: scala apache-spark spark-streaming subclass rdd