【问题标题】:Run Runnables concurrently in order按顺序同时运行 Runnables
【发布时间】:2017-06-19 20:08:25
【问题描述】:

这是我的情况:

  • 我有几个线程应该做后台工作,最好是使用 ThreadPool/ExecutorService 等
  • 有很多 Runnables 定期生成调用一个长方法。它们应该由后台工作人员处理。
  • runnables 有一个执行顺序(大约)。有趣的是:排序是动态的,随时可能发生变化。因此,应该尽可能晚地决定下一个运行哪个可运行,直接在运行之前。
  • 应该可以停止所有当前工作的可运行文件。如果无法做到这一点,则应通知他们,以便他们在完成工作后放弃工作。

我真的不知道如何解决这个问题,而且我对多线程和 Java 的 API 也不是很熟悉。

关于订购

我的意思是按顺序大约:如果他们按顺序开始,那就足够了。每个 Runnable 都在地图的瓦片上做一些工作。这个想法是以这样一种方式对runnables进行排序,首先加载用户正在查看的位置附近的瓷砖,然后再加载周围环境。请注意,因此执行顺序可能随时更改。

【问题讨论】:

  • 某种类型的优先队列,也许?
  • 如果您需要Runnables 才能按顺序运行,那么为什么要并行运行它们呢?
  • @Hovercraft Full Of Eels 是否允许 PriorityQueues 允许 Comparator 的排名突然改变?
  • @tsolakp 应该按顺序大约运行。订购的越多越好。
  • “大约按顺序”是什么意思?如何判断近似误差何时过大?如果需要任何订单,那么为什么要并行运行它们呢?每个流程的某些部分是否依赖于其他部分?流要么是串行的,要么是并行的,永远不会是微弱的。那么哪些部分是哪些呢?编写某种门,强制相关流等待它们的先决条件。

标签: java multithreading concurrency


【解决方案1】:

一种解决方案是将所有要处理的作业放入 PriorityBlockingQueue。 (此队列使用队列项的自然顺序或通过提供比较器自动排序)。那么在 ExecutorService 中运行的线程应该只从队列中获取元素。

例如

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueExample {

    public static void main(String[] args) throws InterruptedException {
        PriorityQueueExample priorityQueueExample = new PriorityQueueExample();
        priorityQueueExample.doTheWork();

    }

    private void doTheWork() throws InterruptedException {
        PriorityBlockingQueue<Customer> queue = new PriorityBlockingQueue<>(10, new CustomerComparator());
        queue.add(new Customer("John", 5));
        queue.add(new Customer("Maria", 2));
        queue.add(new Customer("Ana", 1));
        queue.add(new Customer("Pedro", 3));


        while(queue.size() > 0){
            System.out.println(queue.take());
        }
    }
}

class CustomerComparator implements Comparator<Customer> {

    @Override
    public int compare(Customer o1, Customer o2) {
        return o1.getUrgency() - o2.getUrgency();
    }
}

class Customer {
    private String name;
    private int urgency;

    public Customer(String name, int urgency) {
        this.name = name;
        this.urgency = urgency;
    }

    public String getName() {
        return name;
    }

    public int getUrgency() {
        return urgency;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "name='" + name + '\'' +
                ", urgency=" + urgency +
                '}';
    }
}

【讨论】:

  • 如果优先级队列能够处理元素的顺序可能随时发生变化,这将是一个很好的解决方案。可悲的是,用于检索元素的dequeue() 始终采用数组中的第一个元素而不进行比较,而且它也是私有的,因此我无法使用始终返回正确元素的自定义实现来覆盖它。
  • 好的。我看到了问题。我回复了一个新答案以提供示例代码。
【解决方案2】:

1) 让您的瓷砖实现Callable。您也可以让他们返回Callable

2) 确定哪些是最先加载的位置。

3) 将它们或它们的Callables 传递给java.util.concurrent.ExecutorService.invokeAll

