【发布时间】:2018-11-02 19:10:09
【问题描述】:
我有一个问题和关于如何解决它的模糊想法,但我会尽量在上下文中过度分享以避免XY problem。
我有一个异步方法,它立即返回一个guava ListenableFuture,我需要调用数十万或数百万次(未来本身可能需要一段时间才能完成)。我无法真正改变该方法的内部结构。它在内部涉及一些严重的资源争用,所以我想限制一次调用该方法的次数。所以我尝试使用Semaphore:
public class ConcurrentCallsLimiter<In, Out> {
private final Function<In, ListenableFuture<Out>> fn;
private final Semaphore semaphore;
private final Executor releasingExecutor;
public ConcurrentCallsLimiter(
int limit, Executor releasingExecutor,
Function<In, ListenableFuture<Out>> fn) {
this.semaphore = new Semaphore(limit);
this.fn = fn;
this.releasingExecutor = releasingExecutor;
}
public ListenableFuture<Out> apply(In in) throws InterruptedException {
semaphore.acquire();
ListenableFuture<Out> result = fn.apply(in);
result.addListener(() -> semaphore.release(), releasingExecutor);
return result;
}
}
那么我可以将我的调用封装在这个类中,然后调用它:
ConcurrentLimiter<Foo, Bar> cl =
new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
for (Foo foo : foos) {
ListenableFuture<Bar> bar = cl.apply(foo);
// handle bar (no pun intended)
}
这种有效。问题是尾部延迟非常糟糕。一些调用变得“不走运”并最终花费很长时间尝试在该方法调用中获取资源。一些内部指数退避逻辑加剧了这种情况,因此与更急切且在重试之前等待更短时间的新调用相比,不幸的调用获得所需资源的机会越来越少。
如果有类似于该信号量但具有顺序概念的东西,那么修复它的理想方法。例如,如果限制为 10,则当前第 11 个调用必须等待前 10 个中的 any 完成。我想要的是第 11 次调用必须等待第一次调用完成。这样一来,“倒霉”的电话就不会因为不断打进来的新电话而继续挨饿。
似乎我可以为调用分配一个整数序列号,并以某种方式跟踪尚未完成的最低序列号,但不太清楚如何完成这项工作,特别是因为实际上没有任何有用的“等待” AtomicInteger 上的方法或其他方法。
【问题讨论】:
标签: java concurrency synchronization semaphore