【发布时间】:2016-04-21 11:45:25
【问题描述】:
用于测试的组件是
Kafka Procucer从机器读取文件,文件由1000行组成。
String sCurrentLine;
br = new BufferedReader(new FileReader("D:\\jsonLogTest.txt"));
while ((sCurrentLine = br.readLine()) != null) {
//System.out.println(sCurrentLine);
KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sCurrentLine);
producer.send(message);
}
Storm Consumer 使用三个 Bolt,BoltOne 应该接收流并将其分成两个不同的流(Stream1 和 Stream2)。 BoltTwo 和 BoltThree 应该订阅这些 Streams。 (简而言之,我希望在 BoltOne parley 中处理元组,例如 Bolt2 处理前 500 行,BoltThree 处理后 500 行。
拓扑
builder.setSpout("line-reader-spout",kafkaSpout,1);
builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout");
builder.setBolt("bolt-two", new BoltTwo(),1).shuffleGrouping("bolt-one","stream1");
builder.setBolt("bolt-three", new BoltThree(),1).shuffleGrouping("bolt-one","stream2");
BoltOne
collector.emit("stream1", new Values(input.getString(0)));
collector.emit("stream2", new Values(input.getString(0)));
x++;System.out.println("" + x);
collector.ack(input);
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// TODO Auto-generated method stub
outputFieldsDeclarer.declareStream("stream1", new Fields("field1"));
outputFieldsDeclarer.declareStream("stream2", new Fields("field2"));
}
螺栓二和螺栓三
public void execute(Tuple input) {
String sentence = input.getString(0);
System.out.println("*********B2*************");
}
堆栈跟踪
*********B2*************
1
*********B3*************
2
*********B2*************
*********B3*************
3
*********B3*************
*********B2*************
4
*********B3*************
*********B2*************
5
*********B2*************
*********B3*************
6
*********B2*************
*********B3*************
7
*********B3*************
*********B2*************
完全混淆了拆分流和并行性。示例会有所帮助。
更新的解决方案我现在想出了:
public void execute(Tuple input) {
@SuppressWarnings("unused")
String sentence = input.getString(0);
if (x%2==0) {
collector.emit("stream1", new Values(input.getString(0)));
}
else{
collector.emit("stream2", new Values(input.getString(0)));
}
x++;
collector.ack(input);
}
我只是在奇偶基础上划分流,处理时间变成了一半,而BoltTwo处理一个元组,另一个由BoltThree处理。
【问题讨论】:
-
你看透了吗this?
-
通过某种方式实现了它: public void execute(Tuple input) { @SuppressWarnings("unused") String sentence = input.getString(0); if (x%2==0) { collector.emit("stream1", new Values(input.getString(0))); } else{ collector.emit("stream2", new Values(input.getString(0))); } x++;收集器.ack(输入); }
标签: multithreading apache-kafka apache-storm