【发布时间】:2019-12-28 20:20:49
【问题描述】:
您好,我正在使用 apache-storm 创建一个拓扑,其中我的 Spout 正在从 Kakfa 主题收集数据并将其发送到螺栓。
我正在对元组进行一些验证,并再次为其他螺栓发出流。
现在的问题是我的第二个螺栓使用第一个螺栓的流有一个重载方法prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector)
每隔 2 秒执行一次。
拓扑代码是
topologyBuilder.setBolt("abc",new ValidationBolt()).shuffleGrouping(configurations.SPOUT_ID);
topologyBuilder.setBolt("TEST",new TestBolt()).shuffleGrouping("abc",Utils.VALIDATED_STREAM);
第一个螺栓“abc”的代码是
@Override
public void execute(Tuple tuple) {
String document = String.valueOf(tuple.getValue(4));
if (Utils.isJSONValid(document)) {
outputCollector.emit(Utils.VALIDATED_STREAM,new Values(document));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream(Utils.VALIDATED_STREAM,new Fields("document"));
}
我在搜索的时候发现了
The prepare method is called when the bolt is initialised and is
similar to the open method in spout. It is called only once for the bolt.
It gets the configuration for the bolt and also the context of the bolt.
The collector is used to emit or output the tuples from this bolt.
链接到公共要点以获取日志 Storm topology log
【问题讨论】:
-
工人崩溃了吗? Storm 每个bolt任务只会调用一次prepare,但是如果worker崩溃重启,当然会再次调用。
-
@Stig 日志中没有异常。但是不断地获取一些日志。附加有问题的日志。
标签: java apache-kafka apache-zookeeper apache-storm