【问题标题】:Storm-kafka-mongoDB integrationStorm-kafka-mongoDB 集成
【发布时间】:2019-07-28 03:06:42
【问题描述】:

我正在从 Kafka 生产者连续读取 500 MB 随机元组,并且在风暴拓扑中,我使用 Mongo Java 驱动程序将其插入到 MongoDb。问题是我的吞吐量非常低,每秒 4-5 个元组。

如果我编写一个简单的打印语句,如果没有 DB 插入,我将获得每秒 684 个元组的吞吐量。我计划从 Kafka 运行 100 万条记录,并使用 mongo insert 检查吞吐量。

我尝试在 kafkaconfig 中使用配置 setMaxSpoutPending 、 setMessageTimeoutSecs 参数进行调整。

   final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
    kafkaConf.ignoreZkOffsets=false;
    kafkaConf.useStartOffsetTimeIfOffsetOutOfRange=true;
    kafkaConf.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
    kafkaConf.stateUpdateIntervalMs=2000;
    kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    final TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
    topologyBuilder.setBolt("print-messages", new MyKafkaBolt()).shuffleGrouping("kafka-spout");
     Config conf = new Config();
     conf.setDebug(true);
     conf.setMaxSpoutPending(1000);
     conf.setMessageTimeoutSecs(30);

bolt的执行方法

      JSONObject jObj = new JSONObject();
    jObj.put("key", input.getString(0));

        if (null !=jObj && jObj.size() > 0 ) {
            final DBCollection quoteCollection = dbConnect.getConnection().getCollection("stormPoc");
            if (quoteCollection != null) {
                BasicDBObject dbObject = new BasicDBObject();
                dbObject.putAll(jObj);
                quoteCollection.insert(dbObject);
            //  logger.info("inserted in Collection !!!");
            } else {
                logger.info("Error while inserting data in DB!!!");
            }
            collector.ack(input);

【问题讨论】:

    标签: mongodb apache-kafka performance-testing apache-storm


    【解决方案1】:

    有一个 storm-mongodb 模块用于与 Mongo 集成。它不做这项工作吗? https://github.com/apache/storm/tree/b07413670fa62fec077c92cb78fc711c3bda820c/external/storm-mongodb

    您不应该使用storm-kafka 进行 Kafka 集成,它已被弃用。请改用storm-kafka-client

    设置conf.setDebug(true) 会影响您的处理,因为Storm 会为每个元组记录相当大量的文本。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-06-24
      • 2023-04-05
      • 2018-09-19
      • 1970-01-01
      • 2015-05-31
      • 2018-11-03
      • 2017-03-30
      • 1970-01-01
      相关资源
      最近更新 更多