【问题标题】:Concurrently merging many sorted arrays同时合并多个排序数组
【发布时间】:2013-05-06 19:26:52
【问题描述】:

我目前正在开发一个同时对字符串进行排序的程序。我的程序接收一个文件,将文件的每一行读入一个数组,然后将字符串数组拆分为更小的字符串数组。然后程序为每个较小的数组启动一个线程,并对它们进行快速排序。一旦每个线程完成对其数组的排序,主线程就会从线程对象中收集所有结果。然后应该将较小的、现已排序的数组合并为一个大的、已排序的数组。

我目前使用单线程合并排序将快速排序线程返回的排序数组嵌套在一起解决了这个问题。现在的问题是,由于合并不会同时发生,因此使用少量线程(1-4)对文件进行排序实际上使程序排序尽可能快。如果我稍微增加线程数(比如 15 个线程),程序实际上运行的速度比使用更少线程的速度要慢很多。为了解决这个问题,我希望在我的合并排序/数组嵌套中引入并发。

我希望做的是:一旦两个线程完成了对 in-file 的部分的快速排序,一个新线程会将这两个部分嵌套在一起,直到 in-file 的每个部分都已排序。

非常感谢您的每一点帮助,我感谢示例代码和/或伪代码。提前致谢! :)


对数组进行排序的当前代码:

public synchronized String[] sort(){
    String[] sortedWords = new String[words.length];
    SortingThread[] sts = new SortingThread[threads];

    for(int i = 0; i < threads; i++){
        sts[i] = new SortingThread(this, splitWords[i]);
    }

    for(SortingThread st : sts){
        st.start();
    }

    for(SortingThread st : sts){
        try {
            st.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    indexes = new int[sts.length];

    for(int i = 0; i < indexes.length; i++){
        indexes[i] = 0;
    }


//This is where my merge-sorting currently starts.

    ArrayList<String> toAddTo = new ArrayList<String>();

    while(!allIndexesHaveBeenRead(sts)){
        String globalMinimum = null;
        int globalMinThread = -1;
        currentIteration: for (int i = 0; i < sts.length; i++) {
            String current;
            try{
                current = sts[i].sorted[indexes[i]];
            } catch(Exception e){
                continue currentIteration;
            }
            try{
                if(globalMinimum == null){
                    globalMinimum = current;
                    globalMinThread = i;
                }
                else if(current.compareTo(globalMinimum) < 0){
                    globalMinimum = current;
                    globalMinThread = i;
                }
            } catch (NullPointerException e){
                continue;
            }
        }
        toAddTo.add(globalMinimum);
        indexes[globalMinThread]++;
    }

    sortedWords = toAddTo.toArray(sortedWords);

    int len = 0;
    for (int i = 0; i < sortedWords.length; i++) {
        if(sortedWords[i] != null){
            len++;
        }
    }

    String[] toReturn = new String[len];

    for (int i = 0; i < toReturn.length; i++) {
        toReturn[i] = sortedWords[i];
    }

    return toReturn;
}

【问题讨论】:

  • 您是否将其与纯顺序算法进行了比较?到目前为止,文件读取不是该过程中效率最低的部分吗?数组的大小是多少?你有多少个处理器内核?无论如何,如果您需要帮助,您应该发布您的代码,以便我们提出改进建议。
  • 谢谢@JBNizet。我试图排序的文件包含 267000 个单词。对于较小的文件,顺序算法可能会更好,但是对于这么大的文件,我发现递归+并发是要走的路。代码示例传入(编辑 OP)。
  • 你有几个核心?除非您有 16 个内核,否则我希望更少的线程会更快。我建议尝试逻辑 CPU 的数量或您拥有的核心数量,因为超过这个数量可能会带来更多的开销而不是收益。

标签: java arrays concurrency merge


【解决方案1】:

你的问题场景是这样的

  • 一个主线程需要完成N个任务
  • 它从一个池中生成 M 个线程并处理 N 个任务
  • 它等待至少一个线程完成任务并对结果进行处理
  • 继续处理结果,直到完成所有 N 个任务

Java 5 中的 CompletionService,完全符合要求,

这是您的问题陈述的解决方案,

 public class Sorter implements Callable<List<String>> {

    private List<String> data;

public Sorter(List<String> input) {
    data = input;
}

@Override
public List<String> call() throws Exception {
    Collections.sort(data);
    return data;
}

 }

在主类中,

  CompletionService service = new  ExecutorCompletionService(Executors.newFixedThreadPool(5));

    List<String> result = new ArrayList<String>();

    String readline = null;
    Callable<List<String>> sorter = null;
    String[] words = null;
    int noOfRunningFutures = 0;

     while ((readline = br.readLine()) != null) {
        words = readline.split(" ");
        List<String> input = Arrays.asList(words);
        sorter = new Sorter(input);

        service.submit(sorter);

        // add them to the number of futures which I am creating - to keep track of the Queue length
        noOfRunningFutures ++;
    }


    while (noOfRunningFutures > 0) 
    {
        try {

            // this is a blocking call - whenever there is a worker which is already completed
            // then it is fetched from the Queue                 
            Future<List<String>> completed = service.take();
            noOfRunningFutures --;

            // get the value from computed from the Future
            List<String> sorted =  completed.get();

            result.addAll(sorted);

            Collections.sort(result);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

希望对你有所帮助。

【讨论】:

    【解决方案2】:

    我在 SourceForge 上管理一个 fork-join 项目 TymeacDSE,它完全符合您的要求。它对子集进行排序,然后将子集的组合并到一个最终数组中。看看here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-11-20
      • 2018-04-19
      • 1970-01-01
      • 1970-01-01
      • 2020-05-24
      • 2011-02-13
      • 1970-01-01
      • 2014-11-10
      相关资源
      最近更新 更多