4) 一旦invokeAll 返回,获取与之前相邻的下一组图块并再次调用java.util.concurrent.ExecutorService.invokeAll

5) 如有必要,重复第 4 步。

【讨论】:

    【解决方案3】:

    您还可以使用 List 来模拟优先级队列。例如:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    public class ListEmulateQueueExample {
    
        public static void main(String[] args) throws InterruptedException {
            ListEmulateQueueExample priorityQueueExample = new ListEmulateQueueExample();
            priorityQueueExample.doTheWork();
        }
    
        /**
         * uses a list to emulate a queue.
         */
        private void doTheWork() {
            List<Customer> customerList = Collections.synchronizedList(new ArrayList<>());
    
            Customer johnCustomer = new Customer("John", 5);
            Customer mariaCustomer = new Customer("Maria", 3);
            Customer anaCustomer = new Customer("Ana", 1);
    
            customerList.add(johnCustomer);
            customerList.add(mariaCustomer);
            customerList.add(anaCustomer);
    
            CustomerComparator customerComparator = new CustomerComparator();
            synchronized (customerList){
                customerList.sort(customerComparator);
            }
    
    
            System.out.println(customerList.remove(0));  // Ana
    
    
            johnCustomer.setUrgency(1);
            synchronized (customerList){
                customerList.sort(customerComparator);
            }
    
            System.out.println(customerList.remove(0));  // John
    
        }
    }
    

    【讨论】:

    • 遗憾的是,这也不起作用,因为在 ExecutorService 取出下一个元素之前,我必须完全 对列表进行排序。此外,如果只需要第一个元素,我不会将整个数组排序为 n*log(n) 到 n^2 复杂度。手动搜索它只需要 log(n) 到 n。我自己想出了一个解决方案,感谢您的帮助和意见。
    【解决方案4】:

    所以,我终于找到了解决这个问题的方法。它不是那么漂亮,也不是一种 hack,但它可以按预期工作。

    这个想法是:如果每个 Runnable 都是无状态的并且只调用一个方法,则它不需要知道它应该在创建时处理的图块。相反,它会在启动后要求提供所需的磁贴

    public class WorldRendererGL {
    
        protected Map<Vector2i, RenderedRegion> regions     = new ConcurrentHashMap<>();
        protected Queue<RegionLoader>           running     = new ConcurrentLinkedQueue<>();
        protected Set<RenderedRegion>           todo        = ConcurrentHashMap.newKeySet();
    
        protected ExecutorService               executor;
    
        /** Recalculate everything */
        public void invalidateTextures() {
            //Abort current calculations
            running.forEach(f -> f.invalid.set(true));
            running.clear();
            todo.addAll(regions.values());
    
            for (int i = 0; i < regions.size(); i++) {
                RegionLoader loader = new RegionLoader();
                running.add(loader);
                executor.submit(loader);
            }
        }
    
        protected class RegionLoader implements Runnable {
    
            /** Set this to true to nullify all calculations*/
            final AtomicBoolean invalid = new AtomicBoolean(false);
    
            @Override
            public void run() {
                try {
                    if (invalid.get())
                        return;
                    RenderedRegion region = null;
                    region = nextRegion(); // Get the correct work at runtime
                    if (region == null)
                        return;
                    BufferedImage texture = renderer.renderRegion(new RegionFile(region.region.regionFile));
                    if (!invalid.get()) {
                        region.texture = texture;
                        update.notifyObservers();
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    
        protected RenderedRegion nextRegion() {
            Comparator<RenderedRegion> comp = (a, b) -> /*...*/);
            RenderedRegion min = null;
            for (Iterator<RenderedRegion> it = todo.iterator(); it.hasNext();) {
                RenderedRegion r = it.next();
                if (min == null || comp.compare(min, r) > 0)
                    min = r;
            }
            todo.remove(min);
            return min;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-09-14
      • 1970-01-01
      • 2013-08-13
      • 2019-09-07
      • 2010-09-09
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多