【问题标题】:Kafka + spark streaming: kafka.common.OffsetOutOfRangeException卡夫卡+火花流:kafka.common.OffsetOutOfRangeException
【发布时间】:2018-02-24 22:57:21
【问题描述】:

我对整个 Kafka/Spark 的事情都是新手。我有 Spark Streaming (PySpark) 从 Kafka 生产者那里获取数据。它运行了一分钟,然后总是抛出一个kafka.common.OffsetOutOfRangeException。 Kafka 消费者是 0.8 版(显然,PySpark 不支持 0.10)。我在 AWS Ubuntu 14.04 上有一个有 3 个工作人员的主节点。我不知道这是否相关,但这里的 Kafka 日志相对较大(~1-10kb),我已经相应地调整了生产者/经纪人/消费者配置。数据传递得很好,虽然可能比我认为生产者可能产生的要慢(这可能是问题的根源?)。

通过增加保留时间/大小解决了类似的问题:Kafka OffsetOutOfRangeException

但是我的保留时间是一个小时,每个节点的server.properties 的大小是 1GB,更重要的是,Spark 的故障时间和设置的保留时间/大小没有变化。

还有其他调整的可能性吗,也许在 Spark Streaming 配置上?我在网上看到的所有答案都与 Kafka 配置有关,但在我的情况下似乎没有什么不同。

编辑 1:我尝试 a) 从生产者那里读取多个流,b) 使用 time.sleep(1.0) 减慢生产者流本身的速度。两者都没有持久的效果。

n_secs = 1
ssc = StreamingContext(sc, n_secs)
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], {
                    'bootstrap.servers':'localhost:9092', 
                    'group.id':'test-video-group', 
                    'fetch.message.max.bytes':'15728640',
                    'auto.offset.reset':'largest'}) for _ in range(n_streams)]

stream = ssc.union(*kds)

【问题讨论】:

  • 看起来你正在使用 0.8 中的新消费者对吧?我猜这是通过引导服务器而不是 zk 连接。你如何提交偏移量?
  • @dawsaw 它是自动提交的,但在周末我想我已经确定这是 Spark Streaming 中的背压问题。

标签: apache-spark pyspark apache-kafka spark-streaming kafka-consumer-api


【解决方案1】:

是否有可能您的生产者生成太多消息太快以至于每个代理上 1G 不够? 1G在所有现实中似乎非常低。在 Spark Streaming 决定了它需要在微批处理中处理的偏移范围并尝试根据偏移量从代理检索消息后,由于大小限制,消息消失了。请将代理大小增加到 100G 之类的更大,看看是否能解决您的问题。

【讨论】:

  • 我同意(经过进一步测试后),几乎可以肯定是生产者生成了 Spark Streaming 无法跟上的消息。但我认为我需要更改我正在生产的内容和/或 Spark 配置,而不是更改 Kafka 配置。
猜你喜欢
  • 2016-08-03
  • 2018-09-15
  • 2018-08-13
  • 2023-03-19
  • 2018-08-15
  • 1970-01-01
  • 2019-04-11
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多