【问题标题】:The same offset value is used by different topics不同主题使用相同的偏移值
【发布时间】:2018-05-31 11:52:32
【问题描述】:

我们的拓扑使用KafkaSpout 从 kafka 主题中获取消息。我们有约 150 个主题,约 12 个分区、8 个风暴执行器和 2 个风暴节点上的任务。 Storm 版本 1.0.5,Kafka 代理版本 10.0.2,Kafka 客户端版本 0.9.0.1。我们不会删除 Kafka 主题。

在某个时刻,我在 worker.log 中观察到大量重复的 WARN 消息

2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] 分区{host1:9092, topic=topic_1, partition=10} 获取偏移量超出的获取请求 范围:[9248]

2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [警告] 分区{host=host2:9092, topic=topic_2, partition=0} 获取 偏移量超出范围的请求:[22650006]

2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [警告] 分区{host=host3:9092, topic=topic_3, partition=4} 获取 偏移量超出范围的请求:[1011584]

2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] 分区{host1:9092, topic=topic4, partition=4} 获取偏移量超出的获取请求 范围:[9266]

2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] 分区{host=host2:9092, topic=topic5, partition=4} 获取偏移量超出范围的请求: [9266]

2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] 分区{host1:9092, topic=topic6, partition=4} 获取偏移量超出范围的请求: [1011584]

2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [警告] 分区{host=host2:9092, topic=topic6, partition=10} 获取 偏移量超出范围的请求:[9248]

由于某种原因,相同的常量偏移值被用于不同主题的相同分区。

我启用了 DEBUG 模式并更准确地观察了日志文件。

2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] 写到最后完成 对于分区 {host=host3:9092, topic=topic1, 偏移 (1572936) 到 ZK, partition=8} 用于拓扑:topology1

2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] 写到最后完成 对于分区 {host=host1:9092, topic=topic2, 偏移 (1572936) 到 ZK, partition=8} 用于拓扑:topology1

2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] 写到最后完成 对于分区 {host=host2:9092, topic=topic3, 偏移 (1572936) 到 ZK, partition=8} 用于拓扑:topology1

2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] 写到最后完成 对于分区 {host=host1:9092, topic=topic4, 偏移 (61292573) 到 ZK, partition=8} 用于拓扑:topology1

2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] 写到最后完成 对于分区 {host=host2:9092, topic=topic5, 到 ZK 的偏移量 (61292573) partition=8} 用于拓扑:topology1

2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] 写到最后完成 分区的偏移量(61292573)到 ZK {host=host3:9092, topic=topic6, partition=8} 用于拓扑:topology1

我注意到所有主题的某些部分被分成两个独立的组。每组由 31 个主题组成。每个组中的所有主题都对每个分区使用相同的偏移值。然而,该值不是恒定的,而是在 8 个不同的值之间变化。对于小组中的特定主题,这 8 个值中的每一个都是正确的。此外,这些值中的每一个都随着时间的推移而增长,并且所有主题都会同步更新它。 每个组的大多数主题(62 个中的 55 个)都有相应的“偏移量或范围”警告消息,但具有恒定值。其他 7 个主题在没有警告消息的情况下继续正常工作,但它们的偏移值也在发生变化。

我浏览了storm-kafka 的源代码,发现useStartOffsetTimeIfOffsetOutOfRange 标志在我们的例子中不起作用,因为我们没有失败的元组并且kafka 偏移量小于_emittedToOffset。因此,同样的 WARN 消息被一次又一次地记录下来。

    } catch (TopicOffsetOutOfRangeException e) {
        offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
        // fetch failed, so don't update the fetch metrics

        //fix bug [STORM-643] : remove outdated failed offsets
        if (!processingNewTuples) {
            // For the case of EarliestTime it would be better to discard
            // all the failed offsets, that are earlier than actual EarliestTime
            // offset, since they are anyway not there.
            // These calls to broker API will be then saved.
            Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);

            // Omitted messages have not been acked and may be lost
            if (null != omitted) {
                _lostMessageCount.incrBy(omitted.size());
            }

            _pending.headMap(offset).clear();

            LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
        }

        if (offset > _emittedToOffset) {
            _lostMessageCount.incrBy(offset - _emittedToOffset);
            _emittedToOffset = offset;
            LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
        }

        return;
    }

但是我不明白_emittedToOffset 怎么可能得到相同的值 针对不同的主题。您可能知道为什么会发生这种情况吗?

【问题讨论】:

  • 如果您使用一个分区创建两个唯一主题,则向每个主题发送一条消息。两个主题都在偏移量 1... 为什么主题不能具有相同的偏移值?
  • 他们可以,但在我们的例子中,所有主题都有不同的唯一偏移量。
  • 我的意思是不能保证。
  • 可能值得看看哪个代理是每个主题分区的领导者,但我发现有趣的是,在你的第二个引用中,它都是分区 8
  • 是的,我使用它的实用程序检查了 kafka 中的偏移量。这绝对是KafkaSpout 的错误行为。

标签: apache-kafka apache-storm


【解决方案1】:

storm-kafka 源代码中存在一个错误,当 Kafka 代理失败时会发生该错误。这里是对应的JIRA票和pull request有修复。

【讨论】:

    猜你喜欢
    • 2019-01-06
    • 2018-06-14
    • 2013-06-03
    • 2019-05-18
    • 2015-05-22
    • 2013-02-27
    • 2021-03-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多