【发布时间】:2014-02-04 07:26:27
【问题描述】:
我有以下代码:
while(slowIterator.hasNext()) {
performLengthTask(slowIterator.next());
}
因为迭代器和任务都很慢,所以将它们放在单独的线程中是有意义的。这是迭代器包装器的快速而肮脏的尝试:
class AsyncIterator<T> implements Iterator<T> {
private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
private AsyncIterator(final Iterator<T> delegate) {
new Thread() {
@Override
public void run() {
while(delegate.hasNext()) {
queue.put(delegate.next()); // try/catch removed for brevity
}
}
}.start();
}
@Override
public boolean hasNext() {
return true;
}
@Override
public T next() {
return queue.take(); // try/catch removed for brevity
}
// ... remove() throws UnsupportedOperationException
}
然而,这个实现缺乏对“hasNext()”的支持。 hasNext() 方法当然可以阻塞,直到它知道是否返回 true。我可以在我的 AsyncIterator 中有一个 peek 对象,我可以更改 hasNext() 以从队列中获取一个对象并让 next() 返回这个 peek。但这会导致 hasNext() 无限期阻塞,如果已达到委托迭代器的结尾。
我当然可以自己进行线程通信,而不是使用 ArrayBlockingQueue:
private static class AsyncIterator<T> implements Iterator<T> {
private final Queue<T> queue = new LinkedList<T>();
private boolean delegateDone = false;
private AsyncIterator(final Iterator<T> delegate) {
new Thread() {
@Override
public void run() {
while (delegate.hasNext()) {
final T next = delegate.next();
synchronized (AsyncIterator.this) {
queue.add(next);
AsyncIterator.this.notify();
}
}
synchronized (AsyncIterator.this) {
delegateDone = true;
AsyncIterator.this.notify();
}
}
}.start();
}
@Override
public boolean hasNext() {
synchronized (this) {
while (queue.size() == 0 && !delegateDone) {
try {
wait();
} catch (InterruptedException e) {
throw new Error(e);
}
}
}
return queue.size() > 0;
}
@Override
public T next() {
return queue.remove();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
然而,所有额外的同步、等待和通知并没有真正使代码更具可读性,而且很容易在某处隐藏竞争条件。
有更好的想法吗?
更新
是的,我确实知道常见的观察者/可观察模式。然而,通常的实现并没有预见到数据流的终结,它们也不是迭代器。
我在这里特别想要一个迭代器,因为实际上上面提到的循环存在于一个外部库中,它需要一个迭代器。
【问题讨论】:
-
听起来像一个经典的生产者/消费者问题stackoverflow.com/questions/2332537/…,除了你希望每个线程只有一个
-
正常使用迭代器,将任务转储到
ExecutorService。这不应该需要重新发明抽象。 -
考虑使用 rxjava (github.com/Netflix/RxJava):它完全符合您的要求。它是一个以称为“Observable”的异步可迭代类型为中心的库。它完全充实了一整套转换、聚合和并发功能。
-
@LouisWasserman:不,如果 Iterator(更新了我的问题),我特别需要一个实现。
-
@isnot2bad:没错。迭代器会产生大量 I/O 负载,而处理会产生大量 CPU 负载。如果我同步执行此操作,我的 CPU 和硬盘会轮流闲置,而另一个则处于压力之下。
标签: java multithreading asynchronous concurrency