【发布时间】:2019-07-28 03:06:42
【问题描述】:
我正在从 Kafka 生产者连续读取 500 MB 随机元组,并且在风暴拓扑中,我使用 Mongo Java 驱动程序将其插入到 MongoDb。问题是我的吞吐量非常低,每秒 4-5 个元组。
如果我编写一个简单的打印语句,如果没有 DB 插入,我将获得每秒 684 个元组的吞吐量。我计划从 Kafka 运行 100 万条记录,并使用 mongo insert 检查吞吐量。
我尝试在 kafkaconfig 中使用配置 setMaxSpoutPending 、 setMessageTimeoutSecs 参数进行调整。
final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
kafkaConf.ignoreZkOffsets=false;
kafkaConf.useStartOffsetTimeIfOffsetOutOfRange=true;
kafkaConf.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
kafkaConf.stateUpdateIntervalMs=2000;
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
final TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
topologyBuilder.setBolt("print-messages", new MyKafkaBolt()).shuffleGrouping("kafka-spout");
Config conf = new Config();
conf.setDebug(true);
conf.setMaxSpoutPending(1000);
conf.setMessageTimeoutSecs(30);
bolt的执行方法
JSONObject jObj = new JSONObject();
jObj.put("key", input.getString(0));
if (null !=jObj && jObj.size() > 0 ) {
final DBCollection quoteCollection = dbConnect.getConnection().getCollection("stormPoc");
if (quoteCollection != null) {
BasicDBObject dbObject = new BasicDBObject();
dbObject.putAll(jObj);
quoteCollection.insert(dbObject);
// logger.info("inserted in Collection !!!");
} else {
logger.info("Error while inserting data in DB!!!");
}
collector.ack(input);
【问题讨论】:
标签: mongodb apache-kafka performance-testing apache-storm