【Question title】: Using AggregateApplicationBuilder with an HTTP Source binding
【Posted】: 2018-12-19 02:20:49
【Question】:

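My goal is to create a synchronous process that can be called through a REST endpoint. In my actual requirements, this will handle user edits coming from a web interface, while an asynchronous pipeline made up of the same processes, deployed as separate services, handles data sent through an event stream. We want the HTTP call to be synchronous so that the result can be displayed as soon as the edit has run through the whole process.

For the example below, I have a source, a processor, and a sink. I want to use Spring Cloud Stream's AggregateApplicationBuilder to create an aggregate of these processes with a RestController as the Source, but my example below only creates the application and starts shutting it down as soon as it is fully wired up.

Is it possible to use AggregateApplicationBuilder this way? I have not been able to figure out how to get the resulting aggregate application to run as a web application.

The application class: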

package com.example.aggregate;

import com.example.aggregate.controller.FooController;
import com.example.aggregate.processor.BarProcess;
import com.example.aggregate.sink.SinkService;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
            .web(true)
            .from(FooController.class)
            .via(BarProcess.class)
            .to(SinkService.class)
            .run(args);
    }
}

The controller acting as the Source:

package com.example.aggregate.controller;

import com.example.aggregate.dto.Foo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;
import java.util.HashMap;

@RestController
@RequestMapping("/v1/foo")
@EnableBinding(Source.class)
public class FooController {

    private Source source;

    public FooController(Source source) {
        this.source = source;
    }

    @PostMapping
    public void handleRequest(@RequestBody Foo foo) {
        foo.putValue("Received", Instant.now().toString());
        sendMessage(foo);
    }

    private void sendMessage(Foo foo) {
        Message<Foo> message = MessageBuilder.createMessage(foo, new MessageHeaders(new HashMap<>()));
        source.output().send(message);
    }
}

The processor:

package com.example.aggregate.processor;

import com.example.aggregate.dto.Foo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Processor.class)
public class BarProcess {
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<Foo> doFoo(Message<Foo> message) {
        Foo foo = message.getPayload();
        foo.putValue("BarProcess", "completed");
        return MessageBuilder.createMessage(foo, message.getHeaders());
    }
}

The sink class (for now, I am only trying to log the result to System.out):

package com.example.aggregate.sink;

import com.example.aggregate.dto.Foo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Sink.class)
public class SinkService {

    @StreamListener(Sink.INPUT)
    public void processPayload(Message<Foo> payload) {
        System.out.println("*****SINK*****");
        System.out.println("Received: " + payload);
    }
}

And finally the simple Foo class:

package com.example.aggregate.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;

import java.util.HashMap;
import java.util.Map;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class Foo {

    private Map<String, String> values;

    public void putValue(String key, String value) {
        if (values == null)
            values = new HashMap<>();
        values.put(key, value);
    }

    public Map<String, String> getValues() {
        return values;
    }
}

The output when I run the Spring application is:

2018-12-18 18:12:00.104  INFO 94095 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1bd4fdd: startup date [Tue Dec 18 18:12:00 PST 2018]; root of context hierarchy
2018-12-18 18:12:00.301  INFO 94095 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$a4218ca6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.2.RELEASE)

2018-12-18 18:12:00.398  INFO 94095 --- [           main] c.e.aggregate.SyncPipelineApplication    : The following profiles are active: local
2018-12-18 18:12:00.408  INFO 94095 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1bd4fdd
2018-12-18 18:12:00.475  INFO 94095 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2018-12-18 18:12:00.519  INFO 94095 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2018-12-18 18:12:00.521  INFO 94095 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2018-12-18 18:12:00.705  INFO 94095 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService  'taskScheduler'
2018-12-18 18:12:00.864  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648
2018-12-18 18:12:00.864  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-12-18 18:12:00.864  INFO 94095 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-12-18 18:12:00.864  INFO 94095 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
2018-12-18 18:12:00.864  INFO 94095 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2018-12-18 18:12:00.865  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2018-12-18 18:12:00.869  INFO 94095 --- [           main] c.e.aggregate.SyncPipelineApplication    : Started SyncPipelineApplication in 1.089 seconds (JVM running for 1.438)
2018-12-18 18:12:00.891  INFO 94095 --- [           main] com.example.aggregate.sink.SinkService   : The following profiles are active: local
2018-12-18 18:12:00.893  INFO 94095 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@54227100: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.024  INFO 94095 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application-1-1.input' has 1 subscriber(s).
2018-12-18 18:12:01.024  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648
2018-12-18 18:12:01.024  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2018-12-18 18:12:01.027  INFO 94095 --- [           main] com.example.aggregate.sink.SinkService   : Started SinkService in 0.153 seconds (JVM running for 1.596)
2018-12-18 18:12:01.044  INFO 94095 --- [           main] c.e.aggregate.processor.BarProcess       : The following profiles are active: local
2018-12-18 18:12:01.046  INFO 94095 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35fe2125: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.143  INFO 94095 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application-1-2.input' has 1 subscriber(s).
2018-12-18 18:12:01.144  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648
2018-12-18 18:12:01.144  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2018-12-18 18:12:01.145  INFO 94095 --- [           main] c.e.aggregate.processor.BarProcess       : Started BarProcess in 0.116 seconds (JVM running for 1.714)
2018-12-18 18:12:01.158  INFO 94095 --- [           main] c.e.aggregate.controller.FooController   : The following profiles are active: local
2018-12-18 18:12:01.162  INFO 94095 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@48c40605: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.244  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648
2018-12-18 18:12:01.244  INFO 94095 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2018-12-18 18:12:01.246  INFO 94095 --- [           main] c.e.aggregate.controller.FooController   : Started FooController in 0.1 seconds (JVM running for 1.815)
2018-12-18 18:12:01.247  INFO 94095 --- [       Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1bd4fdd
2018-12-18 18:12:01.248  INFO 94095 --- [       Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@54227100: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.248  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147482647
2018-12-18 18:12:01.249  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147482648
2018-12-18 18:12:01.249  INFO 94095 --- [       Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35fe2125: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.250  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147482647
2018-12-18 18:12:01.250  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147482648
2018-12-18 18:12:01.250  INFO 94095 --- [       Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@48c40605: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.250  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147482647
2018-12-18 18:12:01.250  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147482648
2018-12-18 18:12:01.251  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147482647
2018-12-18 18:12:01.251  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2018-12-18 18:12:01.251  INFO 94095 --- [       Thread-5] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-12-18 18:12:01.251  INFO 94095 --- [       Thread-5] o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 0 subscriber(s).
2018-12-18 18:12:01.251  INFO 94095 --- [       Thread-5] o.s.i.endpoint.EventDrivenConsumer       : stopped _org.springframework.integration.errorLogger
2018-12-18 18:12:01.251  INFO 94095 --- [       Thread-5] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147482648
2018-12-18 18:12:01.252  INFO 94095 --- [       Thread-5] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'

Process finished with exit code 0

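Any guidance on how to keep the application running? When I replace the controller source with one that uses an InboundChannelAdapter, it works the way I expect and sends messages according to the polling interval.

Thanks in advance.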

【Question discussion】:

    Tags: spring-boot spring-cloud-stream


    【Solution 1】:

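    David, while we have not really posted any official announcement yet, the AggregatorBuilder is effectively deprecated ;) We no longer talk about it, and you may notice that the latest snapshot of the docs no longer has a section on it.

    That means we have a better and much simpler way of addressing your requirement, and that is function composition. We are currently working on a blog post to publish before the 2.1.0.RELEASE of Spring Cloud Stream, which is going to happen in early January (we are at RC4 now), so first you would need to switch to the latest version.

    The support is provided by adding the Spring Cloud Function programming model to Spring Cloud Stream. You can read more about it here. Also, a similar question was asked on this forum not too long ago, so this link should give you the answer you are looking for. In fact, it references a sample application on GitHub that uses an HTTP source, which demonstrates essentially the same case.

    Please go through it and let us know if you still have any questions.

    Cheers, Oleg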
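
As a rough illustration of what "function composition" means here, the sketch below chains two steps into one synchronous call using only `java.util.function.Function`. It is not the Spring Cloud Stream or Spring Cloud Function API: the class name `CompositionSketch`, the constants `RECEIVE`, `BAR_PROCESS`, and `PIPELINE`, and the fixed `"Received"` value are all hypothetical stand-ins, and a plain `Map` is assumed in place of the `Foo` DTO from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration of function composition; it deliberately uses no Spring types.
final class CompositionSketch {

    // Hypothetical stand-in for the controller step: marks the payload as received.
    // A fixed placeholder value is used instead of a timestamp to keep the output stable.
    static final Function<Map<String, String>, Map<String, String>> RECEIVE = values -> {
        Map<String, String> copy = new LinkedHashMap<>(values);
        copy.put("Received", "placeholder-timestamp");
        return copy;
    };

    // Hypothetical stand-in for BarProcess: records that the processing step completed.
    static final Function<Map<String, String>, Map<String, String>> BAR_PROCESS = values -> {
        Map<String, String> copy = new LinkedHashMap<>(values);
        copy.put("BarProcess", "completed");
        return copy;
    };

    // andThen composes both steps into a single function, so the caller
    // gets the fully processed result back from one synchronous call.
    static final Function<Map<String, String>, Map<String, String>> PIPELINE =
            RECEIVE.andThen(BAR_PROCESS);

    public static void main(String[] args) {
        // prints {Received=placeholder-timestamp, BarProcess=completed}
        System.out.println(PIPELINE.apply(new LinkedHashMap<>()));
    }
}
```

Because the composed function returns its result directly to the caller, an HTTP handler that invokes it could respond with the processed payload in the same request, which is the synchronous behaviour the question asks for. How such functions are registered and bound in Spring Cloud Stream is covered by the material linked in the answer, not by this sketch.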

    【Discussion】:
