【发布时间】:2015-08-10 20:03:17
【问题描述】:
我从here 中了解到,Set 有几个不同的线程安全选项。在我的应用程序中,我有 10 个线程同时向一个集合添加内容(不必设置,但更好)。在所有线程完成后,我需要遍历集合。
我读到 ConcurrentSkipListSet 和 Collections.newSetFromMap(new ConcurrentHashMap()) 都有不一致的批处理操作(addAll、removeAll 等)和迭代器。我的实验也证实了这一点。当我使用 ConcurrentSkipListSet 时,在所有线程添加后,读数有点随机。我得到随机不同大小的集合。
然后我尝试了 Collections.synchronizedSet(new HashSet()),我认为它应该是线程安全的,因为它同时阻止了多个写访问。 然而,它似乎有同样的不一致的阅读问题。我仍然在结果集中随机得到不同的尺寸。
我应该怎么做才能确保读数一致?如前所述,我不必使用 Set。我可以使用 List 或其他,只要有办法避免重复添加
显示代码有点困难,因为它是一个非常大的包的一部分。但总的来说是这样的
public class MyRecursiveTask extends RecursiveTask<Integer> {
private List<String> tasks;
protected ConcurrentSkipListSet<String> dictionary;
public MyRecursiveTask(ConcurrentSkipListSet<String> dictionary,
List<String> tasks){
this.dictionary=dictionary;
this.tasks=tasks;
}
protected Integer compute() {
if (this.tasks.size() > 100) {
List<RecursiveFeatureExtractor> subtasks =
new ArrayList<>();
subtasks.addAll(createSubtasks());
int count=0;
for (MyRecursiveTask subtask : subtasks)
subtask.fork();
for (MyRecursiveTask subtask : subtasks)
count+=subtask.join();
return count;
} else {
int count=0;
for (File task: tasks) {
// code to process task
String outcome = [method to do some task]
dictionary.add(outcome);
count++;
}
return count;
}
}
private List<MyRecursiveTask> createSubtasks() {
List<MyRecursiveTask> subtasks =
new ArrayList<>();
int total = tasks.size() / 2;
List<File> tasks1= new ArrayList<>();
for (int i = 0; i < total; i++)
tasks1.add(tasks.get(i));
MyRecursiveTask subtask1 = new MyRecursiveTask(
dictionary, tasks1);
List<File> tasks2= new ArrayList<>();
for (int i = total; i < tasks.size(); i++)
tasks2.add(tasks.get(i));
MyRecursiveTask subtask2 = new MyRecursiveTask(
dictionary, tasks2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
然后是创建此类线程工作者列表的代码:
....
List<String> allTasks = new ArrayList<String>(100000);
....
//code to fill in "allTasks"
....
ConcurrentSkipListSet<String> dictionary = new ConcurrentSkipListSet<>();
//I also tried "dictionary = Collections.Collections.synchronizedSet(new
//HashSet<>())" and changed other bits of code accordingly.
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
MyRecursiveTask mrt = new MyRecursiveTask (dictionary,
);
int total= forkJoinPool.invoke(mrt);
System.out.println(dictionary.size()); //this value is a bit random. If real
//size should be 999, when I run the code once i may get 989; second i may
//get 999; third I may get 990 etc....
谢谢
【问题讨论】:
-
请给我们一段代码,显示您的问题,以及预期和实际输出。
-
您可能将数据竞争与一般竞争条件混淆了。并发类只会让你免于前者,但你的代码会受到后者的影响。
-
刚刚添加了一些代码。我怀疑比赛条件。并发集合上唯一的操作就是往里面添加东西,没有其他检查集合内的元素。
标签: java concurrency