我们通过以下代码找到了该问题的解决方案:
这个队列是一个混合的 SynchronousQueue / LinkedBlockingQueue。
public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 1L;
private SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>();
public OverflowingSynchronousQueue() {
super();
}
public OverflowingSynchronousQueue(int capacity) {
super(capacity);
}
@Override
public boolean offer(E e) {
// Create a new thread or wake an idled thread
return synchronousQueue.offer(e);
}
public boolean offerToOverflowingQueue(E e) {
// Add to queue
return super.offer(e);
}
@Override
public E take() throws InterruptedException {
// Return tasks from queue, if any, without blocking
E task = super.poll();
if (task != null) {
return task;
} else {
// Block on the SynchronousQueue take
return synchronousQueue.take();
}
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// Return tasks from queue, if any, without blocking
E task = super.poll();
if (task != null) {
return task;
} else {
// Block on the SynchronousQueue poll
return synchronousQueue.poll(timeout, unit);
}
}
}
为了让它工作,我们需要包装 RejectedExecutionHandler 以在任务被拒绝时调用“offerToOverflowingQueue”。
public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler {
private OverflowingSynchronousQueue<Runnable> queue;
private RejectedExecutionHandler adaptedRejectedExecutionHandler;
public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue,
RejectedExecutionHandler adaptedRejectedExecutionHandler)
{
super();
this.queue = queue;
this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!queue.offerToOverflowingQueue(r)) {
adaptedRejectedExecutionHandler.rejectedExecution(r, executor);
}
}
}
这是我们如何创建 ThreadPoolExecutor
public static ExecutorService newSaturatingThreadPool(int corePoolSize,
int maxPoolSize,
int maxQueueSize,
long keepAliveTime,
TimeUnit timeUnit,
String threadNamePrefix,
RejectedExecutionHandler rejectedExecutionHandler)
{
OverflowingSynchronousQueue<Runnable> queue = new OverflowingSynchronousQueue<Runnable>(maxQueueSize);
OverflowingRejectionPolicyAdapter rejectionPolicyAdapter = new OverflowingRejectionPolicyAdapter(queue,
rejectedExecutionHandler);
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveTime,
timeUnit,
queue,
new NamedThreadFactory(threadNamePrefix),
rejectionPolicyAdapter);
return executor;
}