【问题标题】:Saving partial spark DStream window to HDFS将部分 spark DStream 窗口保存到 HDFS
【发布时间】:2015-03-03 07:33:55
【问题描述】:

我正在计算每个窗口中的值并找到最高值,并且只想将每个窗口的前 10 个频繁值保存到 hdfs 而不是所有值。

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
    val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
    ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))}      


    //sortedCounts.foreachRDD(rdd =>println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
    sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

我可以打印上面的前 10 名(评论)。

我也试过了

sortedCounts.foreachRDD{ rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))} 

但我收到以下错误。我的数组不可序列化

15/01/05 17:12:23 错误 actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

【问题讨论】:

    标签: hdfs apache-spark apache-storm hadoop-streaming spark-streaming


    【解决方案1】:

    你可以试试这个吗?

    sortedCounts.foreachRDD(rdd => rdd.filterWith(ind => ind)((v, ind) => ind <= 10).saveAsTextFile(...))
    

    注意:我没有测试 sn-p...

    【讨论】:

      【解决方案2】:

      您的第一个版本应该可以工作。只需声明 @transient ssc = ... 首次创建 Streaming Context 的位置。

      第二个版本不起作用 b/c StreamingContext 不能在闭包中序列化。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-07-17
        • 1970-01-01
        • 2016-10-17
        • 2018-06-22
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多