【发布时间】:2020-02-26 19:04:07
【问题描述】:
我正在尝试创建一种机制来限制并发网络请求的数量。我的想法是我想要一个固定的线程池,比如 20 个线程,并使用该池最多只允许 20 个传出 HTTP 请求。
我一直在尝试做的是:
public class HttpClient {
private final Scheduler scheduler;
public HttpClient(int maxRequests) {
this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
}
public Single<...> request() {
return this.httpRequest()
.subscribeOn(this.scheduler);
}
// sends the http request and returns a response
private Single<...> httpRequest() {
return ...
}
}
但这不起作用。我尝试将maxRequests 设置为 1,发送 5 个请求,然后在接收请求的服务器上设置一个断点,以便将第一个请求“卡”在那里,以查看其他 4等待一个可用的线程。但是所有 5 个请求都执行了,过了一会儿,我在所有 5 个请求上都得到了一个超时异常。
我也尝试过使用observeOn,但也没有用。
编辑:我还尝试使用以下代码实现信号量逻辑:
public HttpClient(int maxRequests) {
this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}
public Single<...> request() {
return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
.andThen(this.httpRequest())
.doFinally(concurrentRequestsSemaphore::release);
}
Semaphore 是信号量的本地 Java 实现。这个机制按预期工作,如果maxRequests 是 2,我发送了 5 个请求,2 个会出去,另外 3 个会卡在 fromAction 内等待。但是这种方法带来了其他意想不到的行为,例如即使在 2 个请求收到响应之后,其他 3 个请求都没有被执行,因为 .doFinally(concurrentRequestsSemaphore::release) 从未被执行。我做了一些测试,它只在 X 请求收到响应后执行。 X 会是什么完全无法预测。因此,可能有 20 个许可的信号量,20 个请求会发出并返回响应,而没有其他请求会被执行,因为该信号量从未被任何请求释放。
【问题讨论】:
-
concurrentRequestsSemaphore::release应该在 http 请求完成时调用,而不是在this.httpRequest()返回时调用。 -
我也试过了。我在
.andThen()通话后尝试了这个:.map(request -> { concurrentRequestsSemaphore.release(); return this.httpRequest(); }。同样的问题发生了。该请求在得到响应后没有直接进入map -
这是我的建议:首先编写一个干净、简单的同步代码,其中一个线程获得许可,从阻塞队列中获取下一个 http 参数,并启动一个线程来执行同步 http 请求,然后最后释放许可证,让它工作,然后,如果你还想体验sadomaso,把它转换成神秘的rxjava代码。
-
感谢您的建议!如果您认为我在 RxJava 中尝试使信号量逻辑异步化而不必要地复杂化,那是因为我的整个应用程序都在 Vert.x 上运行。如果我尝试同步实现逻辑,它会阻塞 Vert.x 的事件循环,这是不好的
-
这就是我所说的:仅在您自己的线程上使用同步信号量。您可以在自己的线程上完成所有工作,也可以只在获取信号量的限制器线程上完成。
标签: java multithreading rx-java rx-java2