【问题标题】:spring-kafka Request Reply: Different Types for Request and Replyspring-kafka 请求回复:请求和回复的不同类型
【发布时间】:2018-11-05 09:54:21
【问题描述】:

提供 Request-Reply 支持的 ReplyingKafkaTemplate 的文档(在 Spring-Kafka 2.1.3 中引入)建议可以将不同的类型用于请求和回复:

ReplyingKafkaTemplate<K, V, R>

其中参数化类型 K 指定消息密钥,V 指定值(即请求),而 R 指定回复。

到目前为止一切顺利。但是用于实现服务器端 Request-Reply 的相应支持类似乎不支持 V、R 的不同类型。文档建议使用带有添加的 @SendTo 注释的 KafkaListener,它在幕后使用 MessageListenerContainer 上配置的 replyTemplate .但是 AbstractKafkaListenerEndpoint 只支持单一类型的监听器以及replyTemplate:

public abstract class AbstractKafkaListenerEndpoint<K, V>
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {

    ...

    /**
     * Set the {@link KafkaTemplate} to use to send replies.
     * @param replyTemplate the template.
     * @since 2.0
     */
    public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    ...

}

因此 V 和 R 需要是同一类型。

文档中使用的示例确实对请求和回复都使用了字符串。

我是否遗漏了什么,或者这是 Spring-Kafka Request-Reply 支持中的设计缺陷,应该报告和纠正?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    这是fixed in the 2.2 release

    对于早期版本,只需注入原始 KafkaTemplate(没有泛型)。

    编辑

    @SpringBootApplication
    public class So53151961Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So53151961Application.class, args);
        }
    
        @KafkaListener(id = "so53151961", topics = "so53151961")
        @SendTo
        public Bar handle(Foo foo) {
            System.out.println(foo);
            return new Bar(foo.getValue().toUpperCase());
        }
    
        @Bean
        public ReplyingKafkaTemplate<String, Foo, Bar> replyingTemplate(ProducerFactory<String, Foo> pf,
                ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {
    
            ConcurrentMessageListenerContainer<String, Bar> replyContainer =
                    factory.createContainer("so53151961-replyTopic");
            replyContainer.getContainerProperties().setGroupId("so53151961.reply");
            ReplyingKafkaTemplate<String, Foo, Bar> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer);
            return replyingKafkaTemplate;
        }
    
        @Bean
        public KafkaTemplate<String, Bar> replyTemplate(ProducerFactory<String, Bar> pf,
                ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {
    
            KafkaTemplate<String, Bar> kafkaTemplate = new KafkaTemplate<>(pf);
            factory.setReplyTemplate(kafkaTemplate);
            return kafkaTemplate;
        }
    
        @Bean
        public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Bar> template) {
            return args -> {
                ProducerRecord<String, Foo> record = new ProducerRecord<>("so53151961", null, "key", new Foo("foo"));
                RequestReplyFuture<String, Foo, Bar> future = template.sendAndReceive(record);
                System.out.println(future.get(10, TimeUnit.SECONDS).value());
            };
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so53151961", 1, (short) 1);
        }
    
        @Bean
        public NewTopic reply() {
            return new NewTopic("so53151961-replyTopic", 1, (short) 1);
        }
    
        public static class Foo {
    
            public String value;
    
            public Foo() {
                super();
            }
    
            public Foo(String value) {
                this.value = value;
            }
    
            public String getValue() {
                return this.value;
            }
    
            public void setValue(String value) {
                this.value = value;
            }
    
            @Override
            public String toString() {
                return "Foo [value=" + this.value + "]";
            }
    
        }
    
        public static class Bar {
    
            public String value;
    
            public Bar() {
                super();
            }
    
            public Bar(String value) {
                this.value = value;
            }
    
            public String getValue() {
                return this.value;
            }
    
            public void setValue(String value) {
                this.value = value;
            }
    
            @Override
            public String toString() {
                return "Bar [value=" + this.value + "]";
            }
    
        }
    
    }
    
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.spring.json.trusted.packages=com.example
    

    结果

    Foo [value=foo]
    Bar [value=FOO]
    

    【讨论】:

    • 确认在 2.2 中修复。谢谢!
    • 有人可以提供一个示例代码吗?
    • 我在答案中添加了一个示例。
    • kafka 主题中的消息将是 Foo 对象的实例吗?可以对 Topic1 产生 Foo 对象并从 Topic2 消费 Bar 吗? @加里罗素
    • 不要在 cmets 中就旧答案提出新问题。您的意思完全不清楚;提出一个新问题,提供更多细节。
    猜你喜欢
    • 2021-01-14
    • 2020-10-24
    • 2023-02-07
    • 2018-05-13
    • 2016-11-24
    • 2014-08-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多