【问题标题】:Processing tasks in parallel and sequentially Java并行和顺序处理 Java 任务
【发布时间】:2020-05-21 23:16:52
【问题描述】:

在我的程序中,用户可以通过一个界面触发不同的任务,这需要一些时间来处理。因此它们由线程执行。到目前为止,我已经实现了它,因此我有一个带有一个线程的执行器,它一个接一个地执行所有任务。但现在我想把所有东西都并行化一点。

即我想并行运行任务,除非它们具有相同的路径,然后我想按顺序运行它们。例如,我的池中有 10 个线程,当一个任务进来时,该任务应该分配给当前正在处理具有相同路径的任务的工作人员。如果worker当前没有在处理具有相同路径的任务,则该任务应该由当前空闲的worker处理。

其他信息:任务是在本地文件系统中的文件上执行的任何类型的任务。例如,重命名文件。因此,该任务具有属性path。而且我不想同时对同一个文件执行两个任务,所以这样的路径相同的任务应该顺序执行。

这是我的示例代码,但还有工作要做:

我的一个问题是,我需要一种安全的方法来检查工作人员当前是否正在运行并获取当前正在运行的工作人员的路径。我所说的安全是指不会发生同时访问或其他线程问题。

    public class TasksOrderingExecutor {
    
        public interface Task extends Runnable {
            //Task code here
            String getPath();
        }
    
        private static class Worker implements Runnable {
    
            private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

            //some variable or mechanic to give the actual path of the running tasks??
    
            private volatile boolean stopped;
    
            void schedule(Task task) {
                tasks.add(task);
            }
    
            void stop() {
                stopped = true;
            }
    
            @Override
            public void run() {
                while (!stopped) {
                    try {
                        Task task = tasks.take();
                        task.run();
                    } catch (InterruptedException ie) {
                        // perhaps, handle somehow
                    }
                }
            }
        }
    
        private final Worker[] workers;
        private final ExecutorService executorService;
    
        /**
         * @param queuesNr nr of concurrent task queues
         */
        public TasksOrderingExecutor(int queuesNr) {
            Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
            executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
            workers = new Worker[queuesNr];
            for (int i = 0; i < queuesNr; i++) {
                Worker worker = new Worker();
                executorService.submit(worker);
                workers[i] = worker;
            }
        }
    
        public void submit(Task task) {
            Worker worker = getWorker(task);
            worker.schedule(task);
        }
    
        public void stop() {
            for (Worker w : workers) w.stop();
            executorService.shutdown();
        }
    
        private Worker getWorker(Task task) {
            //check here if a running worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
            return workers[task.getPath() //HERE I NEED HELP//];
        }
    }

【问题讨论】:

  • “路径”是什么意思?你是说文件路径?你应该在你的问题前面解释一下。
  • 为什么要将具有相似路径的任务与具有该路径的其他任务分配给同一个工作人员?
  • @BasilBourque 是的,很抱歉我的问题中的解释较晚。是的,路径我的意思是像 C:\myfolder\mysubfolder 这样的文件路径
  • 附带说明:如果您支持重命名功能,那么,突然间,该任务有两条路径......(不是试图让事情变得更复杂,不过)。

标签: java multithreading synchronization threadpoolexecutor


【解决方案1】:

看来你有两个问题:

  • 您想要检查提交给执行器服务的任务的状态
  • 您希望并行运行任务,并可能确定它们的优先级

Future

对于第一个问题,捕获当您向执行器服务提交任务时返回的Future 对象。您可以检查Future 对象的完成状态。

Future< Task > future = myExecutorService.submit( someTask ) ;
…
boolean isCancelled = future.isCancelled() ;  // Returns true if this task was cancelled before it completed normally.
boolean isDone = future.isDone();  // Returns true if this task completed.

Future 是一种类型,该类型可以是您的 Task 类本身。调用 Future::get 会产生 Task 对象。然后,您可以查询该 Task 对象以获取其包含的文件路径。

Task task = future.get() ;
String path = task.getPath() ;  // Access field via getter from your `Task` object.

Executors

不要实例化new ThreadPoolExecutor,而是使用Executors 实用程序类代表您实例化执行程序服务。正如其 Javadoc 的第一行所述,大多数常见场景不需要直接实例化 ThreadPoolExecutor

ExecutorService es = Executors.newFixedThreadPool​( 3 ) ;  // Instantiate an executor service backed by a pool of three threads.

对于第二个问题,使用由线程池而不是单个线程支持的执行器服务。执行器服务自动将提交的任务分配给可用线程。

至于分组或优先级,使用多个执行器服务。您可以实例化多个。您可以拥有任意数量的执行器服务,前提是您的部署机器对 CPU 内核和内存的需求不会超载(考虑您的最大同时使用量)。

ExecutorService esSingleThread = Executors.newSingleThreadExecutor() ;
ExecutorService esMultiThread = Executors.newCachedThreadPool() ;

一个执行器服务可能由单个线程支持以限制部署计算机上的需求,而其他执行器服务可能由线程池支持以完成更多工作。您可以将这些多个执行器服务用作您的多个队列。如您的问题代码所示,您无需管理队列和工作人员。发明执行器是为了进一步简化多线程的工作。

并发

你说:

而且我不想同时对同一个文件执行两个任务,所以这样的路径相同的任务应该顺序执行。

你应该有一个更好的方法来处理并发冲突,只是在线程上调度任务。

Java 有办法管理对文件的并发访问。搜索以了解更多信息,因为 Stack Overflow 已经对此进行了介绍。


也许我没有完全理解你的需求,所以如果我不在基地,请发表评论。

【讨论】:

  • 问题是不应该锁定一个文件,而是一个文件夹。如果某个任务正在一个文件夹上运行,那么当另一个任务进来时,它应该等到这个文件夹上的一个任务完成。抱歉,规范不准确。
【解决方案2】:

您似乎需要某种“任务调度程序”,它根据某些标识符(这里是应用任务的文件的路径)执行或保存某些任务。

你可以使用这样的东西:

public class Dispatcher<I> implements Runnable {

/**
 * The executor used to execute the submitted task
 */
private final Executor executor;

/**
 * Map of the pending tasks
 */
private final Map<I, Deque<Runnable>> pendingTasksById = new HashMap<>();

/**
 * set containing the id that are currently executed
 */
private final Set<I> runningIds = new HashSet<>();

/**
 * Action to be executed by the dispatcher
 */
private final BlockingDeque<Runnable> actionQueue = new LinkedBlockingDeque<>();

public Dispatcher(Executor executor) {
    this.executor = executor;
}

/**
 * Task in the same group will be executed sequentially (but not necessarily in the same thread)
 * @param id the id of the group the task belong
 * @param task the task to execute
 */
public void submitTask(I id, Runnable task) {
    actionQueue.addLast(() -> {
        if (canBeLaunchedDirectly(id)) {
            executeTask(id, task);
        } else {
            addTaskToPendingTasks(id, task);
            ifPossibleLaunchPendingTaskForId(id);
        }
    });
}


@Override
public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            actionQueue.takeFirst().run();
        } catch (InterruptedException e) {
            Thread.currentThread().isInterrupted();
            break;
        }
    }
}


