【问题标题】:ScheduledThreadPoolExecutor scheduleWithFixedDelay and "urgent" executionScheduledThreadPoolExecutor scheduleWithFixedDelay 和“紧急”执行
【发布时间】:2009-09-09 19:13:47
【问题描述】:

我有以下标准库不能很好地解决的问题,我想知道是否有人见过另一个库,所以我不需要一起破解自定义解决方案。我有一个当前使用 scheduleWithFixedDelay() 在线程池上调度的任务,我需要修改代码以处理与异步事件相关的任务的“紧急”执行请求。因此,如果任务计划在执行之间延迟 5 分钟发生,并且在最后一次执行完成后 2 分钟发生事件,我想立即执行任务,然后在完成后等待 5 分钟再次运行之前的紧急执行。现在我能想到的最好的解决方案是让事件处理程序在 scheduleWithFixedDelay() 返回的 ScheduledFuture 对象上调用 cancel() 并立即执行任务,然后在任务中设置一个标志来告诉它重新安排自己具有相同的延迟参数。此功能是否已经可用,而我只是在文档中遗漏了一些内容?

【问题讨论】:

  • 概述的解决方案听起来不错。与其让紧急任务重新添加计划任务,不如让事件处理程序能够很好地执行此操作。无论如何,它都需要处理新的 Future。

标签: java


【解决方案1】:

根据要求(soz,很匆忙)我的 EventBasedExecutor

警告:这目前仅适用于定期运行的计划任务。您可以更改代码来处理所有任务,我到目前为止还没有,因为我只有定期运行的任务。我还在一个单线程线程池中运行它(我只需要一个调度的运行器线程,即每 X 秒一直在一个专用线程中运行)

我们开始吧:

public class EventBasedExecutor extends ScheduledThreadPoolExecutor implements EventBasedExecutorService {

    private List<RunnableScheduledFuture<?>> workers = new ArrayList<>();

    private int index;

    public EventBasedExecutor(int corePoolSize) {
        super(corePoolSize,  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("message-sender-%d").build());
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
        if(!workers.contains(runnable)) {
            workers.add(task);
        }
        return super.decorateTask(runnable, task);
    }

    @Override
    public void executeEarly() {
        if(index >= workers.size()) {
            index = 0;
        }

        if(workers.size() == 0) {
            return;
        }

        RunnableScheduledFuture<?> runnableScheduledFuture = workers.get(index);
        index ++;
        execute(runnableScheduledFuture);
        System.out.println("Executing");
    }


    public static void main(String[] args) throws InterruptedException {
        EventBasedExecutor executor = new EventBasedExecutor(10);

        long currentTimeMillis = System.currentTimeMillis();

        // this will never run
        executor.scheduleAtFixedRate(() -> {
            System.out.println("hello");
        }, 5000, 5000, TimeUnit.HOURS);

        executor.executeEarly();

        System.out.println("Run after: " + (System.currentTimeMillis() - currentTimeMillis));
    }

}

这将在专用工作线程中执行任务。

它将打印:

Executing
hello
Run after: 39

玩得开心 :)

阿图尔

【讨论】:

    【解决方案2】:

    如果您使用ScheduledThreadPoolExecutor,则可以覆盖一个方法decorateTask(实际上有两个,用于可运行和可调用任务),您可以覆盖该方法以在某处存储对该任务的引用。

    当您需要紧急执行时,您只需在该引用上调用 run() 即可使其运行并以相同的延迟重新安排。

    快速破解尝试:

    公共类 UrgentScheduledThreadPoolExecutor 扩展
            ScheduledThreadPoolExecutor {
        RunnableScheduledFuture 调度任务;
    
        公共 UrgentScheduledThreadPoolExecutor(int corePoolSize) {
            超级(核心池大小);
        }
    
        @覆盖
        protected RunnableScheduledFuture decorateTask(Runnable runnable,
                RunnableScheduledFuture 任务){
            计划任务 = 任务;
            return super.decorateTask(runnable, task);
        }
    
        公共无效运行紧急(){
            this.scheduledTask.run();
        }
    }

    可以这样使用:

    公共类 UrgentExecutionTest {
    
        公共静态 void main(String[] args) 抛出异常 {
            UrgentScheduledThreadPoolExecutor pool = new UrgentScheduledThreadPoolExecutor(5);
    
            pool.scheduleWithFixedDelay(new Runnable() {
                SimpleDateFormat 格式 = new SimpleDateFormat("ss");
    
                @覆盖
                公共无效运行(){
                    System.out.println(format.format(new Date()));
                }
            }, 0, 2L, TimeUnit.SECONDS);
            线程.sleep(7000);
            pool.runUrgently();
            pool.awaitTermination(600, TimeUnit.SECONDS);
        }
    }

    并产生以下输出: 06 08 10 11 13 15

    【讨论】:

    • 这不会启动线程池中的线程,而是启动当前线程。需要一种在后台启动它们的方法,就像在其他情况下启动它们一样......
    • @pandaadb 你有什么解决办法吗?
    • @AppiDevo 我确实这样做了。您有问题要回答还是我应该在此处添加?
    • @pandaadb 我没有悬而未决的问题。您可以在此处发布作为答案,而不是作为评论。
    • @AppiDevo 没问题。给我几分钟,我正在处理一个不同的问题并编写一个测试用例:)
    【解决方案3】:

    还有一个明显简单的解决方案,不需要新类。 这个想法是取消通知的时间表并再次重新安排:

    class MyClass {
      
        final static int DECISION_POINT = 1; //millisecond
    
        final ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
        private ScheduledFuture<?> periodicFuture;
    
        MyClass() {
            periodicFuture = executor.scheduleWithFixedDelay(this::doWork, 1, 2,
                TimeUnit.SECONDS);
        }
    
        void doWorkAsap() {
            if (periodicFuture.getDelay(TimeUnit.MILLISECONDS) > DECISION_POINT) {
                periodicFuture.cancel(true);
                periodicFuture = executor.scheduleWithFixedDelay(this::doWork,
                    0, 2000, TimeUnit.MILLISECONDS);
            }
        }
    
        void doWork() { ... }
    
    }
    

    这仅在任务之间的延迟相对于整体系统性能相当大并且创建新 ScheduledFuture 的开销是可接受的某些情况下才有效。此外,需要特别注意不归路,这里称为DECISION_POINT,在这里安排新的未来没有意义,因为事情的自然顺序已经足够快了。对于比上例更紧凑的时间表,请使用类似于 pandaab 的方法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-08-14
      • 2021-10-13
      • 2012-11-08
      • 2019-10-14
      • 2013-05-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多