【问题标题】:Asynchronous Iterator异步迭代器
【发布时间】:2014-02-04 07:26:27
【问题描述】:

我有以下代码:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

因为迭代器和任务都很慢,所以将它们放在单独的线程中是有意义的。这是迭代器包装器的快速而肮脏的尝试:

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    private AsyncIterator(final Iterator<T> delegate) {
      new Thread() {
        @Override
        public void run() {
          while(delegate.hasNext()) {
            queue.put(delegate.next()); // try/catch removed for brevity
          }
        }
      }.start();
    }

    @Override
    public boolean hasNext() {
      return true;
    }

    @Override
    public T next() {
        return queue.take(); // try/catch removed for brevity
    }
    // ... remove() throws UnsupportedOperationException
  }

然而,这个实现缺乏对“hasNext()”的支持。 hasNext() 方法当然可以阻塞,直到它知道是否返回 true。我可以在我的 AsyncIterator 中有一个 peek 对象,我可以更改 hasNext() 以从队列中获取一个对象并让 next() 返回这个 peek。但这会导致 hasNext() 无限期阻塞,如果已达到委托迭代器的结尾。

我当然可以自己进行线程通信,而不是使用 ArrayBlockingQueue:

private static class AsyncIterator<T> implements Iterator<T> {

  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify();
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

  @Override
  public boolean hasNext() {
    synchronized (this) {
      while (queue.size() == 0 && !delegateDone) {
        try {
          wait();
        } catch (InterruptedException e) {
          throw new Error(e);
        }
      }
    }
    return queue.size() > 0;
  }

  @Override
  public T next() {
    return queue.remove();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

然而,所有额外的同步、等待和通知并没有真正使代码更具可读性,而且很容易在某处隐藏竞争条件。

有更好的想法吗?

更新

是的,我确实知道常见的观察者/可观察模式。然而,通常的实现并没有预见到数据流的终结,它们也不是迭代器。

我在这里特别想要一个迭代器,因为实际上上面提到的循环存在于一个外部库中,它需要一个迭代器。

【问题讨论】:

  • 听起来像一个经典的生产者/消费者问题stackoverflow.com/questions/2332537/…,除了你希望每个线程只有一个
  • 正常使用迭代器,将任务转储到ExecutorService。这不应该需要重新发明抽象。
  • 考虑使用 rxjava (github.com/Netflix/RxJava):它完全符合您的要求。它是一个以称为“Observable”的异步可迭代类型为中​​心的库。它完全充实了一整套转换、聚合和并发功能。
  • @LouisWasserman:不,如果 Iterator(更新了我的问题),我特别需要一个实现。
  • @isnot2bad:没错。迭代器会产生大量 I/O 负载,而处理会产生大量 CPU 负载。如果我同步执行此操作,我的 CPU 和硬盘会轮流闲置,而另一个则处于压力之下。

标签: java multithreading asynchronous concurrency


【解决方案1】:

这是一个棘手的问题,但我认为这次我得到了正确的答案。 (我删除了我的第一个答案。)

答案是使用哨兵。我还没有测试过这段代码,为了清楚起见,我删除了 try/catch:

public class AsyncIterator<T> implements Iterator<T> {

    private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
    private T sentinel = (T) new Object();
    private T next;

    private AsyncIterator(final Iterator<T> delegate) {
        new Thread() {
            @Override
            public void run() {
                while (delegate.hasNext()) {
                    queue.put(delegate.next());
                }
                queue.put(sentinel);
            }
        }.start();
    }

    @Override
    public boolean hasNext() {
        if (next != null) {
            return true;
        }
        next = queue.take(); // blocks if necessary
        if (next == sentinel) {
            return false;
        }
        return true;
    }

    @Override
    public T next() {
        T tmp = next;
        next = null;
        return tmp;
    }

}

这里的见解是 hasNext() 需要阻塞直到下一个项目准备好。它还需要某种退出条件,并且由于线程问题,它不能为此使用空队列或布尔标志。哨兵在没有任何锁定或同步的情况下解决了问题。

编辑:缓存“next”,因此可以多次调用 hasNext()。

【讨论】:

  • 看起来很棒!实际上我想过这样的解决方案,但我从未尝试过,因为我不想使用null 作为哨兵(可能是列表的一部分),我认为(T) new Object() 肯定会导致 ClassCastException,因此从未尝试过。但是,您当然是对的!仔细想想,这很有道理。
  • 但是这里有一个(可解决的)问题:hasNext() 可能比next() 更频繁地被调用。所以:iterator.hasNext(); iterator.hasNext(); sysout(iterator.next()); 的输出应该与iterator.hasNext(); sysout(iterator.next()); 相同,但这里不同。您需要缓存 hasNext() 的结果。我建议你实现它,然后我很乐意接受你的回答:-)。
  • @ccleve 从委托中读取的后台线程应该是一个守护线程,否则应用程序可以在使用关闭挂钩触发关闭时保持挂起。我对您的实现进行了一些测试,它似乎工作正常。
  • @ccleve:这不适用于迭代器中的空元素。改变并不难(我实际上去了一个额外的布尔 hasNextKnown 属性)。但是如果没有 null 元素,那就没问题了。我会接受你的回答,但是我建议你要么修复 null 问题,要么为在谷歌上找到此代码并尝试天真使用它的其他人添加有关限制的备注。
  • @ccleve 我有 AsyncIterator 的改进(完整)版本。我应该编辑答案中的代码以显示我的版本吗?
【解决方案2】:

或者省去你的麻烦并使用 RxJava:

import java.util.Iterator;

import rx.Observable;
import rx.Scheduler;
import rx.observables.BlockingObservable;
import rx.schedulers.Schedulers;

public class RxAsyncIteratorExample {

