【问题标题】:multi threading storm Bolt parallelism多线程风暴 Bolt 并行
【发布时间】:2016-04-21 11:45:25
【问题描述】:

用于测试的组件是

Kafka Procucer从机器读取文件,文件由1000行组成。

        String sCurrentLine;
        br = new BufferedReader(new FileReader("D:\\jsonLogTest.txt"));
        while ((sCurrentLine = br.readLine()) != null) {
            //System.out.println(sCurrentLine);
            KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sCurrentLine);
            producer.send(message);
        }   

Storm Consumer 使用三个 Bolt,BoltOne 应该接收流并将其分成两个不同的流(Stream1 和 Stream2)。 BoltTwo 和 BoltThree 应该订阅这些 Streams。 (简而言之,我希望在 BoltOne parley 中处理元组,例如 Bolt2 处理前 500 行,BoltThree 处理后 500 行。

拓扑

builder.setSpout("line-reader-spout",kafkaSpout,1);
        builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout");
        builder.setBolt("bolt-two", new BoltTwo(),1).shuffleGrouping("bolt-one","stream1");
        builder.setBolt("bolt-three", new BoltThree(),1).shuffleGrouping("bolt-one","stream2");

BoltOne

collector.emit("stream1", new Values(input.getString(0)));
            collector.emit("stream2", new Values(input.getString(0)));
        x++;System.out.println("" + x);
        collector.ack(input);

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // TODO Auto-generated method stub
        outputFieldsDeclarer.declareStream("stream1", new Fields("field1"));
        outputFieldsDeclarer.declareStream("stream2", new Fields("field2"));
    }

螺栓二和螺栓三

public void execute(Tuple input) {
        String sentence = input.getString(0);
        System.out.println("*********B2*************");

    }

堆栈跟踪

*********B2*************
1
*********B3*************
2
*********B2*************
*********B3*************
3
*********B3*************
*********B2*************
4
*********B3*************
*********B2*************
5
*********B2*************
*********B3*************
6
*********B2*************
*********B3*************
7
*********B3*************
*********B2*************

完全混淆了拆分流和并行性。示例会有所帮助。

更新的解决方案我现在想出了:

public void execute(Tuple input) {
        @SuppressWarnings("unused")
        String sentence = input.getString(0);
        if (x%2==0) {
            collector.emit("stream1", new Values(input.getString(0)));
        }
        else{
            collector.emit("stream2", new Values(input.getString(0)));
        }

        x++;
        collector.ack(input);
    }

我只是在奇偶基础上划分流,处理时间变成了一半,而BoltTwo处理一个元组,另一个由BoltThree处理。

【问题讨论】:

  • 你看透了吗this
  • 通过某种方式实现了它: public void execute(Tuple input) { @SuppressWarnings("unused") String sentence = input.getString(0); if (x%2==0) { collector.emit("stream1", new Values(input.getString(0))); } else{ collector.emit("stream2", new Values(input.getString(0))); } x++;收集器.ack(输入); }

标签: multithreading apache-kafka apache-storm


【解决方案1】:

我猜你使用LocalCluster 运行所有东西。由于有多个线程在运行,通过println(...) 的输出不同步,内部缓冲可能会弄乱输出的顺序......因此,您看到的内容不可靠 - 顺序仅保留在单个喷口中/螺栓。

另外,你想得到什么行为?

现在,你有

Spout => Bolt1 =+=> Bolt2
                +=> Bolt3

即,Bolt1 的输出被复制,Bolt2 和 Bolt3 都接收到 Bolt1 的所有输出元组。因此,Bolt1 从 1 计数到 7,并且 Bolt1 的每个输出元组都会触发 Bolt2 和 Bolt3 的execute()

由于 Bolt2 和 Bolt3 做同样的事情,我猜你想拥有同一个 Bolt 的两个副本并将输入分区到两者。为此,您只需添加一个螺栓并将并行度设置为 2:

builder.setSpout("line-reader-spout",kafkaSpout,1);
builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout");
builder.setBolt("bolt-two", new BoltTwo(),2).shuffleGrouping("bolt-one","stream1");

此外,Bolt1 只需要声明一个输出流(而不是两个)。如果您声明多个输出流并写入两者,则复制数据...

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-08-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多