【发布时间】:2018-02-24 22:57:21
【问题描述】:
我对整个 Kafka/Spark 的事情都是新手。我有 Spark Streaming (PySpark) 从 Kafka 生产者那里获取数据。它运行了一分钟,然后总是抛出一个kafka.common.OffsetOutOfRangeException。 Kafka 消费者是 0.8 版(显然,PySpark 不支持 0.10)。我在 AWS Ubuntu 14.04 上有一个有 3 个工作人员的主节点。我不知道这是否相关,但这里的 Kafka 日志相对较大(~1-10kb),我已经相应地调整了生产者/经纪人/消费者配置。数据传递得很好,虽然可能比我认为生产者可能产生的要慢(这可能是问题的根源?)。
通过增加保留时间/大小解决了类似的问题:Kafka OffsetOutOfRangeException
但是我的保留时间是一个小时,每个节点的server.properties 的大小是 1GB,更重要的是,Spark 的故障时间和设置的保留时间/大小没有变化。
还有其他调整的可能性吗,也许在 Spark Streaming 配置上?我在网上看到的所有答案都与 Kafka 配置有关,但在我的情况下似乎没有什么不同。
编辑 1:我尝试 a) 从生产者那里读取多个流,b) 使用 time.sleep(1.0) 减慢生产者流本身的速度。两者都没有持久的效果。
即
n_secs = 1
ssc = StreamingContext(sc, n_secs)
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], {
'bootstrap.servers':'localhost:9092',
'group.id':'test-video-group',
'fetch.message.max.bytes':'15728640',
'auto.offset.reset':'largest'}) for _ in range(n_streams)]
stream = ssc.union(*kds)
【问题讨论】:
-
看起来你正在使用 0.8 中的新消费者对吧?我猜这是通过引导服务器而不是 zk 连接。你如何提交偏移量?
-
@dawsaw 它是自动提交的,但在周末我想我已经确定这是 Spark Streaming 中的背压问题。
标签: apache-spark pyspark apache-kafka spark-streaming kafka-consumer-api