【问题标题】:Single threading a task without queuing further requests单线程任务而不排队进一步的请求
【发布时间】:2011-02-10 08:08:12
【问题描述】:

我需要异步执行任务,同时丢弃任何进一步的请求,直到任务完成。

同步方法只是将任务排队并且不会跳过。我最初想使用 SingleThreadExecutor 但也将任务排队。然后我查看了 ThreadPoolExecutor,但它读取队列以获取要执行的任务,因此将执行一项任务并且至少有一项任务排队(其他任务可以使用 ThreadPoolExecutor.DiscardPolicy 丢弃)。

我唯一能想到的就是使用信号量来阻塞队列。我提供了以下示例来展示我想要实现的目标。有没有更简单的方法?我错过了什么明显的东西吗?

import java.util.concurrent.*;

public class ThreadPoolTester {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static Semaphore processEntry = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        if (!processEntry.tryAcquire()) return;
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        try {
                            System.out.println("start " + index);
                            Thread.sleep(1000); // pretend to do work
                            System.out.println("stop " + index);
                            return null;

                        } finally {
                            processEntry.release();
                        }
                    }
                }
            );
    }
}

样本输出

start 0
stop 0
start 5
stop 5
start 10
stop 10
start 15
stop 15

采用 axtavt 的答案并转换上面的示例给出了以下更简单的解决方案。

import java.util.concurrent.*;

public class SyncQueueTester {
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
            1000, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        System.out.println("start " + index);
                        Thread.sleep(1000); // pretend to do work
                        System.out.println("stop " + index);
                        return null;
                    }
                }
            );
    }
}

【问题讨论】:

    标签: java concurrency unit-of-work java.util.concurrent


    【解决方案1】:

    看起来由 SynchronousQueue 支持的执行器具有所需的策略可以满足您的需求:

    executor = new ThreadPoolExecutor(
        1, 1, 
        1000, TimeUnit.SECONDS, 
        new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.DiscardPolicy());
    

    【讨论】:

    • 你能告诉我,为什么你更喜欢使用 DiscardPolicy 而不是捕获 RejectedExecutionException?
    【解决方案2】:

    如果没有队列,我会说不需要执行器。单独使用信号量似乎就足够了。我正在使用下面的代码来避免在它已经运行时运行相同的代码。只需确保semaphorestatic volatile,这使信号量成为该类的唯一信号量,并在更改后立即将信号量引用传播到其他线程的堆

    if (this.getSemaphore().tryAcquire()) {
            try {
                process();
            } catch (Exception e) {
            } finally {
                this.getSemaphore().release();
            }
    }
    else {
        logger.info(">>>>> Job already running, skipping go");
    }
    

    【讨论】:

    • 任务需要在单独的线程中运行,以免主线程被阻塞或受到其他影响(“主线”实际上是一个摇摆应用程序)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-02-14
    • 1970-01-01
    • 1970-01-01
    • 2015-07-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多