【问题标题】:How to send output of two different Spout to the same Bolt?如何将两个不同 Spout 的输出发送到同一个 Bolt?
【发布时间】:2015-10-29 08:08:14
【问题描述】:

我有两个 Kafka Spout,我想将它们的值发送到同一个 Bolt。

有可能吗?

【问题讨论】:

    标签: java apache-kafka apache-storm


    【解决方案1】:

    是的,这是可能的。您可以让任何喷口与同一个螺栓通信。 请参阅https://storm.apache.org/documentation/Tutorial.html“流”部分。

    【讨论】:

      【解决方案2】:

      是的,有可能:

      TopologyBuilder b = new TopologyBuilder();
      b.setSpout("topic_1", new KafkaSpout(...));
      b.setSpout("topic_2", new KafkaSpout(...));
      b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");
      

      您也可以使用任何其他分组。

      更新:

      为了区分consumer bolt中的元组(即topic_1或topic_2),有两种可能:

      1) 您可以使用操作员 ID(如 @user-4870385 建议的那样):

      if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
          //do something
      } else {
          //do something
      }
      

      2)您可以使用流名称(如@zenbeni 建议的那样)。对于这种情况,两个 spout 都需要声明命名流,并且 bolt 需要通过流名称连接到 spout:

      public class MyKafkaSpout extends KafkaSpout {
        final String streamName;
      
        public MyKafkaSpout(String stream) {
          this.streamName = stream;
        }
      
        // other stuff omitted
      
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          // compare KafkaSpout.declareOutputFields(...)
          declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
        }
      }
      

      构建拓扑,现在需要使用流名:

      TopologyBuilder b = new TopologyBuilder();
      b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
      b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
      b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");
      

      MyBolt 中,流名称现在可用于区分输入元组:

      // in my MyBolt.execute():
      if(input.getSourceStreamId().equals("Topic1")) {
        // do something
      } else {
        // do something
      }
      

      讨论:

      虽然使用流名称的 second 方法更自然(根据@zenbeni),但 first 更灵活(IHMO)。流名称由 spout/bolt 直接声明(即在编写 spout/bolt 代码时);相反,操作员 ID 是在拓扑组合在一起时分配的(即,在 使用 喷口/螺栓时)。

      假设我们将三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都声明具有相同名称的输出流。如果第三个消费者通过流区分输入元组,这将不起作用。即使两个给定的生产者 Bolt 声明了不同的输出流名称,预期的输入流名称也可能在消费者 Bolt 中硬编码并且可能不匹配。因此,它也不起作用。但是,如果消费者 bolt 使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配预期的组件 ID。

      当然,可以从给定的类继承(如果未声明 final 并覆盖 declareOutputFields(...) 以分配自己的流名称。但是,这需要做更多的额外工作。

      【讨论】:

      • 是的,过了一会儿就明白了。谢谢!!
      • if(input.getSourceComponent().equalsIgnoreCase(topic_1)) { //do something } else { //do something } 在螺栓中添加这个以将源组件与元组分开
      • @user-4870385 我相信这是一种反模式,bolts 不应该知道元组来自哪里,它增加了复杂性和可维护性。您可以使用流,因为它比组件名称更抽象。
      • @zenbeni 我认为使用流名称而不是组件 ID 来区分输入元组没有任何优势。请查看我的扩展答案(欢迎提供反馈)。
      • @user-4870385 如果你想重构整体并在这两者之间添加一个新的螺栓,使用流而不是组件ID是更好的耦合,否则它是紧密耦合的,你会必须更改代码。只有拓扑应该知道哪个螺栓链接到另一个螺栓。
      猜你喜欢
      • 2015-08-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-19
      • 2015-06-02
      • 2017-11-12
      • 2016-04-01
      • 2013-05-08
      相关资源
      最近更新 更多