【问题标题】:Spring Cloud Stream dynamic channelsSpring Cloud Stream 动态通道
【发布时间】:2017-05-31 05:41:50
【问题描述】:

我正在使用 Spring Cloud Stream 并希望以编程方式创建和绑定频道。我的用例是在应用程序启动期间,我收到要订阅的 Kafka 主题的动态列表。然后如何为每个主题创建一个频道?

【问题讨论】:

  • 您可以在这里查看类似问题的答案:stackoverflow.com/questions/40485421/…
  • 这个答案是针对传出消息的。我需要传入的:(
  • 你找到答案了吗?我有同样的问题。如果你能指出我正确的方向,那就太好了。谢谢
  • @CCC,不,我没有。我的要求已经改变,所以对我来说不再是问题。

标签: spring-boot apache-kafka spring-cloud-stream


【解决方案1】:

我最近遇到了类似的情况,下面是我动态创建订阅者频道的示例。

    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);

【讨论】:

  • 你能分享完整的源代码吗?
  • @sash,请告诉您在哪里找到此代码?对你有用吗?
  • @YanKhonski 抱歉,但我已经没有实际的源代码了 :( 在调试并了解如何创建消费者之后,我确实写了上面的内容。我会在时间允许时尝试重新创建它。跨度>
  • 当然,没问题,我解决了它并发布了我的解决方案。无论如何,如果你还记得,请分享。
【解决方案2】:

我必须为Camel Spring Cloud Stream 组件做类似的事情。 也许绑定目的地的消费者代码“真的只是一个String 指示频道名称”对您有用吗?

在我的例子中,我只绑定一个目的地,但我不认为它在概念上对于多个目的地有很大不同。

以下是它的要点:

    @Override
    protected void doStart() throws Exception {
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> {
            // have your way with the received incoming message
        });

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    }

    /**
     * Create a {@link SubscribableChannel} and register in the
     * {@link org.springframework.context.ApplicationContext}
     */
    private SubscribableChannel createInputBindingTarget() {
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    }

请参阅here 了解完整来源以了解更多上下文。

【讨论】:

    【解决方案3】:

    我有一个任务,我事先不知道主题。我通过一个输入通道来解决这个问题,该通道可以监听我需要的所有主题。

    https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_configuration_options.html

    目的地

    绑定中间件上通道的目标目的地(例如,RabbitMQ 交换或 Kafka 主题)。如果通道绑定为消费者,它可以绑定到多个目的地,并且目的地名称可以指定为逗号分隔的字符串值。如果未设置,则使用频道名称。

    所以我的配置

    spring:
      cloud:
        stream:
          default:
            consumer:
              concurrency: 2
              partitioned: true
          bindings:
            # inputs
            input:
              group: application_name_group
              destination: topic-1,topic-2
              content-type: application/json;charset=UTF-8
    

    然后我定义了一个消费者来处理来自所有这些主题的消息。

    @Component
    @EnableBinding(Sink.class)
    public class CommonConsumer {
    
        private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
    
        @StreamListener(target = Sink.INPUT)
        public void consumeMessage(final Message<Object> message) {
            logger.info("Received a message: \nmessage:\n{}", message.getPayload());
            // Here I define logic which handles messages depending on message headers and topic.
            // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
        }
    }
    

    请注意,在您的情况下,它可能不是解决方案。我需要将消息转发到 webhook,所以我可以进行配置映射。

    我也想过其他的想法。 1) 你没有 Spring Cloud 的 kafka 客户端消费者。

    2) 创建预定义数量的输入,例如 50 个。

    input-1
    intput-2
    ...
    intput-50
    

    然后对其中一些输入进行配置。

    相关讨论

    我们使用 Spring Cloud 2.1.1 RELEASE

    【讨论】:

      【解决方案4】:
      MessageChannel messageChannel = createMessageChannel(channelName);
      messageChannel.send(getMessageBuilder().apply(data));
      
      public MessageChannel createMessageChannel(String channelName) {
      return (MessageChannel) applicationContext.getBean(channelName);}
      
      public Function<Object, Message<Object>> getMessageBuilder() {
      return payload -> MessageBuilder
      .withPayload(payload)
      .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
      .build();}
      

      【讨论】:

      • 请不要只发布代码作为答案,还要解释您的代码的作用以及它如何解决问题的问题。带有解释的答案通常更有帮助、质量更好,并且更有可能吸引投票。
      【解决方案5】:

      对于传入的消息,您可以显式使用BinderAwareChannelResolver 来动态解析目的地。你可以检查这个example router sink 使用 binder 感知通道解析器。

      【讨论】:

      • 我不明白。我想订阅我只在运行时才知道名称的主题。我不想发送/路由消息。
      • 好的,对不起;我误解了。 dynamic 目标支持仅用于绑定生产者。我相信这个功能还有待解决和跟踪:github.com/spring-cloud/spring-cloud-stream/issues/746
      • @IlayaperumalGopinathan,你知道这个问题是否得到解决?
      猜你喜欢
      • 2020-07-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-27
      • 2021-07-30
      • 1970-01-01
      • 2021-06-07
      • 2022-07-20
      相关资源
      最近更新 更多