【Posted】:2018-09-12 03:54:40
【Question】:
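We are building microservices with Spring Cloud Stream.
We run the following code, which subscribes to messages on the tasks channel that carry a specific type header, performs a job, and then publishes a new message to the events channel: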
@EnableBinding({InboundChannels.class, OutboundChannels.class})
public class TasksProcessor {

    private final UserService userService;
    private final MessageChannel eventsChannel;
    private final Logger logger;

    public TasksProcessor(
            UserService userService,
            @Qualifier(OutboundChannels.EVENTS) MessageChannel eventsChannel,
            Logger logger
    ) {
        this.userService = userService;
        this.eventsChannel = eventsChannel;
        this.logger = logger;
    }

    @StreamListener(value = TASKS, condition = "headers['type']=='" + CREATE_USER + "'")
    public void createUser(User user) {
        userService.save(user)
                .subscribe(created -> {
                    Message<User> successMessage = Events.create(Events.USER_CREATED, created).build();
                    eventsChannel.send(successMessage);
                });
    }
}
I am wondering whether there is a better, more reactive way to implement this using the spring-cloud-stream-reactive API.
I tried the following:
@StreamListener(value = TASKS, condition = "headers['type']=='" + CREATE_USER + "'")
public @Output(EVENTS) Flux<Message<User>> createUser(Flux<User> users) {
    return users.flatMap(userService::save)
            .map(created -> Events.create(Events.USER_CREATED, created).build());
}
I get this error when the application starts:
Caused by: java.lang.IllegalArgumentException: Cannot set a condition for methods that return a value
at org.springframework.util.Assert.isTrue(Assert.java:116)
at org.springframework.cloud.stream.binding.StreamListenerMethodUtils.validateStreamListenerMethod(StreamListenerMethodUtils.java:86)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$DefaultStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(StreamListenerAnnotationBeanPostProcessor.java:352)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167)
at java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:777)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:388)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:138)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 49 more
So I tried a more generic approach:
@StreamListener(value = TASKS)
public @Output(EVENTS) Flux<Message<?>> reducer(Flux<Message<?>> messages) {
    return messages
            .filter(message -> message.getHeaders().containsKey(TYPE_HEADER))
            .flatMap(message -> {
                final String type = (String) message.getHeaders().get(TYPE_HEADER);
                switch (type) {
                    case CREATE_USER:
                        final User toCreate = (User) message.getPayload();
                        return userService.save(toCreate)
                                .map(created -> Events.create(USER_CREATED, created).build());
                    default:
                        // Unknown type: emit nothing. Returning null from flatMap
                        // would raise a NullPointerException.
                        return Mono.empty();
                }
            });
}
But this does not work either, because message.getPayload() is of type byte[] and I do not know how to parse it.
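To make the intent of the generic attempt concrete, here is a minimal, framework-free sketch of the same idea: look up a handler by the type header, hand it the raw byte[] payload, and return an empty result (never null) when the header is missing or unknown. It uses only the JDK, so TypeRouter, on, route and decodeUtf8 are hypothetical names and not part of Spring Cloud Stream. The decoder here only turns the bytes into UTF-8 text; that the payload is JSON text, and that a JSON mapper would replace this step in a real listener, is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Framework-free model of header-based routing; every name here is hypothetical.
class TypeRouter {
    static final String TYPE_HEADER = "type";

    // One handler per message type; each handler receives the raw payload bytes.
    private final Map<String, Function<byte[], String>> handlers = new HashMap<>();

    // Registers a handler for the given header value and returns this router for chaining.
    TypeRouter on(String type, Function<byte[], String> handler) {
        handlers.put(type, handler);
        return this;
    }

    // Returns the handler's result, or Optional.empty() when the type header is
    // missing, is not a String, or has no registered handler. Never returns null.
    Optional<String> route(Map<String, Object> headers, byte[] payload) {
        Object type = headers.get(TYPE_HEADER);
        if (!(type instanceof String)) {
            return Optional.empty();
        }
        Function<byte[], String> handler = handlers.get(type);
        if (handler == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(handler.apply(payload));
    }

    // Stand-in decoder (assumption: the payload is UTF-8 text such as JSON).
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

In the reactive version, the empty Optional would correspond to returning an empty publisher from flatMap, and the decoding step is where the byte[] would be converted into a User before calling userService.save.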
【Discussion】:
Tags: spring spring-cloud spring-cloud-stream project-reactor