【问题标题】:Java ExecutorService - scalingJava ExecutorService - 缩放
【发布时间】:2011-11-29 07:50:48
【问题描述】:

我正在尝试使用ExecutorService 及其函数invokeAll 编写Java 程序。我的问题是:invokeAll 函数是否同时解决任务?我的意思是,如果我有两个处理器,会同时有两个工人吗?因为我无法使其正确缩放。如果我给newFixedThreadPool(2)或1,完成问题需要相同的时间。

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map 是实现Callable 的类,wp 是部分解的向量,该类保存不同时间的一些信息。

为什么不能扩展?可能是什么问题?

这是 PartialSolution 的代码:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName;//the name of a file
    public int b, e;//the index of begin and end of the fragment from the file
    public String info;//the fragment
    public HashMap<String, Word> hm;//here i retain the informations
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if(ok == true)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
             hmt = null;
        }    
    }
}

这是地图的代码:

public class Map implements Callable<PartialSolution>
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    StringBuffer sb = new StringBuffer(regex);
                    regex = sb.toString();
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}

所以在地图中,我从片段中取出每一行并搜索每个表达式的出现次数,我还保存了行数。在处理完所有片段后,在同一个 PartialSolution 中,我将信息保存在哈希图中并返回新的 PartialSolution。在下一步中,我将 PartialSolutions 与相同的 fileName 结合起来,并将它们引入一个 Callable 类 Reduce,它与 map 相同,不同之处在于它进行其他操作,但也返回一个 PartialSolution。

这是运行地图任务的代码:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

在任务中,我创建 Map 类型的任务,并在列表中获取它们。我不知道如何阅读 JVM 线程转储。我希望我给你的信息足够好。如果有帮助,我在 NetBeans 7.0.1 中工作。

谢谢你, 亚历克斯

【问题讨论】:

  • 你有多少任务?他们做什么?是否有很多 I/O?
  • 我的任务是那些使用 PartialSolution 的可调用类,它们有一些文本并计算一个单词出现该文本和行的次数。 PartialSolution 实际上是文本的一部分,我想为每个部分获取这些信息,然后将它们与另一个名为 Reduce 的 Callable 类结合起来。我想同时处理这些部分。取决于我拥有的处理器数量。 I/O 将在最后,届时我将统一所有任务和 10 个部分,并且将只有一个包含有关该文档的所有信息。 Google 使用的是 MapReduce。
  • 我想知道的是invokeAll方法,如果我用10个线程创建了ExcutorService,会同时解决10个任务还是一次解决一个?在 Map 中,我有一个构造函数,并且我实现了返回另一个 PartialSolution 的函数 call() ,但这次使用了正确的信息。还有一个问题,如果我说 list.get(i).get() 这将在解决之后返回 PartialSolution 吗?我真的不明白如果我使用 2 个线程而不是 1 个线程,为什么时间没有改善。为什么它不能正确扩展?
  • 你可以使用homework 标签。 (也希望没人抄你的代码)

标签: java multithreading executorservice threadpoolexecutor forkjoinpool


【解决方案1】:

我想知道的是invokeAll方法,如果我用10个线程创建了ExcutorService,是同时解决10个任务还是一次解决一个?

如果您将十个任务提交给具有十个线程的 ExecutorService,它将同时运行它们。他们是否可以完全并行并相互独立,取决于他们在做什么。但他们每个人都有自己的主题。

还有一个问题,如果我说 list.get(i).get() 这将在解决后返回 PartialSolution?

是的,它将阻塞直到计算完成(如果尚未完成)并返回其结果。

我真的不明白如果我使用 2 个线程而不是 1 个线程,为什么时间没有改善。

我们需要查看更多代码。他们是否在某些共享数据上同步?这些任务需要多长时间?如果它们很短,您可能不会注意到任何差异。如果它们需要更长的时间,请查看 JVM 线程转储以验证它们是否都在运行。

【讨论】:

  • +1。但是有一个错误:invokeAll 返回已完成期货的列表。换句话说:它只在所有任务完成后才返回。
【解决方案2】:

如果你用两个线程创建线程池,那么两个任务会同时运行。

我看到有两件事可能导致两个线程与一个线程花费相同的时间。

如果只有一个 Map 任务占用了您的大部分时间,那么额外的线程不会使该任务运行得更快。它不能比最慢的工作更快地完成。

另一种可能性是您的地图任务经常从共享向量中读取。这可能会导致足够的争用来抵消拥有两个线程的收益。

你应该在 jvisualvm 中打开它,看看每个线程在做什么。

【讨论】:

  • 我已经安装了 VisualVM 但我不知道如何使用它,我的意思是我不知道要查看,如何读取数据。请帮忙。
  • 我已经做了这个步骤:Profiler -> CPU -> 右键单击​​然后线程转储...但是我什么都不懂。
  • @StanciuAlexandru-Marian 我建议使用 ThreadFactory 将您的线程命名为有意义的名称。然后在线程列表中找到线程。然后检查代码运行时每个线程的状态如何变化。这将告诉您每个线程正在执行多少工作。如果有一个线程正在等待,您可以进行线程转储以查看它在等待什么。
  • 我已经解决了我的问题。这是我电脑的错。虽然我有 Intel 2 Duo Core,但它似乎工作得非常糟糕。我不知道为什么,什么时候发生的。我在我的教师集群上进行了测试,它运行得非常快并且可以扩展。感谢您的所有帮助,希望我没有给您带来太多麻烦。
【解决方案3】:

Java 8 在Executors - newWorkStealingPool 中引入了另外一种 API 来创建工作窃取池。您不必创建RecursiveTaskRecursiveAction,但仍然可以使用ForkJoinPool

public static ExecutorService newWorkStealingPool()

使用所有可用处理器作为目标并行级别创建一个工作窃取线程池。

默认情况下,它将以 CPU 内核数作为并行参数。如果你有核心 CPU,你可以有 8 个线程来处理工作任务队列。

Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.

ExecutorServiceForkJoinPoolThreadPoolExecutor 如果您没有共享数据和共享锁定(同步)和线程间通信,性能会很好。如果任务队列中的所有任务相互独立,性能会有所提高。

ThreadPoolExecutor构造函数来自定义和控制任务的工作流程:

 ThreadPoolExecutor(int corePoolSize, 
                       int maximumPoolSize, 
                       long keepAliveTime, 
                       TimeUnit unit, 
                       BlockingQueue<Runnable> workQueue, 
                       ThreadFactory threadFactory,
                       RejectedExecutionHandler handler)

查看相关的 SE 问题:

How to properly use Java Executor?

Java's Fork/Join vs ExecutorService - when to use which?

【讨论】:

    猜你喜欢
    • 2016-05-07
    • 2014-11-15
    • 2010-09-29
    • 1970-01-01
    • 1970-01-01
    • 2012-07-31
    • 2015-02-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多