    public static void main(String[] args) throws InterruptedException {
        final Iterator<Integer> slowIterator = new SlowIntegerIterator(3, 7300);

        // the scheduler you use here will depend on what behaviour you
        // want but io is probably what you want
        Iterator<Integer> async = asyncIterator(slowIterator, Schedulers.io());
        while (async.hasNext()) {
            performLengthTask(async.next());
        }
    }

    public static <T> Iterator<T> asyncIterator(
            final Iterator<T> slowIterator,
            Scheduler scheduler) {

        final Observable<T> tObservable = Observable.from(new Iterable<T>() {
            @Override
            public Iterator<T> iterator() {
                return slowIterator;
            }
        }).subscribeOn(scheduler);

        return BlockingObservable.from(tObservable).getIterator();
    }

    /**
     * Uninteresting implementations...
     */
    public static void performLengthTask(Integer integer)
            throws InterruptedException {
        log("Running task for " + integer);
        Thread.sleep(10000l);
        log("Finished task for " + integer);
    }

    private static class SlowIntegerIterator implements Iterator<Integer> {
        private int count;
        private final long delay;

        public SlowIntegerIterator(int count, long delay) {
            this.count = count;
            this.delay = delay;
        }

        @Override
        public boolean hasNext() {
            return count > 0;
        }

        @Override
        public Integer next() {
            try {
                log("Starting long production " + count);
                Thread.sleep(delay);
                log("Finished long production " + count);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return count--;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private static final long startTime = System.currentTimeMillis();

    private static void log(String s) {
        double time = ((System.currentTimeMillis() - startTime) / 1000d);
        System.out.println(time + ": " + s);
    }
}

给我:

0.031: Starting long production 3
7.332: Finished long production 3
7.332: Starting long production 2
7.333: Running task for 3
14.633: Finished long production 2
14.633: Starting long production 1
17.333: Finished task for 3
17.333: Running task for 2
21.934: Finished long production 1
27.334: Finished task for 2
27.334: Running task for 1
37.335: Finished task for 1

【讨论】:

  • 嗨,我看到 Rx 文档不推荐 BlockingObservable。这是将许多项目的普通迭代器变成可观察的唯一方法吗,因为这就是我想要做的。
猜你喜欢
  • 2018-12-08
  • 2014-06-11
  • 1970-01-01
  • 2020-01-15
  • 2021-10-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多