【问题标题】:Java "in-order" semaphoreJava“有序”信号量
【发布时间】:2018-11-02 19:10:09
【问题描述】:

我有一个问题和关于如何解决它的模糊想法,但我会尽量在上下文中过度分享以避免XY problem

我有一个异步方法,它立即返回一个guava ListenableFuture,我需要调用数十万或数百万次(未来本身可能需要一段时间才能完成)。我无法真正改变该方法的内部结构。它在内部涉及一些严重的资源争用,所以我想限制一次调用该方法的次数。所以我尝试使用Semaphore

public class ConcurrentCallsLimiter<In, Out> {
  private final Function<In, ListenableFuture<Out>> fn;
  private final Semaphore semaphore;
  private final Executor releasingExecutor;

  public ConcurrentCallsLimiter(
      int limit, Executor releasingExecutor,
      Function<In, ListenableFuture<Out>> fn) {
    this.semaphore = new Semaphore(limit);
    this.fn = fn;
    this.releasingExecutor = releasingExecutor;
  }

  public ListenableFuture<Out> apply(In in) throws InterruptedException {
    semaphore.acquire();
    ListenableFuture<Out> result = fn.apply(in);
    result.addListener(() -> semaphore.release(), releasingExecutor);
    return result;
  }
}

那么我可以将我的调用封装在这个类中,然后调用它:

ConcurrentLimiter<Foo, Bar> cl =
    new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
for (Foo foo : foos) {
  ListenableFuture<Bar> bar = cl.apply(foo);
  // handle bar (no pun intended)
}

有效。问题是尾部延迟非常糟糕。一些调用变得“不走运”并最终花费很长时间尝试在该方法调用中获取资源。一些内部指数退避逻辑加剧了这种情况,因此与更急切且在重试之前等待更短时间的新调用相比,不幸的调用获得所需资源的机会越来越少。

如果有类似于该信号量但具有顺序概念的东西,那么修复它的理想方法。例如,如果限制为 10,则当前第 11 个调用必须等待前 10 个中的 any 完成。我想要的是第 11 次调用必须等待第一次调用完成。这样一来,“倒霉”的电话就不会因为不断打进来的新电话而继续挨饿。

似乎我可以为调用分配一个整数序列号,并以某种方式跟踪尚未完成的最低序列号,但不太清楚如何完成这项工作,特别是因为实际上没有任何有用的“等待” AtomicInteger 上的方法或其他方法。

【问题讨论】:

    标签: java concurrency synchronization semaphore


    【解决方案1】:

    您可以使用公平参数创建信号量:

    Semaphore(int permits, boolean fair)
    

    //fair - 如果此信号量将保证争用许可的先进先出授予,则为 true,否则为 false

    【讨论】:

    • 我看到了这个,但是公平设置并不是我想要的。我不关心获得许可的顺序(我实际上是从一个线程中获取它们)。这更像是关心它们发布的顺序。
    • “我实际上是从一个线程中获取它们” - 真的吗?那么“更热切的新来电”从何而来?
    【解决方案2】:

    我想到的一个不太理想的解决方案是使用Queue 来存储所有期货:

    public class ConcurrentCallsLimiter<In, Out> {
      private final Function<In, ListenableFuture<Out>> fn;
      private final int limit;
      private final Queue<ListenableFuture<Out>> queue = new ArrayDeque<>();
    
      public ConcurrentCallsLimiter(int limit, Function<In, ListenableFuture<Out>> fn) {
        this.limit = limit;
        this.fn = fn;
      }
    
      public ListenableFuture<Out> apply(In in) throws InterruptedException {
        if (queue.size() == limit) {
          queue.remove().get();
        }
        ListenableFuture<Out> result = fn.apply(in);
        queue.add(result);
        return result;
      }
    }
    

    但是,这似乎是对内存的极大浪费。涉及的对象可能有点大,限制可能设置得相当高。所以我愿意接受没有 O(n) 内存使用的更好的答案。看起来它应该在 O(1) 中是可行的。

    【讨论】:

      【解决方案3】:

      “如果有类似于该信号量但具有顺序概念的东西。” 有这样的野兽。它被称为Blocking queue。创建这样一个队列,将 10 个项目放在那里,并使用 take 代替 acquireput 代替 release

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-11
        • 1970-01-01
        • 1970-01-01
        • 2015-09-30
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多