【问题标题】:Scheduling tasks with a maximum / minimum duration between tasks以任务之间的最大/最小持续时间安排任务
【发布时间】:2015-12-28 18:58:02
【问题描述】:

刷新数据库中的记录。我们要么得到一个明确的刷新通知,要么每 60 秒轮询一次。每秒刷新不超过一次。

如果有请求进来,如果在一秒钟内没有发生请求,它应该排队立即刷新。否则,它应该在最后一次刷新结束后安排刷新 1 秒,除非此类任务已经安排在该时间或更早的时间。

在没有显式刷新的一分钟后,计时器应启动并刷新,以防未发送通知。

可能会有大量通知进入(每秒数百个)。

刷新可以通过一个单独的线程来完成。

什么是优雅的设计方式?

这是我所拥有的,但这可能会导致请求过多:

private NotificationCenter() {
    recordFetchService = Executors.newSingleThreadScheduledExecutor();
    recordFetchService.scheduleWithFixedDelay(refreshCommand, minTimeBetweenRefresh, maxTimeBetweenRefresh, TimeUnit.MILLISECONDS);
}

private void queueRefresh() {
    // explicit refresh requested. Schedule a refreshCommand to fire immediately, unless that would break our contract
    if (!pending.isDone() && pending.getDelay(TimeUnit.MILLISECONDS) < minTimeBetweenRefresh) {
        // a refresh is already scheduled
    } else {
        pending = recordFetchService.schedule(refreshCommand, 0L, TimeUnit.MILLISECONDS);
    }
}

【问题讨论】:

    标签: java multithreading timer scheduledexecutorservice


    【解决方案1】:

    在“每秒数百条通知”的情况下,AtomicBoolean 会想到将状态从“什么都不做”切换到“要做某事”,反之亦然。将“要做某事”状态与Semaphore 结合起来,您可以选择确定“要做某事”发生的确切时间。

    下面是一个(可运行的)示例实现/设计,它结合了AtomicBooleanSemaphore 以在使用通知时定期刷新数据。这可能不是最优雅的方式,但我确实认为它以相对直接的方式实现了目标。

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class RefreshTask {
    
        private static final long MIN_WAIT_MS = 100L;
        private static final long MAX_WAIT_MS = 1000L;
    
        private ScheduledExecutorService scheduler;
        private ExecutorService executor;
        private volatile boolean stopping; 
    
        private final Semaphore refreshLock = new Semaphore(0);
        private final AtomicBoolean refreshing = new AtomicBoolean();
        private volatile long lastRefresh;
    
        public void start() {
    
            stopping = false;
            refreshing.set(true);
            lastRefresh = System.currentTimeMillis();
            executor = Executors.newSingleThreadExecutor();
            executor.execute(new RefreshLoop());
            scheduler = Executors.newSingleThreadScheduledExecutor();
        }
    
        public void stop() {
    
            stopping = true;
            if (executor != null) {
                refreshLock.release();
                scheduler.shutdownNow();
                executor.shutdownNow();
            }
        }
    
        /** Trigger a (scheduled) refresh of data. */
        public void refresh() {
    
            if (refreshing.compareAndSet(false, true)) {
                final long dataAge = System.currentTimeMillis() - lastRefresh;
                if (dataAge >= MIN_WAIT_MS) {
                    refreshLock.release();
                    // println("Refresh lock released.");
                } else {
                    long waitTime = MIN_WAIT_MS - dataAge;
                    scheduler.schedule(new RefreshReleaser(), waitTime, TimeUnit.MILLISECONDS);
                    println("Refresh scheduled in " + waitTime + " ms.");
                }
            } else {
                // println("Refresh already triggered.");
            }
        }
    
        protected void refreshData() {
    
            // Refresh data from database
            println("DATA refresh");
        }
    
        class RefreshLoop implements Runnable {
    
            @Override
            public void run() {
    
                while (!stopping) {
                    try {
                        refreshData();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    lastRefresh = System.currentTimeMillis();
                    refreshing.set(false);
                    try {
                        if (!refreshLock.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS)) {
                            if (!refreshing.compareAndSet(false, true)) {
                                // Unlikely state, but can happen if "dataAge" in the refresh-method is around MAX_WAIT_MS.
                                // Resolve the race-condition by removing the extra permit.
                                if (refreshLock.tryAcquire()) {
                                    println("Refresh lock race-condition detected, removed additional permit.");
                                } else {
                                    println("Refresh lock race-condition detected, but no additional permit found.");
                                }
                            }
                            println("Refreshing after max waiting time.");
                        } // else refreshing already set to true
                    } catch (InterruptedException ie) {
                        if (!stopping) {
                            ie.printStackTrace();
                        }
                    }
                }
                println("Refresh loop stopped.");
            }
        }
    
        class RefreshReleaser implements Runnable {
    
            @Override
            public void run() {
    
                if (refreshing.get()) {
                    refreshLock.release();
                    println("Scheduled refresh lock release.");
                } else {
                    println("Programming error, scheduled refresh lock release can only be done in refreshing state.");
                }
            }
        }
    
        /* *** some testing *** */
    
        public static void main(String[] args) {
    
            RefreshTask rt = new RefreshTask();
            try {
                println("Starting");
                rt.start();
                Thread.sleep(2 * MIN_WAIT_MS);
                println("Triggering refresh");
                rt.refresh();
                Thread.sleep(MAX_WAIT_MS + (MIN_WAIT_MS / 2));
                println("Triggering refresh 2");
                rt.refresh();
                Thread.sleep(MIN_WAIT_MS);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                rt.stop();
            }
        }
    
        public static final long startTime = System.currentTimeMillis();
    
        public static void println(String msg) {
            println(System.currentTimeMillis() - startTime, msg);
        }
    
        public static void println(long tstamp, String msg) {
            System.out.println(String.format("%05d ", tstamp) + msg);
        }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2020-11-15
      • 2016-10-09
      • 2017-08-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-11
      • 2021-07-28
      • 2021-02-06
      相关资源
      最近更新 更多