【发布时间】:2015-10-14 02:00:24
【问题描述】:
我有一个将 json 发布到 Kafka 实例的场景。然后我使用 Kafka Spout 将流发射到螺栓。
但现在我想在我的 json 消息中添加额外的字段(称为 x)。如果x 是a 我希望它被boltA 消费,如果x 是b 我希望它被boltB 消费。
有没有办法根据流的内容将流引导到正确的螺栓?
【问题讨论】:
我有一个将 json 发布到 Kafka 实例的场景。然后我使用 Kafka Spout 将流发射到螺栓。
但现在我想在我的 json 消息中添加额外的字段(称为 x)。如果x 是a 我希望它被boltA 消费,如果x 是b 我希望它被boltB 消费。
有没有办法根据流的内容将流引导到正确的螺栓?
【问题讨论】:
最简单的方法应该是添加一个SplitBolt,它从KafkaSpout 消费,评估字段x,并转发到不同的输出流:
public class SplitBolt extends BaseRichBolt {
OutputCollector collector;
public void prepare(...) {
this.collector = collector;
}
public void execute(Tuple input) {
Object x = ... // get field x from input
String streamId;
if(x == a) {
streamId = "stream-xa";
} else { // x == b
streamId = "stream-xb";
}
collector.emit(streamId, input, input.getValues());
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Fields schema = new Fields(...)
declarer.declareStream("stream-xa", schema);
declarer.declareStream("stream-xy", schema);
}
}
在构建拓扑时,将BoltA 连接到“stream-xa”,将BoltB 连接到“stream-xb”:
TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
作为替代方案,也应该可以从KafkaSpout 继承并直接发送到两个不同的流。但是,要正确编写代码比较棘手。
【讨论】:
topicXA 和 topicXB 并使用新的 kafkaSpout 创建一个新的拓扑。这样做有什么缺点?以任何方式创建一个新的拓扑是一种开销吗?