【问题标题】:Access Mono object inside map operation after flatmap operation平面地图操作后访问地图操作内的 Mono 对象
【发布时间】:2019-11-23 16:36:41
【问题描述】:

我正在尝试使用自定义过滤器使用 Spring Cloud Gateway 创建网关代理路由器。以阻塞和命令的方式覆盖属性时,一切都按预期工作。

exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + newTargetURLHost + )

字符串变量newTargetURLHost是通过以下方式获得的:

newTarget = serviceReturnsMono.getServerMapping(id).block().getHost();

我对 Webflux 还很陌生,但上面的代码对我来说已经是一种代码味道了。进一步阅读后,这不是使用响应式时的最佳方法。我尝试以更具功能性/反应性的方式进行重写,但未能让反应性流发出所需的值。

Mono.just(serviceReturnsMono.getServerMapping(id))
                    .flatMap(flat -> flat)
                    .subscribeOn(Schedulers.immediate())
                    .map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost() ))
                    .subscribe();

当上述代码执行时,exchange 属性不会发生变化。

我也尝试了以下方法但没有成功:

            serverMappingMono
                    .map(serverMapping -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + serverMapping.getHost() ))
                    .subscribe();

作为测试,当我修改如下代码进行故障排除时,以下确实会发出一个硬编码的字符串并且交换属性发生了变化。

                    .map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + "testHostName" ))
                    .subscribe();

任何想法或指示将不胜感激。

更新:过滤代码如下:


    private ReturnsMonoServerMappingService returnsMonoServerMappingService;

    @Override
    public int getOrder() {
        return 10001;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        final String id = exchange.getRequest().getHeaders().getFirst("reference");

        return chain.filter(exchange).then(Mono.fromRunnable(() -> {

            Mono<ServerMapping> serverMapping = returnsMonoServerMappingService.getServerMapping(id);
            serverMapping
                    .map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()  )))
                    .subscribe();

        }));
    }
}

来自 Thomas 的更新解决方案:

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    final String id = exchange.getRequest()
                                 .getHeaders()
                                 .getFirst("reference");

    return returnsMonoServerMappingService.getServerMapping(id)
                     .doOnSuccess(serverMapping -> {                
                         exchange.getAttributes()
                             .put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()
        }).then(chain.filter(exchange));
}

我忽略的缺失部分是“doOnSuccess”,因为returnsMonoServerMappingService 已经返回了一个单声道。然后通过“then”链接交换以委托给链中的下一个过滤器。

【问题讨论】:

  • 你不应该订阅,订阅者是调用客户端。你是对的,你永远不应该阻止。请发布您的整个过滤器代码,而不是 sn-p。
  • Thomas Andolf,过滤码上传。
  • 看起来您缺少过滤器代码的顶部...请编辑
  • 你还没有描述你想要做什么。您想获取标头,查找服务器映射,然后将其作为属性添加到请求中?

标签: java functional-programming spring-webflux project-reactor


【解决方案1】:

我在我的手机上写这个,所以无法测试它并且我是从内存中写的,但我认为它应该是这样的。或者至少你明白了要点。

我们首先提取 id。然后我们查找服务器映射,如果一切顺利,我们将其作为一个属性,然后我们继续过滤器链。

您几乎不应该在应用程序中订阅,调用客户端通常是订阅者。并且永远不要阻塞在响应式应用程序中。

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    final String id = exchange.getRequest()
                                 .getHeaders()
                                 .getFirst("reference");
    
    return returnsMonoServerMappingService.getServerMapping(id)
                     .doOnSuccess(serverMapping -> {                
                         exchange.getAttributes()
                             .put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()
        }).then(chain.filter(exchange));
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多