【问题标题】:How to refactor this piece of code into a more reactive one如何将这段代码重构为更具反应性的代码
【发布时间】:2018-09-12 03:54:40
【问题描述】:

我们正在使用 Spring Cloud Stream 构建微服务。

我们运行了这段代码,它基本上从 Tasks 频道订阅消息,带有特定的 type 标头,执行一个作业,然后发布一个新消息到活动频道:

@EnableBinding({InboundChannels.class, OutboundChannels.class})
public class TasksProcessor {

private final UserService userService;
private final MessageChannel eventsChannel;
private final Logger logger;

public TasksProcessor(
        UserService userService,
        @Qualifier(OutboundChannels.EVENTS) MessageChannel eventsChannel,
        Logger logger
) {
    this.userService = userService;
    this.eventsChannel = eventsChannel;
    this.logger = logger;
}

    @StreamListener(value = TASKS, condition = "headers['type']=='" + CREATE_USER + "'")
    public void createUser(User user)  {
        userService.save(user)
            .subscribe(created -> {
                Message<User> successMessage = Events.create(Events.USER_CREATED, created).build();
                eventsChannel.send(successMessage);
            });
    }
}

我想知道是否有更好、更被动的方式来利用spring-cloud-stream-reactive API 实现它。

尝试之后:

@StreamListener(value = TASKS, condition = "headers['type']=='" + CREATE_USER + "'")
public @Output(EVENTS) Flux<Message<User>> createUser(Flux<User> users)  {
       return users.flatMap(userService::save)
            .map(e -> Events.create(Events.USER_CREATED, created).build());
}

我在应用程序启动时收到此错误:

Caused by: java.lang.IllegalArgumentException: Cannot set a condition for methods that return a value
at org.springframework.util.Assert.isTrue(Assert.java:116)
at org.springframework.cloud.stream.binding.StreamListenerMethodUtils.validateStreamListenerMethod(StreamListenerMethodUtils.java:86)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$DefaultStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(StreamListenerAnnotationBeanPostProcessor.java:352)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167)
at java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285)
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:777)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:388)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:138)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 49 more

所以我尝试了一种更通用的方法:

@StreamListener(value = TASKS)
public @Output(EVENTS) Flux<Message<?>> reducer(Flux<Message<?>> messages)  {
    return messages
            .filter(message -> message.getHeaders().containsKey(TYPE_HEADER))
            .flatMap(message -> {
                final String type = (String) message.getHeaders().get(TYPE_HEADER);

                switch (type) {

                    case CREATE_USER:
                        final User toCreate = (User) message.getPayload();
                        return userService.save(toCreate)
                                .map(created -> Events.create(USER_CREATED, created).build());


                    default:
                        return null;
                }

            });
}

但这也不起作用,因为message.getPayload()byte[] 类型,我不知道如何解析它。

【问题讨论】:

    标签: spring spring-cloud spring-cloud-stream project-reactor


    【解决方案1】:

    我认为这应该适合你:

    @StreamListener(value = TASKS, condition = "headers['type']=='" + CREATE_USER + "'")
    public @Output(EVENTS) Flux<Message<User>> createUser(Flux<User> users)  {
               return users.flatMap(userService::save)
                    .map(e -> Events.create(Events.USER_CREATED, created).build());
    }
    

    只要您的UserService.save(User) 返回Mono&lt;User&gt;

    请参阅Reference Manual 中的示例。也关注Reactive Sample

    通过框架提供的响应式支持的要点是,您不应该订阅该框架为您提供的Publisher

    【讨论】:

    • 太棒了,谢谢!我明天在工作中测试它,然后,如果它有效,我可以接受你的答案!但是在你走之前,你能解释一下为什么我不应该订阅出版商吗?
    • 好吧,你可以为你自己的Publisher链做到这一点,但是当你使用框架提供的时候,最好把所有的工作都委托给框架。你不知道在你的功能之前和之后它还会做什么。而且您可能会破坏 Reactive Streams 组合的行为和背压特性。我认为您可以在 WebFlux 文档中找到更多信息:docs.spring.io/spring/docs/5.0.4.RELEASE/…
    • 很好,有道理,但你的代码不起作用:(我添加了一些日志
    • 这不是我的代码的问题,这就是你使用Framework的方式。根据该异常,当有一些返回值时,您不能在@StreamListener 上使用condition。您应该考虑将该条件 (.filter()?) 移至方法主体中的 Flux。您可以在文档中找到这样的信息:docs.spring.io/spring-cloud-stream/docs/Elmhurst.RC3/reference/…:it must be an individual message handling method (reactive API methods are not supported)
    • 我明白了,我会用更多信息更新我的问题,但到目前为止,谢谢你,你一直很有帮助
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多