【发布时间】:2017-12-17 23:09:11
【问题描述】:
我有一个与How to create a Spring Reactor Flux from a ActiveMQ queue?非常相似的问题
不同之处在于消息来自 Http 端点而不是 JMS 队列。问题是由于某种原因没有填充消息通道,或者它没有被 Flux.from() 拾取。日志条目显示 GenericMessage 是从 Http 集成流创建的,其有效负载作为路径变量,但没有被排队/发布到通道?我试过.channel(MessageChannels.queue()) 和.channel(MessageChannels.publishSubscribe())
没有任何区别,事件流是空的。代码如下:
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/eventmessage/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}
更新1:
build.gradle
buildscript {
ext {
springBootVersion = '2.0.0.M2'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-freemarker')
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.integration:spring-integration-http')
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')
}
更新2
@SpringBootApplication 和 @RestController 在一个文件中定义时有效,但在 @SpringBootApplication 和 @RestController 在不同文件中时停止工作。
TestApp.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestApp {
public static void main(String[] args) {
SpringApplication.run(TestApp.class, args);
}
}
TestController.java
package com.example.controller;
import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;
@RestController
public class TestController {
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/message/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.toReactivePublisher();
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages() {
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}
}
【问题讨论】:
-
你使用什么 Spring Boot 配置 (
pom)?如何将 Servlet 容器与 WebFlux 混合使用? -
您好 Artem,感谢您的调查。我正在尝试使用
SpringBootApplication v. '2.0.0.M2'的所有默认值。我有一个@RestController上面提到的两种方法。它与您在适用于我的 ActiveMQ 主题中的回答基本相同(配置方面)。但我不是从 JMS 队列接收来自 http REST 调用的消息。 -
我刚刚更新了使用 gradle 依赖项的问题。谢谢!
-
好的。感谢更新。我看到您同时拥有
web和webflux依赖项。仅与web一起工作如何?我无法在本地检查,因为 Boot 现在有点坏了 -
感谢您回来。好的,创建了一个新的快速项目并删除了其中对
spring-boot-starter-webflux的依赖。但看起来它没有帮助。仍然没有发送任何事件。
标签: java spring spring-integration reactive-programming project-reactor