【问题标题】:Storm-kafka spout consuming slowlyStorm-kafka 喷口消耗缓慢
【发布时间】:2013-11-11 10:15:01
【问题描述】:

我只是在尝试这里提到的 kafka-storm spout https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka,我使用的配置如下所述。

    BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
            ImmutableList.of("localhost"), 1);
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, // list of Kafka
            "test", // topic to read from
            "/kafkastorm", // the root path in Zookeeper for the spout to
            "discovery"); // an id for this consumer for storing the
                            // consumer offsets in Zookeeper
    spoutConfig.scheme = new StringScheme();
    spoutConfig.stateUpdateIntervalMs = 1000;


    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TridentTopology topology = new TridentTopology();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(
            "localhost", 6379);
    TridentState wordsCount = topology
            .newStream(SPOUT_FIRST, kafkaSpout)
            .parallelismHint(1)
            .each(new Fields("str"), new TestSplit(), new Fields("words"))
            .groupBy(new Fields("words"))
            .persistentAggregate(
                    RedisState.transactional(inetSocketAddress),
                    new Count(), new Fields("counts")).parallelismHint(100);

    Config conf = new Config();
    conf.setMaxTaskParallelism(200);
    // conf.setDebug( true );
    // conf.setMaxSpoutPending(20);

    // This topology can only be run as local because it is a toy example
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("symbolCounter", conf, topology.build());

但上述 spout 从 Kafka 主题获取消息的速度约为 7000 条/秒,但我预计每秒加载约 50000 条消息。我尝试了各种增加 spoutConfig 中提取缓冲区大小的选项,但没有可见的结果。

有没有人遇到过无法以生产者生成消息的速度通过storm获取kafka主题的类似问题?

【问题讨论】:

  • 只是一个想法,可能是因为暴风雨中的acking而不是因为KafkaSpout,您是否看到关闭acking有什么不同?您的主题的分区数是多少。降低 maxSpoutPending 值还有什么变化?
  • @user2720864 问题是 kafspout 能够获取所有大量消息,但它在更新 zookeeper 阶段需要一些时间,该阶段继续在其自己的阶段。我会试试你的建议。
  • @user2720864:我将配置中的“topology.spout.max.batch.size”值更新为大约 64*1024 值,然后风暴处理变得很快。

标签: apache-storm apache-kafka


【解决方案1】:

我将配置中的“topology.spout.max.batch.size”值更新为大约 64*1024 值,然后风暴处理变得很快。

【讨论】:

  • 感谢分享配置!
猜你喜欢
  • 1970-01-01
  • 2020-05-02
  • 2015-07-12
  • 1970-01-01
  • 2015-10-14
  • 2014-04-29
  • 1970-01-01
  • 2017-02-23
  • 1970-01-01
相关资源
最近更新 更多