【问题标题】:Storm - Conditionally consuming stream from kafka spout?Storm - 有条件地消耗来自 kafka spout 的流?
【发布时间】:2015-10-14 02:00:24
【问题描述】:

我有一个将 json 发布到 Kafka 实例的场景。然后我使用 Kafka Spout 将流发射到螺栓。

但现在我想在我的 json 消息中添加额外的字段(称为 x)。如果xa 我希望它被boltA 消费,如果xb 我希望它被boltB 消费。

有没有办法根据流的内容将流引导到正确的螺栓?

【问题讨论】:

    标签: apache-kafka apache-storm


    【解决方案1】:

    最简单的方法应该是添加一个SplitBolt,它从KafkaSpout 消费,评估字段x,并转发到不同的输出流:

    public class SplitBolt extends BaseRichBolt {
      OutputCollector collector;
    
      public void prepare(...) {
        this.collector = collector;
      }
    
      public void execute(Tuple input) {
        Object x = ... // get field x from input
        String streamId;
        if(x == a) {
          streamId = "stream-xa";
        } else { // x == b
          streamId = "stream-xb";
        }
        collector.emit(streamId, input, input.getValues());
      }
    
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields(...)
        declarer.declareStream("stream-xa", schema);
        declarer.declareStream("stream-xy", schema);
      }
    }
    

    在构建拓扑时,将BoltA 连接到“stream-xa”,将BoltB 连接到“stream-xb”:

    TopologyBuilder b = new TopologyBuilder();
    b.setSpout("spout", new KafkaSpout(...));
    b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
    b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
    b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
    

    作为替代方案,也应该可以从KafkaSpout 继承并直接发送到两个不同的流。但是,要正确编写代码比较棘手。

    【讨论】:

    • 嗨@Matthias J. Sax,这也可以通过阅读kafka的新主题来完成。说 topicXAtopicXB 并使用新的 kafkaSpout 创建一个新的拓扑。这样做有什么缺点?以任何方式创建一个新的拓扑是一种开销吗?
    • 嗯,Kafka 集群会有读写和存储开销。如果您想多次阅读这些主题(即使用不同的应用程序),那么将数据再次放入 Kafka 可能是有意义的。否则,我认为在 Kafka 中复制数据不会有很大的好处。
    猜你喜欢
    • 2015-03-13
    • 2017-02-23
    • 2018-11-03
    • 2016-11-12
    • 1970-01-01
    • 2017-08-24
    • 2020-05-01
    • 2013-06-24
    • 2013-11-11
    相关资源
    最近更新 更多