【发布时间】:2011-11-29 07:50:48
【问题描述】:
我正在尝试使用ExecutorService 及其函数invokeAll 编写Java 程序。我的问题是:invokeAll 函数是否同时解决任务?我的意思是,如果我有两个处理器,会同时有两个工人吗?因为我无法使其正确缩放。如果我给newFixedThreadPool(2)或1,完成问题需要相同的时间。
List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);
Map 是实现Callable 的类,wp 是部分解的向量,该类保存不同时间的一些信息。
为什么不能扩展?可能是什么问题?
这是 PartialSolution 的代码:
import java.util.HashMap;
import java.util.Vector;
public class PartialSolution
{
public String fileName;//the name of a file
public int b, e;//the index of begin and end of the fragment from the file
public String info;//the fragment
public HashMap<String, Word> hm;//here i retain the informations
public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce
public PartialSolution(String name, int b, int e, String i, boolean ok)
{
this.fileName = name;
this.b = b;
this.e = e;
this.info = i;
hm = new HashMap<String, Word>();
if(ok == true)
{
hmt = new HashMap<String, Vector<Word>>();
}
else
{
hmt = null;
}
}
}
这是地图的代码:
public class Map implements Callable<PartialSolution>
{
private PartialSolution ps;
private Vector<String> keyWords;
public Map(PartialSolution p, Vector<String> kw)
{
this.ps = p;
this.keyWords = kw;
}
@Override
public PartialSolution call() throws Exception
{
String[] st = this.ps.info.split("\\n");
for(int j = 0 ; j < st.length ; j++)
{
for(int i = 0 ; i < keyWords.size() ; i++)
{
if(keyWords.elementAt(i).charAt(0) != '\'')
{
int k = 0;
int index = 0;
int count = 0;
while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
{
k = index + keyWords.elementAt(i).length();
count++;
}
if(count != 0)
{
Word wr = this.ps.hm.get(keyWords.elementAt(i));
if(wr != null)
{
Word nw = new Word(ps.fileName);
nw.nrap = wr.nrap + count;
nw.lines = wr.lines;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
else
{
Word nw = new Word(ps.fileName);
nw.nrap = count;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
}
}
else
{
String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
StringBuffer sb = new StringBuffer(regex);
regex = sb.toString();
Pattern pt = Pattern.compile(regex);
Matcher m = pt.matcher(st[j]);
int count = 0;
while(m.find())
{
count++;
}
if(count != 0)
{
Word wr = this.ps.hm.get(keyWords.elementAt(i));
if(wr != null)
{
Word nw = new Word(this.ps.fileName);
nw.nrap = wr.nrap + count;
nw.lines = wr.lines;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
else
{
Word nw = new Word(this.ps.fileName);
nw.nrap = count;
int grep = count;
while(grep > 0)
{
nw.lines.addElement(ps.b + j);
grep--;
}
this.ps.hm.put(keyWords.elementAt(i), nw);
}
}
}
}
}
this.ps.info = null;
return this.ps;
}
}
所以在地图中,我从片段中取出每一行并搜索每个表达式的出现次数,我还保存了行数。在处理完所有片段后,在同一个 PartialSolution 中,我将信息保存在哈希图中并返回新的 PartialSolution。在下一步中,我将 PartialSolutions 与相同的 fileName 结合起来,并将它们引入一个 Callable 类 Reduce,它与 map 相同,不同之处在于它进行其他操作,但也返回一个 PartialSolution。
这是运行地图任务的代码:
List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);
在任务中,我创建 Map 类型的任务,并在列表中获取它们。我不知道如何阅读 JVM 线程转储。我希望我给你的信息足够好。如果有帮助,我在 NetBeans 7.0.1 中工作。
谢谢你, 亚历克斯
【问题讨论】:
-
你有多少任务?他们做什么?是否有很多 I/O?
-
我的任务是那些使用 PartialSolution 的可调用类,它们有一些文本并计算一个单词出现该文本和行的次数。 PartialSolution 实际上是文本的一部分,我想为每个部分获取这些信息,然后将它们与另一个名为 Reduce 的 Callable 类结合起来。我想同时处理这些部分。取决于我拥有的处理器数量。 I/O 将在最后,届时我将统一所有任务和 10 个部分,并且将只有一个包含有关该文档的所有信息。 Google 使用的是 MapReduce。
-
我想知道的是invokeAll方法,如果我用10个线程创建了ExcutorService,会同时解决10个任务还是一次解决一个?在 Map 中,我有一个构造函数,并且我实现了返回另一个 PartialSolution 的函数 call() ,但这次使用了正确的信息。还有一个问题,如果我说 list.get(i).get() 这将在解决之后返回 PartialSolution 吗?我真的不明白如果我使用 2 个线程而不是 1 个线程,为什么时间没有改善。为什么它不能正确扩展?
-
你可以使用
homework标签。 (也希望没人抄你的代码)
标签: java multithreading executorservice threadpoolexecutor forkjoinpool