【发布时间】:2018-12-19 02:20:49
【问题描述】:
我的目标是创建一个可以由 REST 端点调用的同步进程。在我的实际需求中,这将处理来自 Web 界面的用户编辑,而由设置为单独服务的相同进程组成的异步管道处理通过事件流发送的数据。我们希望 HTTP 调用是同步的,以便在整个过程中运行编辑后立即显示结果。
对于下面的示例,我有一个源、一个进程和一个接收器。我想使用 Spring Cloud Stream 的 AggregateApplicationBuilder 来创建使用 RestController 作为 Source 的进程聚合,但我下面的示例只是创建应用程序,并在它完全连接后立即开始关闭它。
可以这样使用 AggregateApplicationBuilder 吗?我一直无法弄清楚如何让生成的 AggregateApplication 作为 Web 应用程序运行。
应用类:
package com.example.aggregate;
import com.example.aggregate.controller.FooController;
import com.example.aggregate.processor.BarProcess;
import com.example.aggregate.sink.SinkService;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.web(true)
.from(FooController.class)
.via(BarProcess.class)
.to(SinkService.class)
.run(args);
}
}
控制器作为源:
package com.example.aggregate.controller;
import com.example.aggregate.dto.Foo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Instant;
import java.util.HashMap;
@RestController
@RequestMapping("/v1/foo")
@EnableBinding(Source.class)
public class FooController {
private Source source;
public FooController(Source source) {
this.source = source;
}
@PostMapping
public void handleRequest(@RequestBody Foo foo) {
foo.putValue("Received", Instant.now().toString());
sendMessage(foo);
}
private void sendMessage(Foo foo) {
Message<Foo> message = MessageBuilder.createMessage(foo, new MessageHeaders(new HashMap<>()));
source.output().send(message);
}
}
处理器:
package com.example.aggregate.processor;
import com.example.aggregate.dto.Foo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Processor.class)
public class BarProcess {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<Foo> doFoo(Message<Foo> message) {
Foo foo = message.getPayload();
foo.putValue("BarProcess", "completed");
return MessageBuilder.createMessage(foo, message.getHeaders());
}
}
接收器类(目前,我只是尝试将结果记录到 System.out):
package com.example.aggregate.sink;
import com.example.aggregate.dto.Foo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
@EnableBinding(Sink.class)
public class SinkService {
@StreamListener(Sink.INPUT)
public void processPayload(Message<Foo> payload) {
System.out.println("*****SINK*****");
System.out.println("Received: " + payload);
}
}
最后是简单的 Foo 类:
package com.example.aggregate.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.HashMap;
import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class Foo {
private Map<String, String> values;
public void putValue(String key, String value) {
if (values == null)
values = new HashMap<>();
values.put(key, value);
}
public Map<String, String> getValues() {
return values;
}
}
我运行 Spring 应用程序时的输出是:
2018-12-18 18:12:00.104 INFO 94095 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1bd4fdd: startup date [Tue Dec 18 18:12:00 PST 2018]; root of context hierarchy
2018-12-18 18:12:00.301 INFO 94095 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$a4218ca6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.2.RELEASE)
2018-12-18 18:12:00.398 INFO 94095 --- [ main] c.e.aggregate.SyncPipelineApplication : The following profiles are active: local
2018-12-18 18:12:00.408 INFO 94095 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1bd4fdd
2018-12-18 18:12:00.475 INFO 94095 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2018-12-18 18:12:00.519 INFO 94095 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2018-12-18 18:12:00.521 INFO 94095 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2018-12-18 18:12:00.705 INFO 94095 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2018-12-18 18:12:00.864 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2018-12-18 18:12:00.864 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-12-18 18:12:00.864 INFO 94095 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-12-18 18:12:00.864 INFO 94095 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
2018-12-18 18:12:00.864 INFO 94095 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2018-12-18 18:12:00.865 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-12-18 18:12:00.869 INFO 94095 --- [ main] c.e.aggregate.SyncPipelineApplication : Started SyncPipelineApplication in 1.089 seconds (JVM running for 1.438)
2018-12-18 18:12:00.891 INFO 94095 --- [ main] com.example.aggregate.sink.SinkService : The following profiles are active: local
2018-12-18 18:12:00.893 INFO 94095 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@54227100: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.024 INFO 94095 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application-1-1.input' has 1 subscriber(s).
2018-12-18 18:12:01.024 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2018-12-18 18:12:01.024 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-12-18 18:12:01.027 INFO 94095 --- [ main] com.example.aggregate.sink.SinkService : Started SinkService in 0.153 seconds (JVM running for 1.596)
2018-12-18 18:12:01.044 INFO 94095 --- [ main] c.e.aggregate.processor.BarProcess : The following profiles are active: local
2018-12-18 18:12:01.046 INFO 94095 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35fe2125: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.143 INFO 94095 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application-1-2.input' has 1 subscriber(s).
2018-12-18 18:12:01.144 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2018-12-18 18:12:01.144 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-12-18 18:12:01.145 INFO 94095 --- [ main] c.e.aggregate.processor.BarProcess : Started BarProcess in 0.116 seconds (JVM running for 1.714)
2018-12-18 18:12:01.158 INFO 94095 --- [ main] c.e.aggregate.controller.FooController : The following profiles are active: local
2018-12-18 18:12:01.162 INFO 94095 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@48c40605: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.244 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2018-12-18 18:12:01.244 INFO 94095 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-12-18 18:12:01.246 INFO 94095 --- [ main] c.e.aggregate.controller.FooController : Started FooController in 0.1 seconds (JVM running for 1.815)
2018-12-18 18:12:01.247 INFO 94095 --- [ Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1bd4fdd
2018-12-18 18:12:01.248 INFO 94095 --- [ Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@54227100: startup date [Tue Dec 18 18:12:00 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.248 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147482647
2018-12-18 18:12:01.249 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147482648
2018-12-18 18:12:01.249 INFO 94095 --- [ Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35fe2125: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.250 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147482647
2018-12-18 18:12:01.250 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147482648
2018-12-18 18:12:01.250 INFO 94095 --- [ Thread-5] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@48c40605: startup date [Tue Dec 18 18:12:01 PST 2018]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@6a57ae10
2018-12-18 18:12:01.250 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147482647
2018-12-18 18:12:01.250 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147482648
2018-12-18 18:12:01.251 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147482647
2018-12-18 18:12:01.251 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0
2018-12-18 18:12:01.251 INFO 94095 --- [ Thread-5] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-12-18 18:12:01.251 INFO 94095 --- [ Thread-5] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2018-12-18 18:12:01.251 INFO 94095 --- [ Thread-5] o.s.i.endpoint.EventDrivenConsumer : stopped _org.springframework.integration.errorLogger
2018-12-18 18:12:01.251 INFO 94095 --- [ Thread-5] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147482648
2018-12-18 18:12:01.252 INFO 94095 --- [ Thread-5] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
Process finished with exit code 0
关于如何保持应用程序运行的任何指导?当我将控制器源替换为使用 InboundChannelAdapter 的控制器源时,它会按照我期望的方式工作,并根据轮询时间发送消息。
提前谢谢你
【问题讨论】:
标签: spring-boot spring-cloud-stream