【问题标题】:RxJava: Thread pool for network callsRxJava:网络调用的线程池
【发布时间】:2020-02-26 19:04:07
【问题描述】:

我正在尝试创建一种机制来限制并发网络请求的数量。我的想法是我想要一个固定的线程池,比如 20 个线程,并使用该池最多只允许 20 个传出 HTTP 请求。

我一直在尝试做的是:

public class HttpClient {
  private final Scheduler scheduler;

  public HttpClient(int maxRequests) {
    this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
  }

  public Single<...> request() {
    return this.httpRequest()
      .subscribeOn(this.scheduler);
  }

  // sends the http request and returns a response
  private Single<...> httpRequest() {
    return ...
  }
}

但这不起作用。我尝试将maxRequests 设置为 1,发送 5 个请求,然后在接收请求的服务器上设置一个断点,以便将第一个请求“卡”在那里,以查看其他 4等待一个可用的线程。但是所有 5 个请求都执行了,过了一会儿,我在所有 5 个请求上都得到了一个超时异常。

我也尝试过使用observeOn,但也没有用。

编辑:我还尝试使用以下代码实现信号量逻辑:


public HttpClient(int maxRequests) {
  this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}

public Single<...> request() {
  return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
   .andThen(this.httpRequest())
   .doFinally(concurrentRequestsSemaphore::release);
}

Semaphore 是信号量的本地 Java 实现。这个机制按预期工作,如果maxRequests 是 2,我发送了 5 个请求,2 个会出去,另外 3 个会卡在 fromAction 内等待。但是这种方法带来了其他意想不到的行为,例如即使在 2 个请求收到响应之后,其他 3 个请求都没有被执行,因为 .doFinally(concurrentRequestsSemaphore::release) 从未被执行。我做了一些测试,它只在 X 请求收到响应后执行。 X 会是什么完全无法预测。因此,可能有 20 个许可的信号量,20 个请求会发出并返回响应,而没有其他请求会被执行,因为该信号量从未被任何请求释放。

【问题讨论】:

  • concurrentRequestsSemaphore::release 应该在 http 请求完成时调用,而不是在 this.httpRequest() 返回时调用。
  • 我也试过了。我在.andThen() 通话后尝试了这个:.map(request -&gt; { concurrentRequestsSemaphore.release(); return this.httpRequest(); }。同样的问题发生了。该请求在得到响应后没有直接进入map
  • 这是我的建议:首先编写一个干净、简单的同步代码,其中一个线程获得许可,从阻塞队列中获取下一个 http 参数,并启动一个线程来执行同步 http 请求,然后最后释放许可证,让它工作,然后,如果你还想体验sadomaso,把它转换成神秘的rxjava代码。
  • 感谢您的建议!如果您认为我在 RxJava 中尝试使信号量逻辑异步化而不必要地复杂化,那是因为我的整个应用程序都在 Vert.x 上运行。如果我尝试同步实现逻辑,它会阻塞 Vert.x 的事件循环,这是不好的
  • 这就是我所说的:仅在您自己的线程上使用同步信号量。您可以在自己的线程上完成所有工作,也可以只在获取信号量的限制器线程上完成。

标签: java multithreading rx-java rx-java2


【解决方案1】:

您没有显示private Single&lt;...&gt; httpRequest() 的正文。我假设你在那里调用了一些异步方法。异步方法只占用线程来处理响应,当请求本身移动到服务器并返回时,没有使用任何线程。这就解释了为什么您会看到所有 5 个请求都到达了服务器。 通常,为了限制某种活动的数量,使用java.util.concurrent.Semaphores,但它们通过阻塞线程来限制活动。从逻辑上讲,由于您的程序是异步的,因此您需要使用异步信号量,但它是一种罕见的野兽。 所以你有以下选择:

  • 根本不限制异步 http 请求的数量,因为它们无论如何都不会占用太多资源
  • 启动一个特殊线程,从普通同步信号量中获取许可,然后启动异步http请求。当请求完全完成时释放信号量
  • 使用固定线程池同步启动http请求
  • 使用异步信号量。我知道的唯一实现在我的库中DF4JAsyncSemaphore 是标准 Semaphore 的扩展,因此具有同步和异步接口,InpSignal 仅用于异步程序。 InpSignal 的使用示例是 AsyncServerSocketChannel.java,它用于限制 Echo Server 实现中打开的客户端连接数。

【讨论】:

  • 之所以要限制HTTP请求数,是因为超载。我们在生成报告的服务中使用这种机制,每个报告都需要向我们的另一个服务发送数十甚至数百个请求,并且大多数时候我们会因为服务过载而不断超时。我编辑了我的问题以显示我也尝试实现的信号量逻辑
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-07-03
  • 1970-01-01
  • 1970-01-01
  • 2015-02-07
  • 2010-11-10
  • 1970-01-01
  • 2017-03-03
相关资源
最近更新 更多