【问题标题】:Why Kafka Streams don't work without @StreamListener?为什么没有@StreamListener Kafka Streams 不能工作?
【发布时间】:2021-04-25 17:57:43
【问题描述】:

@StreamListener 现在已被弃用,所以我尝试以功能方式重写我的消费者。

Java 代码

@Component
public class MyConsumer {
    public void handleMyMethod(MessageEntity message) {
        ...
    }
}


@RequiredArgsConstructor
@Configuration
public class MyConsumerConfiguration {
    private final MyConsumer myConsumer;

    @Bean
    public Consumer<MessageEntity> myMethod() {
        return myConsumer::handleMyMethod;
    }
}

application.yaml

spring.cloud :
    stream :
        kafka.binder.brokers :
            - "127.0.0.1:9092"
        function.definition : myMethod
        bindings :
            myMethod-in-0 :
                destination : service.method.update

测试

@Test
void handleMyMethod() {
    MessageEntity message = new MessageEntity();
    template.send("service.method.update", message);

    await()
        .atMost(6, TimeUnit.SECONDS)
        .untilAsserted(() -> {
            verify(myConsumer, atLeastOnce()).handleMyMethod(entityCaptor.capture());

            MessageEntity entity = entityCaptor.getValue();
            assertNotNull(entity);
        });
}

运行测试。消息传到 Kafka(我在 Kafka Tool 中看到),但 MyConsumer 没有收到它。测试失败并出现错误:

myConsumer.handleMethod(
    <Capturing argument>
);
-> at com.my.project.MyConsumer.handleMethod(MyConsumer.java:33)
Actually, there were zero interactions with this mock.```

【问题讨论】:

    标签: java spring apache-kafka apache-kafka-streams spring-kafka


    【解决方案1】:

    我发现了问题。我的 util 模块,集成在我上面描述的模块中,有自己的“function.definition”属性。由于上面的模块是在 util 模块之前配置的,因此 util 模块会从顶级模块中删除方法。我是这样解决问题的:

    @Autowired
    StreamFunctionProperties streamFunctionProperties;
    
    ...
    
    String definitionString = StringUtils.hasText(streamFunctionProperties.getDefinition()) ?
        streamFunctionProperties.getDefinition() + ";" :
        "";
    streamFunctionProperties.setDefinition(definitionString + "methodDeleteOut;methodUpdateOut;roleUpdateIn;roleDeleteIn");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-05-25
      • 1970-01-01
      • 1970-01-01
      • 2020-01-12
      • 1970-01-01
      • 1970-01-01
      • 2018-06-25
      • 2019-09-07
      相关资源
      最近更新 更多