【问题标题】:Spring Cloud Stream Kafka send messageSpring Cloud Stream Kafka 发送消息
【发布时间】:2024-05-26 01:25:01
【问题描述】:

如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?

不推荐使用的方式如下所示。

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}

但是我怎样才能以函数式风格发送消息呢?

application.yml

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }

我会自动装配一个 MessageChannel,但没有 MessageChannel-Bean 用于进程、process-out-0、输出或类似的东西。或者我可以用供应商 Bean 发送消息吗? 有人可以给我一个例子吗? 非常感谢!

【问题讨论】:

    标签: spring-boot functional-programming spring-kafka spring-cloud-stream-binder-kafka


    【解决方案1】:

    您可以使用StreamBridge 或reactor API - 请参阅Sending arbitrary data to an output (e.g. Foreign event-driven sources)

    【讨论】:

    • EmitterProcessor 已被弃用。你知道最新的使用方法吗?
    • Gary 提到的StreamBridge 是推荐的方式。我们将更新文档以删除对EmitterProcessor 的任何引用,