【问题标题】:How to merge two different type streams RDDs如何合并两个不同类型的流 RDD
【发布时间】:2016-08-05 17:18:59
【问题描述】:

我需要合并两个不同的流 RDD。

streams类型的一个是org.apache.spark.streaming.dstream.DStream[String],另一个是org.apache.spark.streaming.dstream.DStream[twitter4j.Status]类型。

我试过了:

  val streamRDD = stream.union(sentiments)

但它不会成功:

[error]  found   : org.apache.spark.streaming.dstream.DStream[String]
[error]  required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status]
[error]       val streamRDD = stream.union(sentiments)
[error]                                    ^

【问题讨论】:

  • 您希望结果是什么样的?错误消息正确描述了问题:您的 Dstreams 包含不同的类型,因此无法合并。您希望合并后的DStream 持有什么?如果String 你必须先将另一个转换成DStream[String]

标签: scala apache-spark stream rdd


【解决方案1】:

问题是union 仅适用于相同元素类型的两个DStream,而您有DStream[String]DStream[twitter4j.Status]String 不是twitter4j.Status

我假设你有以下类型:

val stream: DStream[twitter4j.Status]
val sentiments: DStream[String]

你有不同的选择来解决这个问题:

    1. 您确定Stringtwitter4j.Status 应该混合为一个DStream,因为它们在您的上下文中代表相同的信息:转换DStream 以匹配另一个

      • a) 将stream 转换为匹配sentiments,因此您需要转换twitter4j.Status => String,也许您可​​以像这样使用_.toString

        val stream2 = stream.map(_.toString)
        val result = stream2.union(sentiments)
        
      • b) 将sentiments 转换为匹配stream,需要String => twitter4j.Status
    1. Stringtwitter4j.Status 在您的上下文中是两个不同的东西,您希望保持两者之间的区别,但仍将它们组合成一个 DStream

    一般你可以使用Sum-type 来表示每种情况,这里我们只有两个所以我们可以使用预定义的Either

    type R = DStream[Either[String,twitter4j.Status] // shorter
    val streamL: R = stream.map(Left(_))
    val sentimentR: R = sentiments.map(Right(_))
    val result: R = streamL.union(sentimentsR)
    

    最后,您将获得 一个 流,其中每个元素要么是 String 包裹在 Left 中,要么是 twitter4j.Status 包裹在 Right 中,让您能够区分处理流时介于两者之间。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-22
    • 2017-10-12
    • 1970-01-01
    • 2019-08-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多