【问题标题】:Wrapping an asynchronous computation into a synchronous (blocking) computation将异步计算包装成同步(阻塞)计算
【发布时间】:2011-01-11 22:22:25
【问题描述】:

类似的问题:

我有一个对象,我想向库客户端(尤其是脚本客户端)公开一个方法,如下所示:

interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}

但我可用的原始“东西”是一组事件驱动的类:

interface BazComputationSink
{
    public void onBazResult(Baz result);
}

class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}

ImplementingThing 接受输入,做一些神秘的事情,比如在任务队列中排队,然后当结果出现时,sink.onBazResult() 被调用一个线程,该线程可能与 ImplementingThing.doSomethingAsync( ) 被调用。

有没有一种方法可以使用我拥有的事件驱动函数以及并发原语来实现 MyNiceInterface,以便脚本客户端可以愉快地等待阻塞线程?

编辑:我可以为此使用FutureTask 吗?

【问题讨论】:

    标签: java concurrency asynchronous blocking


    【解决方案1】:

    嗯,有一个简单的解决方案,比如:

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
      final AtomicReference<Baz> notifier = new AtomicReference();
      doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        public void onBazResult(Baz result) {
          synchronized (notifier) {
            notifier.set(result);
            notifier.notify();
          }
        }
      });
      synchronized (notifier) {
        while (notifier.get() == null)
          notifier.wait();
      }
      return notifier.get();
    }
    

    当然,这假设您的 Baz 结果永远不会为空……

    【讨论】:

    • 内部回调 (BazComputationSink() {} ),它会抱怨 - 初始化通知程序。如果初始化为 null,我会在第一次命中时抛出 NullPointerException,因为我的结果还没有准备好返回到异步任务中。
    • 不建议这样做:如果你的回调立即返回,notify 会在等待之前被调用并且你会遇到死锁(如果你有缓存逻辑可能会发生这种情况)
    • @for3st 这就是为什么你总是在进入等待之前检查条件,这就是为什么存在那个 for 循环的原因。您担心的事情不会发生在所展示的核心上。
    • 看起来不错,但如果结果可以为空怎么办?也不清楚为什么会有同步块和while循环。
    【解决方案2】:

    使用您自己的 Future 实现:

    public class BazComputationFuture implements Future<Baz>, BazComputationSink {
    
        private volatile Baz result = null;
        private volatile boolean cancelled = false;
        private final CountDownLatch countDownLatch;
    
        public BazComputationFuture() {
            countDownLatch = new CountDownLatch(1);
        }
    
        @Override
        public boolean cancel(final boolean mayInterruptIfRunning) {
            if (isDone()) {
                return false;
            } else {
                countDownLatch.countDown();
                cancelled = true;
                return !isDone();
            }
        }
    
        @Override
        public Baz get() throws InterruptedException, ExecutionException {
            countDownLatch.await();
            return result;
        }
    
        @Override
        public Baz get(final long timeout, final TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            countDownLatch.await(timeout, unit);
            return result;
        }
    
        @Override
        public boolean isCancelled() {
            return cancelled;
        }
    
        @Override
        public boolean isDone() {
            return countDownLatch.getCount() == 0;
        }
    
        public void onBazResult(final Baz result) {
            this.result = result;
            countDownLatch.countDown();
        }
    
    }
    
    public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        BazComputationFuture future = new BazComputationFuture();
        doSomethingAsync(fooArg, barArg, future);
        return future;
    }
    
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        return doSomething(fooArg, barArg).get();
    }
    

    该解决方案在内部创建一个 CountDownLatch,一旦接收到回调,它就会被清除。如果用户调用 get,CountDownLatch 用于阻塞调用线程,直到计算完成并调用 onBazResult 回调。 CountDownLatch 将确保如果在调用 get() 之前发生回调,则 get() 方法将立即返回结果。

    【讨论】:

    • +1 因为我想我理解它......但是你能解释一下并发方面吗? (倒计时锁存器的使用)
    • 好的,谢谢。嗯...(困惑)我以为我读到 FutureTask 本身实际上并没有创建任何线程,它只是您可以用来实现 Future 以及将其提交到 Executor 的东西。
    • 出于好奇。为什么不将已取消的私有布尔值声明为 volatile?
    • 我认为不需要它,因为它只会从同一个线程中设置/读取,因此在所描述的场景中不需要跨线程看到对值的更改。如果允许来自调用者的单独线程查看未来并取消请求,那么它应该是易变的。
    • 好吧,不知道你是否计划让它对其他线程可见。
    【解决方案3】:

    googleguava library 有一个易于使用的 SettableFuture,它使这个问题变得非常简单(大约 10 行代码)。

    public class ImplementingThing {
    
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
        try {
            return doSomething(fooArg, barArg).get();
        } catch (Exception e) {
            throw new RuntimeException("Oh dear");
        }
    };
    
    public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
        final SettableFuture<Baz> future = new SettableFuture<Baz>();
        doSomethingAsync(fooArg, barArg, new BazComputationSink() {
            @Override
            public void onBazResult(Baz result) {
                future.set(result);
            }
        });
        return future;
    };
    
    // Everything below here is just mock stuff to make the example work,
    // so you can copy it into your IDE and see it run.
    
    public static class Baz {}
    public static class Foo {}
    public static class Bar {}
    
    public static interface BazComputationSink {
        public void onBazResult(Baz result);
    }
    
    public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Baz baz = new Baz();
                sink.onBazResult(baz);
            }
        }).start();
    };
    
    public static void main(String[] args) {
        System.err.println("Starting Main");
        System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
        System.err.println("Ending Main");
    }
    

    【讨论】:

      【解决方案4】:

      一个很简单的例子,只是为了理解CountDownLatch,没有任何 额外的代码。

      java.util.concurrent.CountDownLatch 是一种并发构造,它允许一个或多个线程等待给定的一组操作完成。

      CountDownLatch 使用给定的计数进行初始化。通过调用countDown() 方法会减少此计数。等待此计数达到零的线程可以调用await() 方法之一。调用await() 会阻塞线程,直到计数为零。

      下面是一个简单的例子。 Decrementer 在CountDownLatch 上调用countDown() 3 次后,等待的 Waiter 从await() 调用中释放。

      您也可以提一些TimeOut 等待。

      CountDownLatch latch = new CountDownLatch(3);
      
      Waiter      waiter      = new Waiter(latch);
      Decrementer decrementer = new Decrementer(latch);
      
      new Thread(waiter)     .start();
      new Thread(decrementer).start();
      
      Thread.sleep(4000);
      public class Waiter implements Runnable{
      
          CountDownLatch latch = null;
      
          public Waiter(CountDownLatch latch) {
              this.latch = latch;
          }
      
          public void run() {
              try {
                  latch.await();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
      
              System.out.println("Waiter Released");
          }
      }
      

      //-------------

      public class Decrementer implements Runnable {
      
          CountDownLatch latch = null;
      
          public Decrementer(CountDownLatch latch) {
              this.latch = latch;
          }
      
          public void run() {
      
              try {
                  Thread.sleep(1000);
                  this.latch.countDown();
      
                  Thread.sleep(1000);
                  this.latch.countDown();
      
                  Thread.sleep(1000);
                  this.latch.countDown();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      

      Reference

      如果您不想使用CountDownLatch,或者您的要求与 Facebook 相同,但功能不同。表示如果正在调用一种方法,则不要调用另一种方法。

      在这种情况下你可以声明一个

      private volatile Boolean isInprocessOfLikeOrUnLike = false;
      

      然后您可以在方法调用的开头检查是否为false,然后调用方法,否则返回.. 取决于您的实现。

      【讨论】:

      • 易失性布尔值?我认为你想要一个 volatile 布尔值(小写 b),或者更好:一个 AtomicReference
      【解决方案5】:

      根据 Paul Wagland 的回答,这是一个更通用的解决方案:

      public abstract class AsyncRunnable<T> {
          protected abstract void run(AtomicReference<T> notifier);
      
          protected final void finish(AtomicReference<T> notifier, T result) {
              synchronized (notifier) {
                  notifier.set(result);
                  notifier.notify();
              }
          }
      
          public static <T> T wait(AsyncRunnable<T> runnable) {
              final AtomicReference<T> notifier = new AtomicReference<>();
      
              // run the asynchronous code
              runnable.run(notifier);
      
              // wait for the asynchronous code to finish
              synchronized (notifier) {
                  while (notifier.get() == null) {
                      try {
                          notifier.wait();
                      } catch (InterruptedException ignore) {}
                  }
              }
      
              // return the result of the asynchronous code
              return notifier.get();
          }
      }
      

      这是一个如何使用它的例子::

          String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
              @Override
              public void run(final AtomicReference<String> notifier) {
                  // here goes your async code, e.g.:
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          finish(notifier, "This was a asynchronous call!");
                      }
                  }).start();
              }
          });
      

      可以在此处找到更详细的代码版本:http://pastebin.com/hKHJUBqE

      编辑: 与该问题相关的示例是:

      public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
          return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
              @Override
              protected void run(final AtomicReference<Baz> notifier) {
                  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                      public void onBazResult(Baz result) {
                          synchronized (notifier) {
                              notifier.set(result);
                              notifier.notify();
                          }
                      }
                  });
              }
          });
      }
      

      【讨论】:

      • 我理解你的回答,但我不知道如何将它映射到我六年前的问题。
      • 我更新了答案,以便您可以将其映射到问题
      • 顺便说一句,如果你必须在可能的地方这样做,我会考虑使用这个:github.com/ReactiveX/RxJava。 ReactiveX 非常适合您的问题,尽管它有一个陡峭的学习曲线(至少对我而言)。这是一个很棒的介绍:youtube.com/watch?v=_t06LRX0DV0.
      • 好的,谢谢!我的问题已经过时了,但你永远不知道,有时我提出了一个问题,我搜索 SO,它提出了我的一个老问题。
      • 我只是想分享我的解决方案。你永远不知道谁会觉得它有用。
      【解决方案6】:

      这对于 RxJava 2.x 来说非常简单:

      try {
          Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                  doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                  .toFuture().get();
      } catch (InterruptedException e) {
          e.printStackTrace();
      } catch (ExecutionException e) {
          e.printStackTrace();
      }
      

      或者没有 Lambda 表示法:

      Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                      @Override
                      public void subscribe(SingleEmitter<Baz> emitter) {
                          doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                              @Override
                              public void onBazResult(Baz result) {
                                  emitter.onSuccess(result);
                              }
                          });
                      }
                  }).toFuture().get();
      

      更简单:

      Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                      doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                      .blockingGet();
      

      Kotlin 版本:

      val baz = Single.create<Baz> { emitter -> 
          doSomethingAsync(fooArg, barArg) { result -> emitter.onSuccess(result) } 
      }.blockingGet()
      

      【讨论】:

        【解决方案7】:

        最简单的方法(对我有用)是

        1. 创建阻塞队列
        2. 调用异步方法 - 使用向阻塞队列提供结果的处理程序。
        3. 轮询队列(这就是 你阻止)的结果。

          public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
              final BlockingQueue<Baz> blocker = new LinkedBlockingQueue();
              doSomethingAsync(fooArg, barArg, blocker::offer);
              // Now block until response or timeout
              return blocker.poll(30, TimeUnit.SECONDS);
          }
          

        【讨论】:

        • @oguzismail 感谢您强调这一点。我添加了一些必要的细节。由于它提供的解决方案的性质,它仍然是一个非常简短的答案。
        • 这是最好的答案
        猜你喜欢
        • 2018-04-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-12
        • 2013-05-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多