private void addTaskToPendingTasks(I id, Runnable task) {
    this.pendingTasksById.computeIfAbsent(id, i -> new LinkedList<>()).add(task);
}


/**
 * @param id an id of a group
 * @return true if a task of the group with the provided id is currently executed
 */
private boolean isRunning(I id) {
    return runningIds.contains(id);
}

/**
 * @param id an id of a group
 * @return an optional containing the first pending task of the group,
 * an empty optional if no such task is available
 */
private Optional<Runnable> getFirstPendingTask(I id) {
    final Deque<Runnable> pendingTasks = pendingTasksById.get(id);
    if (pendingTasks == null) {
        return Optional.empty();
    }
    assert !pendingTasks.isEmpty();
    final Runnable result = pendingTasks.removeFirst();
    if (pendingTasks.isEmpty()) {
        pendingTasksById.remove(id);
    }
    return Optional.of(result);
}

private boolean canBeLaunchedDirectly(I id) {
    return !isRunning(id) && pendingTasksById.get(id) == null;
}

private void executeTask(I id, Runnable task) {
    this.runningIds.add(id);
    executor.execute(() -> {
        try {
            task.run();
        } finally {
            actionQueue.addLast(() -> {
                runningIds.remove(id);
                ifPossibleLaunchPendingTaskForId(id);
            });
        }
    });
}

