【问题标题】:Spark streaming if(!rdd.partitions.isEmpty) not working火花流 if(!rdd.partitions.isEmpty) 不工作
【发布时间】:2017-03-16 02:11:35
【问题描述】:

我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空(if(!rdd.partitions.isEmpty)),我已经包含了一个捕获;但是,即使没有事件发布到 kafka 主题,也永远不会到达 else 语句。

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
    if(!rdd.partitions.isEmpty) {

        val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)

        val val = message(0)

    } else println("empty stream...")

    ssc.start() 
    ssc.awaitTermination()

}

在使用KafkaUtils.createDirectStream 而不是createStream 时,我应该使用替代语句来检查流是否为空?

【问题讨论】:

    标签: scala apache-kafka spark-streaming kafka-consumer-api dstream


    【解决方案1】:

    使用RDD.isEmpty 而不是RDD.partitions.isEmpty,它会检查底层分区是否真的有元素:

    stream.foreachRDD { rdd =>
      if(!rdd.isEmpty) {
        // Stuff
      }
    }
    

    RDD.partitions.isEmpty 不起作用的原因是RDD 内部存在一个分区,但该分区本身是空的。但从partitions 来看,Array[Partition] 并不是空的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-23
      • 1970-01-01
      • 1970-01-01
      • 2017-04-27
      • 2020-07-25
      • 1970-01-01
      相关资源
      最近更新 更多