【问题标题】:How to properly manage closable resources in Reactor如何正确管理 Reactor 中的可关闭资源
【发布时间】:2017-06-21 14:38:43
【问题描述】:

我有一个 http 客户端和执行程序,所有工作完成后应该关闭。

我正在尝试以在 RxJava 1.x 中描述的方式使用 Flux.using 方法: https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter08/ResourceManagement.java

我的资源创建方法:

public static Flux<GithubClient> createResource(String token,
                                                int connectionCount) {

    return Flux.using(
            () -> {
                logger.info(Thread.currentThread().getName() + " : Created and started the client.");
                return new GithubClient(token, connectionCount);
            },
            client -> {
                logger.info(Thread.currentThread().getName() + " : About to create Observable.");
                return Flux.just(client);
            },
            client -> {
                logger.info(Thread.currentThread().getName() + " : Closing the client.");
                client.close();
            },
            false
    ).doOnSubscribe(subscription -> logger.info("subscribed"));
}

然后我会使用:

Flux<StateMutator> dataMutators = GithubClient.createResource(
            config.getAccessToken(),
            config.getConnectionCount())
            .flatMap(client -> client.loadRepository(organization, repository)

问题是客户端连接甚至在第一个请求发出之前就关闭了。

[main] INFO com.sapho.services.githubpublic.client.GithubClient - main : Created and started the client.
[main] INFO com.sapho.services.githubpublic.client.GithubClient - main : About to create Observable.
[main] INFO com.sapho.services.githubpublic.client.GithubClient - subscribed
[main] INFO com.sapho.services.githubpublic.client.GithubClient - main : Closing the client.

java.lang.IllegalStateException: Client instance has been closed.

at jersey.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
at org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:273)

没有找到 Reactor 的任何示例。

谢谢

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    我阅读了再次使用的文档并发现了我的错误。使用return Flux.just(client); 返回客户端没有意义,因为 Flux 会立即终止,这会触发客户端关闭。

    我最终实现了:

    public static Flux<StateMutator> createAndExecute(GithubPublicConfiguration config,
                                                      Function<GithubClient, Flux<StateMutator>> toExecute) {
    
        return Flux.using(
                () -> {
                    logger.debug(Thread.currentThread().getName() + " : Created and started the client.");
                    return new GithubClient(entityModelHandler, config.getAccessToken(), config.getConnectionCount());
                },
                client -> toExecute.apply(client),
                client -> {
                    logger.debug(Thread.currentThread().getName() + " : Closing the client.");
                    client.close();
                },
                false
        );
    }
    

    然后我打电话给:

    GithubClient.createAndExecute(config,
                client -> client.loadRepository(organization, repository))
    

    现在所有操作都按适当的顺序进行。

    【讨论】:

      猜你喜欢
      • 2011-08-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-03-15
      • 2021-09-23
      • 2016-03-10
      相关资源
      最近更新 更多