【问题标题】:How to create a Spring Reactor Flux from Http integration flow?如何从 Http 集成流创建 Spring Reactor Flux?
【发布时间】:2017-12-17 23:09:11
【问题描述】:

我有一个与How to create a Spring Reactor Flux from a ActiveMQ queue?非常相似的问题

不同之处在于消息来自 Http 端点而不是 JMS 队列。问题是由于某种原因没有填充消息通道,或者它没有被 Flux.from() 拾取。日志条目显示 GenericMessage 是从 Http 集成流创建的,其有效负载作为路径变量,但没有被排队/发布到通道?我试过.channel(MessageChannels.queue()).channel(MessageChannels.publishSubscribe()) 没有任何区别,事件流是空的。代码如下:

@Bean
public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/eventmessage/{id}")
                        .requestMapping(r -> r
                        .methods(HttpMethod.POST)                                                                                   
                        )
                        .payloadExpression("#pathVariables.id")
                        )                           
                        .channel(MessageChannels.queue())
                        .log(LoggingHandler.Level.DEBUG)                            
                        .log()  
                        .toReactivePublisher();
    }


@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){     
    return Flux.from(httpReactiveSource())              
            .map(Message::getPayload);

}

更新1:

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-freemarker')
    compile('org.springframework.boot:spring-boot-starter-integration') 
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.boot:spring-boot-starter-webflux') 
    compile('org.springframework.integration:spring-integration-http')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')

}

更新2

@SpringBootApplication@RestController 在一个文件中定义时有效,但在 @SpringBootApplication@RestController 在不同文件中时停止工作。

TestApp.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class TestApp {
     public static void main(String[] args) {
            SpringApplication.run(TestApp.class, args);
    }
}

TestController.java

package com.example.controller;


import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;



import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;



@RestController
public class TestController {
     @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows.
                    from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r
                                    .methods(HttpMethod.POST)
                            )
                            .payloadExpression("#pathVariables.id")
                    )
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }

}

【问题讨论】:

  • 你使用什么 Spring Boot 配置 (pom)?如何将 Servlet 容器与 WebFlux 混合使用?
  • 您好 Artem,感谢您的调查。我正在尝试使用 SpringBootApplication v. '2.0.0.M2' 的所有默认值。我有一个@RestController 上面提到的两种方法。它与您在适用于我的 ActiveMQ 主题中的回答基本相同(配置方面)。但我不是从 JMS 队列接收来自 http REST 调用的消息。
  • 我刚刚更新了使用 gradle 依赖项的问题。谢谢!
  • 好的。感谢更新。我看到您同时拥有 webwebflux 依赖项。仅与web 一起工作如何?我无法在本地检查,因为 Boot 现在有点坏了
  • 感谢您回来。好的,创建了一个新的快速项目并删除了其中对 spring-boot-starter-webflux 的依赖。但看起来它没有帮助。仍然没有发送任何事件。

标签: java spring spring-integration reactive-programming project-reactor


【解决方案1】:

这对我很有效:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/message/{id}")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                        )
                        .payloadExpression("#pathVariables.id")
                )
                .channel(MessageChannels.queue())
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> eventMessages() {
        return Flux.from(httpReactiveSource())
                .map(Message::getPayload);
    }

}

我在 POM 中有这个依赖项:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

我运行应用程序并有两个终端:

curl http://localhost:8080/events

收听 SSE。

在第二个中,我执行以下操作:

curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666

所以,第一个终端的响应如下:

data:foo

data:bar

data:666

注意,我们不需要 spring-boot-starter-webflux 依赖。 Flux 到 SSE 可以很好地与 Servlet 容器上的常规 MVC 配合使用。

Spring Integration 也将很快支持 WebFlux:https://jira.spring.io/browse/INT-4300。因此,您将能够在那里进行如下配置:

   IntegrationFlows
    .from(Http.inboundReactiveGateway("/sse")
                        .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且完全只依赖 WebFlux 而没有任何 Servlet 容器依赖。

【讨论】:

  • 嗨,Artem,感谢您再次为此付出的努力以及解释。您的代码有效,它有助于将其范围缩小到这样一个事实,即由于某种原因在一个文件中定义 @SpringBootApplication@RestController 使其工作,但是当它们位于单独的文件中时(这实际上听起来更像是一个真实的-life case)它没有。
  • 看,问题在于您的TestController 上错过了@Configuration。是的,httpReactiveSource()@Bean 的处理和注册是正确的,这要归功于“轻配置”机制。但是由于您直接从eventMessages() 方法调用httpReactiveSource() 方法,因此没有代理调用并且httpReactiveSource() 不会委托给bean 工厂以进行正确的bean 解析。所以,真的考虑将httpReactiveSource() 定义移动到@Configuration 类,并在TestController 中使用@Autowired 来表示Publisher&lt;Message&lt;String&gt;&gt;
  • 就是这样,阿尔乔姆!当通过@Configuration 定义时,它现在可以工作。我们终于找到了问题的根源!谢谢!你们在这个框架上做得很棒!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-17
  • 2019-11-12
  • 2017-01-27
  • 1970-01-01
  • 1970-01-01
  • 2018-10-01
  • 1970-01-01
相关资源
最近更新 更多