【问题标题】:Apache Storm Kafka Spout Lag IssueApache Storm Kafka Spout 延迟问题
【发布时间】:2019-03-14 15:00:37
【问题描述】:

我正在使用 Storm 1.1.2 和 Kafka 0.11 构建 Java Spring 应用程序,以在 Docker 容器中启动。

我的拓扑结构中的所有内容都按计划工作,但在 Kafka 的高负载下,Kafka 延迟会随着时间的推移越来越多。

我的 KafkaSpoutConfig:

 KafkaSpoutConfig<String,String> spoutConf = 
     KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
     .build()

那么我的拓扑如下

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);

builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");

Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);

System.setProperty("storm.jar", "/opt/storm.jar");

StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());

RouterBolt(扩展 BaseRichBolt)执行一个非常简单的 switch 语句,然后使用本地 KafkaProducer 对象将新消息发送到另一个主题。就像我说的,一切都可以编译并且拓扑按预期运行,但是在高负载(3000 条消息/秒)下,Kafka 延迟只是堆积起来,等同于拓扑的低吞吐量。

我已经尝试禁用 acking

conf.setNumAckers(0);

conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);

但我想这不是问题。

我在 Storm UI 上看到 RouterBolt 在高负载下的执行延迟为 1.2 毫秒,处理延迟为 0.03 毫秒,这让我相信 Spout 是瓶颈。此外,并行提示是 25,因为有“myTopic”的 25 个分区。谢谢!

【问题讨论】:

    标签: java spring apache-kafka apache-storm apache-storm-topology


    【解决方案1】:

    您可能会受到https://issues.apache.org/jira/browse/STORM-3102 的影响,这会导致 spout 在每次发出时都会进行非常昂贵的调用。请尝试升级到某个固定版本。

    编辑:该修复程序实际上尚未发布。您可能仍想通过使用例如从源构建 spout 来尝试修复。 https://github.com/apache/storm/tree/1.1.x-branch 构建 1.1.4 快照。

    【讨论】:

    • 感谢您的意见!如果我正确理解该问题,可以通过将我的处理保证更改为最多一次处理来解决。我只是尝试在 spout 配置上更改它以查看它是否有任何效果,但似乎没有产生效果。这不能解决阻塞问题吗?或者以其他方式减缓喷口的消耗?如果是这样,我会尝试检查 1.1.4 快照
    • 虽然这可能会产生一些影响,但我不相信这是我看到的问题。我的 kafka spout 只能消耗约 1600 条消息/秒,并且延迟呈指数级增长(奇怪的是,在某些分区上它增长的速度要快得多,但在其他分区上却很低并保持在低水平)
    • 我认为这会影响到每个人。昂贵的电话是第一行的kafkaConsumer.committed(tp)
    • 就是这样!这绝对削弱了我的拓扑的吞吐量。谢谢!
    • 这感觉像是上辈子的事,但 IIRC 迁移到 1.1.4 解决了这个问题。 @Stig Rohde Døssing 关于 kafkaConsumer.committed(tp) 是一个昂贵的电话是正确的
    猜你喜欢
    • 2019-03-08
    • 1970-01-01
    • 2020-05-01
    • 2018-09-19
    • 2016-11-12
    • 1970-01-01
    • 1970-01-01
    • 2019-05-04
    • 1970-01-01
    相关资源
    最近更新 更多