private void ifPossibleLaunchPendingTaskForId(I id) {
    if (isRunning(id)) {
        return;
    }
    getFirstPendingTask(id).ifPresent(r -> executeTask(id, r));
}

}

要使用它,您需要在单独的线程中启动它(或者您可以将其调整为更清洁的解决方案),如下所示:

    final Dispatcher<Path> dispatcher = new Dispatcher<>(Executors.newCachedThreadPool());
    new Thread(dispatcher).start();
    dispatcher.submitTask(path, task1);
    dispatcher.submitTask(path, task2);

这是一个基本示例,您可能需要保留线程,甚至更好地将所有线程封装在一个类中。

【讨论】:

    【解决方案3】:

    您所需要的只是一个演员的哈希映射,以文件路径为键。不同的actor将并行运行,具体的actor将按顺序处理任务。 您的解决方案是错误的,因为 Worker 类使用阻塞操作 take 但在有限的线程池中执行,这可能导致线程饥饿(一种死锁)。 Actor 在等待下一条消息时不会阻塞。

    import org.df4j.core.dataflow.ClassicActor;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.*;
    
    public class TasksOrderingExecutor {
    
    public static class Task implements Runnable {
        private final String path;
        private final String task;
    
        public Task(String path, String task) {
            this.path = path;
            this.task = task;
        }
    
        //Task code here
        String getPath() {
            return path;
        }
    
        @Override
        public void run() {
            System.out.println(path+"/"+task+" started");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
            System.out.println(path+"/"+task+" stopped");
        }
    }
    
    static class Worker extends ClassicActor<Task> {
    
        @Override
        protected void runAction(Task task) throws Throwable {
            task.run();
        }
    }
    
    private final ExecutorService executorService;
    
    private final Map<String,Worker> workers = new HashMap<String,Worker>(){
        @Override
        public Worker get(Object key) {
            return super.computeIfAbsent((String) key, (k) -> {
                Worker res = new Worker();
                res.setExecutor(executorService);
                res.start();
                return res;
            });
        }
    };
    
    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        executorService = ForkJoinPool.commonPool();
    }
    
    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.onNext(task);
    }
    
    public void stop() throws InterruptedException {
        for (Worker w : workers.values()) {
            w.onComplete();
        }
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    }
    
    private Worker getWorker(Task task) {
        //check here if a runnig worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
        return workers.get(task.getPath());
    }
    
    public static void main(String[] args) throws InterruptedException {
        TasksOrderingExecutor orderingExecutor = new TasksOrderingExecutor(20);
        orderingExecutor.submit(new Task("path1", "task1"));
        orderingExecutor.submit(new Task("path1", "task2"));
        orderingExecutor.submit(new Task("path2", "task1"));
        orderingExecutor.submit(new Task("path3", "task1"));
        orderingExecutor.submit(new Task("path2", "task2"));
        orderingExecutor.stop();
    }
    }
    

    执行协议表明key相同的任务顺序执行,不同key的任务并行执行:

    path3/task1 started
    path2/task1 started
    path1/task1 started
    path3/task1 stopped
    path2/task1 stopped
    path1/task1 stopped
    path2/task2 started
    path1/task2 started
    path2/task2 stopped
    path1/task2 stopped
    

    我使用了自己的演员库DF4J,但可以使用任何其他演员库。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多