【问题标题】:Watching Future<T> objects for completion using ExecutorService使用 ExecutorService 观察 Future<T> 对象以完成
【发布时间】:2016-12-26 21:12:02
【问题描述】:

我有一个程序,其中我使用提交给 ExecutorService 的 Callable 对象从 S3 下载文件。文件很大,需要几分钟才能完全下载。创建另一个从下载器获取未来并观察它完成的 Callable 类是否有意义?我的最终目标是将所有完整下载添加到缓存中位于中心位置的列表中。

例如:

public void add(final String s3urlToDownload){

    Future<S3Object> futureS3obj = cachedPoolExecutor.submit(new S3Downloader(s3urlToDownload));

    // Instead of calling futureS3obj.get() and waiting, submit the Future to the "Watcher" service.
    // Within FutureWatcher, the S3Object will be added to the List once the download is complete.
    cachedPoolExecutor.submit(new FutureWatcher(downloadedList, futureS3obj))

}

【问题讨论】:

  • 如果您已经拥有Future,那么额外的Callable 有什么好处?
  • @Kayaman,据我了解,如果我调用Futureget() 方法,add 方法将阻塞。这就是为什么我提交一个新的 Callable FutureWatcher
  • 那么Callable 带来了什么?调用它的call() 方法也会阻塞,因为它委托给get()
  • 这个准确吗:有N个文件要下载。它们应该被并行下载,并且需要一种机制来知道什么时候都准备好了?如果是这样,可能还有其他方法可以解决这个问题(如果这是准确的,我可能会尝试一个答案)。
  • @MichaelEaster,没错。

标签: java future executorservice callable


【解决方案1】:

这里有一些假物品,用于说明。 “下载”只是随机休眠:

// for illustration only
class S3Object {
    String id;
}

// for illustration only
class S3Downloader {

    public S3Object download(String url) {
        int min = 2;
        int max = 5;
        Random rand = new Random();
        int random = rand.nextInt((max - min) + 1) + min;

        try { Thread.sleep(1000 * random); } catch (Exception ex) {}
        S3Object result = new S3Object();
        result.id = url;
        return result;
    }
}

我们可以定义一个任务来下载文件,更新一个(线程安全的)列表,并减少一个CountDownLatch

class MyTask implements Runnable {
    private final List<S3Object> list;
    private final CountDownLatch latch;
    private final String url; 

    public MyTask(List<S3Object> list, CountDownLatch latch, String url) {
        this.list = list;
        this.latch = latch;
        this.url = url;
    }     

    public void run() {
        S3Downloader downloader = new S3Downloader();
        S3Object result = downloader.download(url);
        list.add(result);
        latch.countDown();
    }
}

一个示例 Runner 说明了“客户端”。 go方法是驱动,使用add方法(不会阻塞):

public class Runner {
    private ExecutorService pool = Executors.newCachedThreadPool();
    private int numUrls = 20;
    private CountDownLatch latch = new CountDownLatch(numUrls);
    private List<S3Object> results = Collections.synchronizedList(new ArrayList<S3Object>());

    public void add(String url) {
        pool.submit(new MyTask(results, latch, url));
    }

    public void go() throws Exception {

        for(int i = 0; i < numUrls; i++) {
            String url = "http://example" + i;
            add(url);
        }

        // wait for all downloads
        latch.await();

        for (S3Object result : results) {
            System.out.println("result id: " + result.id);
        }
    }
}

生产代码必须处理错误并可能适当地重组客户端。

【讨论】:

  • 谢谢迈克尔,我应该能够重构我的代码以包含上述框架。
【解决方案2】:

不要制作消耗资源的“观察者”,而是让当前下载完成后通知主服务器。

【讨论】:

    猜你喜欢
    • 2018-09-22
    • 1970-01-01
    • 2021-05-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-23
    相关资源
    最近更新 更多