【问题标题】:storm processing data extremely slow风暴处理数据极慢
【发布时间】:2013-12-05 22:41:43
【问题描述】:
  • 我们在单个节点上有 1 个喷口和 1 个螺栓。 Spout 从 RabbitMQ 读取数据并将其发送到唯一一个将数据写入 Cassandra 的 Bolt。
  • 我们的数据源每秒生成 10000 条消息,storm 大约需要 10 秒来处理这个,这对我们来说太慢了。
  • 我们尝试增加拓扑的并行度,但这并没有什么不同。

在具有 1 个 spout 和 1 个 bolt 的单节点机器上可以处理的理想消息数量是多少?以及提高风暴拓扑处理速度的可能方法是什么?

更新: 这是示例代码,它没有用于 RabbitMQ 和 cassandra 的代码,但存在相同的性能问题。

// Topology Class
public class SimpleTopology {

public static void main(String[] args) throws InterruptedException {
    System.out.println("hiiiiiiiiiii");
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("SimpleSpout", new SimpleSpout());
    topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout");

    Config config = new Config();
    config.setDebug(true);
    config.setNumWorkers(2);

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology());

    Thread.sleep(2000);
}

}

// Simple Bolt 
public class SimpleBolt implements IRichBolt{

private OutputCollector outputCollector;

public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
    this.outputCollector = oc;
}

public void execute(Tuple tuple) {
    this.outputCollector.ack(tuple);
}

public void cleanup() {
    // TODO
}

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    // TODO
}

public Map<String, Object> getComponentConfiguration() {
    return null;
}

}

// Simple Spout

public class SimpleSpout implements IRichSpout{

private SpoutOutputCollector spoutOutputCollector;
private boolean  completed = false;
private static int i = 0;

public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {      
    this.spoutOutputCollector = soc;
}

public void close() {
    // Todo
}

public void activate() {
    // Todo
}

public void deactivate() {
    // Todo
}

public void nextTuple() {
    if(!completed)
    {
        if(i < 100000)
        {
            String item = "Tag" + Integer.toString(i++);
            System.out.println(item);
            this.spoutOutputCollector.emit(new Values(item), item);
        }
        else
        {  
            completed = true;
        }
    }
    else
    {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

public void ack(Object o) {
    System.out.println("\n\n OK : " + o);
}

public void fail(Object o) {
    System.out.println("\n\n Fail : " + o);
}

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declare(new Fields("word"));
}

public Map<String, Object> getComponentConfiguration() {
    return null;
}

}

更新: 是否有可能使用 shuffle 分组相同的元组将被多次处理?使用的配置(spouts = 4. bolts = 4),现在的问题是,随着螺栓数量的增加,性能正在下降。

【问题讨论】:

  • 您尝试过的配置是什么,您可以发布一些代码吗?你究竟是如何从 RabbitMQ 阅读的?
  • Our data source generates 10000 messages per second .. 你这么说是因为nextTuple 方法中的if(i &lt; 100000) 语句吗?

标签: performance apache-storm


【解决方案1】:

您应该找出这里的瓶颈——RabbitMQ 或 Cassandra。打开 Storm UI 并查看每个组件的延迟时间。

如果增加并行度没有帮助(通常应该),那么 RabbitMQ 或 Cassandra 肯定有问题,所以你应该关注它们。

【讨论】:

  • 是的,RabbitMq 正在减慢速度
【解决方案2】:

我们成功地使用了 RabbitMQ 和 Storm。结果存储在不同的数据库中,但无论如何。我们先在 Spout 中使用 basic_get,性能很差,后来改用 basic_consume,性能其实很好。所以看看你如何使用来自 Rabbit 的消息。 一些重要因素:

  • basic_consume 代替 basic_get
  • prefetch_count(使其足够高)
  • 如果您想提高性能,并且不关心丢失消息 - 不要确认消息并将 delivery_mode 设置为 1。

【讨论】:

  • 有一个问题?,您在单个节点上观察到的正常元组通过率是多少?
  • 这取决于,就像我说的很多事情都会影响它。在我们的场景中,我们有一个 CPU 密集型螺栓,它会进行大量计算。
【解决方案3】:

在您的代码中,每次调用 nextTuple() 时您只会发出一个元组。尝试每次调用发出更多元组。

类似:

public void nextTuple() {

    int max = 1000;
    int count = 0;
    GetResponse response = channel.basicGet(queueName, autoAck);
    while ((response != null) && (count < max)) {

        // process message

        spoutOutputCollector.emit(new Values(item), item);

        count++;
        response = channel.basicGet(queueName, autoAck);
    }

    try { Thread.sleep(2000); } catch (InterruptedException ex) {
}

【讨论】:

  • 你能分享一些例子吗?我试过但无法弄清楚。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-02-22
  • 1970-01-01
  • 1970-01-01
  • 2017-03-17
  • 1970-01-01
  • 1970-01-01
  • 2017-04-15
相关资源
最近更新 更多