【问题标题】:spark streaming: use of broadcast variable generates NotSerializableException火花流:使用广播变量生成 NotSerializableException
【发布时间】:2016-05-27 08:16:45
【问题描述】:

在使用 Twitter 实用程序加载 jar 后,我正在 spark-shell 中进行一些测试。这是一个有效的代码序列:

// launch:
// spark-shell --driver-memory 1g --master local[3] --jars target/scala-2.10/tweetProcessing-1.0.jar

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._

val consumerKey = ...
val consumerSecret = ...
val accessToken = ...
val accessTokenSecret = ...
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val ssc = new StreamingContext(sc, Seconds(60))
val tweetStream = TwitterUtils.createStream(ssc, None)
val myNewStream = tweetStream.map(tweet => tweet.getText)
    .map(tweetText => tweetText.toLowerCase.split("\\W+"))
    .transform(rdd => 
        rdd.map(tweetWordSeq => { 
            tweetWordSeq.foreach { word => { 
                val mySet = Set("apple", "orange");
                if(!(mySet)(word)) word }
            }
        }))
myNewStream.foreachRDD((rdd,time) => { 
    println("%s at time %s".format(rdd.count(),time.milliseconds))
})
ssc.start()

(实际上我最大程度地减少了我所做的计算,只是为了突出问题)。这里mySet被序列化了,一切顺利。

但是当我使用广播变量并相应地替换测试时:

val ssc = new StreamingContext(sc, Seconds(60))

val mySet = sc.broadcast(Set("apple", "orange"))

val tweetStream = TwitterUtils.createStream(ssc, None)
val myNewStream = tweetStream.map(tweet => tweet.getText)
    .map(tweetText => tweetText.toLowerCase.split("\\W+"))
    .transform(rdd => 
        rdd.map(tweetWordSeq => { 
            tweetWordSeq.foreach { word => { 
                if(!(mySet.value)(word)) word }
            }
        }))
myNewStream.foreachRDD((rdd,time) => { 
    println("%s at time %s".format(rdd.count(),time.milliseconds))
})
ssc.start()

我明白了:

ERROR JobScheduler: Error generating jobs for time 1464335160000 ms
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized  possibly as a part of closure of an RDD operation. This is because  the DStream object is being referred to from within the closure.  Please rewrite the RDD operation inside this DStream to avoid this.  This has been enforced to avoid bloating of Spark tasks  with unnecessary objects.

我自然更喜欢使用广播变量(我的集合实际上是一组相当大的停用词),但我不太明白问题出在哪里。

【问题讨论】:

    标签: apache-spark streaming broadcast notserializableexception


    【解决方案1】:

    您需要在驱动程序中创建广播变量(在任何闭包之外),而不是在transformforeachRDD 等任何转换中。

    val ssc = new StreamingContext(sc, Seconds(60))
    val mySet = ssc.sparkContext.broadcast(Set("apple", "orange"))
    

    然后,您可以访问 transform 中的广播变量或执行器上的其他 DStream 闭包,例如,

    !(mySet.value)(word)
    

    如果您在 transform 闭包的 rdd.map 中有此语句 sc.broadcast(Set("apple", "orange")),驱动程序将尝试将 StreamingContext 发送给所有执行程序,并且它是不可序列化的。这就是为什么你看到NotSerializableException

    【讨论】:

    • 对不起,我的消息确实不是很清楚,我显然在val myNewStream = tweetStream.map...之前写了val mySet = sc.broadcast(Set("apple", "orange"))。是因为我直接访问 Spark 上下文 (sc) 而不是通过 Streaming 上下文 (ssc.sparkContext)?
    • 哦,好的:)。能否请您复制粘贴引发错误的整个代码?
    • 刚刚将 Spark 从 1.5.1 升级到 1.6.1(并重建了 jar),最小的代码现在似乎可以与广播变量一起使用。我会尝试完整的代码。
    • 我看不出代码有什么问题。我有非常相似的代码处理流和广播变量。
    • 刚刚又试了,有广播变量的版本不行,因为序列化问题。完全相同的代码(使用广播变量)适用于从文件(即不在 DStream 中)读取的固定 RDD。
    猜你喜欢
    • 1970-01-01
    • 2017-02-13
    • 2016-01-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-08
    • 1970-01-01
    • 2015-11-08
    相关资源
    最近更新 更多