【问题标题】:Kafka Spout Reading same message multiple timesKafka Spout 多次读取相同的消息
【发布时间】:2015-04-20 20:32:03
【问题描述】:

如果我在我的 Storm 拓扑中增加 Kafka Spout 的并行度,我怎样才能阻止它多次读取同一主题中的同一消息?

【问题讨论】:

    标签: apache-storm apache-kafka kafka-consumer-api


    【解决方案1】:

    Storm 的 Kafka spout 将消费者偏移量持久保存到 Zookeeper,因此只要您不清除 Zookeeper 存储,它就不应多次读取同一消息。如果您看到一条消息被多次读取,也许检查偏移量是否被持久化到您的 zookeeper 实例?

    我认为,默认情况下,Kafka spout 在本地运行时会启动自己的本地 Zookeeper 实例(与 Kafka 的 Zookeeper 不同),每次重启拓扑时,它的状态可能会重置。

    【讨论】:

      【解决方案2】:

      您应该检查消息是否得到正确确认。如果不是,则 spout 会将其视为失败并回复消息。

      【讨论】:

      • 对此答案的补充。通过深入研究,我们也遇到了同样的问题,我们发现处理 MessageTimeoutSecs 中指定的元组需要更多时间。所以通过增加MessageTimeoutSecs 的值可以解决我的问题。
      【解决方案3】:

      如果是从kafka流入storm,那么请分享更多信息。

      如果数据流是从storm到kafka:

      然后只需检查代码中的 TopologyBuilder。

      不应该是allGrouping,如果是则将其更改为shuffleGrouping

      例子:

          builder.setBolt("OUTPUTBOLT", new OutBoundBolt(boltConfig), 4)
                  .allGrouping("previous_bolt"); // this is wrong change it to
                                                  // shuffleGrouping
      

      所有分组:流在所有螺栓的任务中复制。请谨慎使用此分组。

      【讨论】:

        【解决方案4】:

        您需要指定消费者组。一旦指定,Kafka 只会将下一条消息发送给您的任何 spout。所有的 spout 都应该属于同一个消费者组。

        在创建消费者时,请指定以下属性

        props.put("group.id", a_groupId);

        【讨论】:

        • 我正在使用Trident Kafka Spout,我不清楚如何在这个设置中添加!你能帮我解决这个问题吗?
        【解决方案5】:

        如果你的 kafka spout 是 Opeque 那么你需要 topology.max.spout.pending

        如果您的需求满足,您可以使用 Transactional Spout 来处理此问题。

        【讨论】:

          猜你喜欢
          • 2017-01-24
          • 2019-10-11
          • 1970-01-01
          • 2017-02-05
          • 1970-01-01
          • 2015-07-30
          • 2016-05-04
          • 2017-02-23
          • 2016-06-04
          相关资源
          最近更新 更多