【发布时间】:2017-05-04 11:37:48
【问题描述】:
Kafka 直接消费者开始将读取限制为每批(5 秒)读取 450 个事件(5 * 90 个分区),在此之前它运行良好 1 或 2 天(每批大约 5000 到 40000 个事件)
我正在使用在 AWS 中运行的 spark 独立集群(spark 和 spark-streaming-kafka 版本 1.6.1)并使用 S3 存储桶作为检查点目录StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext),每个工作节点上没有调度延迟和足够的磁盘空间.
没有改变任何Kafka客户端初始化参数,很确定kafka的结构没有改变:
val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
也无法理解为什么当直接消费者描述说The consumed offsets are by the stream itself我在创建流上下文时仍然需要使用检查点目录?
【问题讨论】:
-
spark.streaming.backpressure.enabled是否设置为true? -
是的,我会尝试禁用它
-
看起来有帮助
-
很高兴知道 :)
标签: scala amazon-web-services apache-spark apache-kafka spark-streaming