【问题标题】:Multithreaded quicksort or mergesort多线程快速排序或归并排序
【发布时间】:2026-01-14 20:30:02
【问题描述】:

如何为 Java 实现并发快速排序或合并排序算法?

我们在一台 16 核(虚拟)内核的 Mac 上遇到了问题,其中只有一个内核 (!) 使用默认的 Java 排序算法工作,而且看到这台非常精细的机器完全没有得到充分利用并不好.所以我们写了自己的(我写的),我们确实获得了很好的加速(我写了一个多线程快速排序,由于它的分区性质,它可以很好地并行化,但我也可以写一个合并排序)......但我的实现只能扩展最多 4 个线程,它是专有代码,我宁愿使用来自信誉良好的来源的线程,而不是使用我重新发明的*。

我在网上找到的唯一一个例子是如何在 Java 中编写多线程快速排序,它是忙循环(这真的很糟糕),使用:

while (helpRequested) { }

http://broadcast.oreilly.com/2009/06/may-column-multithreaded-algor.html

因此,除了无缘无故失去一个线程之外,它还确保通过在该 while 循环中忙循环来杀死性能(这令人难以置信)。

因此我的问题是:您是否知道 Java 中任何正确的多线程快速排序或合并排序实现都来自有信誉的来源?

我强调我知道复杂性保持在 O(n log n) 的事实,但我仍然非常高兴看到所有这些内核开始工作而不是闲置。请注意,对于其他任务,在相同的 16 个虚拟内核 Mac 上,我通过并行化代码看到了高达 x7 的加速(而且我绝不是并发专家)。

即使复杂度保持在 O(n log n),我也非常感谢 x7 或 x8 甚至 x16 的加速。

【问题讨论】:

  • 理想情况下它是可配置的:您可以将希望允许的最小/最大线程数传递给多线程排序。
  • 您真的需要多线程版本的快速排序吗?如果您要使用的线程数为 k,请快速划分为 k 个数组(选择 k-1 个枢轴)并独立调用您需要的任何排序。
  • @Moron:但是独立排序的分区不是必须合并吗?
  • 我的意思是,您不需要任何“多线程快速排序,它可以在同一个数组上并行工作并且可以根据线程数进行配置”。我的意思是,您只需要一个在一个数组上的一个线程上工作的快速排序,而不考虑多线程,即快速排序的任何常见实现都可以工作。所以代码看起来像:1)分区。 2) 创建线程 3) 在相关子数组的每个线程上运行快速排序。
  • @Moron:哦,我想我现在明白你的意思了!没有并发的分区,然后独立地同时对分区进行排序...感谢您的解释:-)

标签: java multithreading sorting quicksort mergesort


【解决方案1】:

试试fork/join framework by Doug Lea:

public class MergeSort extends RecursiveAction {
    final int[] numbers;
    final int startPos, endPos;
    final int[] result;

    private void merge(MergeSort left, MergeSort right) {
        int i=0, leftPos=0, rightPos=0, leftSize = left.size(), rightSize = right.size();
        while (leftPos < leftSize && rightPos < rightSize)
            result[i++] = (left.result[leftPos] <= right.result[rightPos])
                ? left.result[leftPos++]
                : right.result[rightPos++];
        while (leftPos < leftSize)
            result[i++] = left.result[leftPos++];
        while (rightPos < rightSize)
        result[i++] = right.result[rightPos++];
    }

    public int size() {
        return endPos-startPos;
    }

    protected void compute() {
        if (size() < SEQUENTIAL_THRESHOLD) {
            System.arraycopy(numbers, startPos, result, 0, size());
            Arrays.sort(result, 0, size());
        } else {
            int midpoint = size() / 2;
            MergeSort left = new MergeSort(numbers, startPos, startPos+midpoint);
            MergeSort right = new MergeSort(numbers, startPos+midpoint, endPos);
            coInvoke(left, right);
            merge(left, right);
        }
    }
}

(来源:http://www.ibm.com/developerworks/java/library/j-jtp03048.html?S_TACT=105AGX01&S_CMP=LP

【讨论】:

  • @dfa:+1,一篇我不知道的精彩论文和一篇很棒的文章,太棒了!
【解决方案2】:

Java 8 提供了java.util.Arrays.parallelSort,它使用 fork-join 框架对数组进行并行排序。该文档提供了有关当前实现的一些详细信息(但这些是非规范性说明):

排序算法是一种并行排序合并,它将数组分解为子数组,这些子数组本身已排序然后合并。当子数组长度达到最小粒度时,使用适当的 Arrays.sort 方法对子数组进行排序。如果指定数组的长度小于最小粒度,则使用适当的 Arrays.sort 方法对其进行排序。该算法需要一个不大于原始数组大小的工作空间。 ForkJoin 公共池用于执行任何并行任务。

列表似乎没有相应的并行排序方法(尽管RandomAccess 列表应该可以很好地进行排序),因此您需要使用toArray,对该数组进行排序,然后将结果存储回来进入列表。 (我问过一个关于这个here的问题。)

【讨论】:

    【解决方案3】:

    对此很抱歉,但您要求的内容是不可能的。我相信其他人提到排序是 IO 绑定的,它们很可能是正确的。 Doug Lea 的 IBM 代码是一个不错的作品,但我相信它主要是作为如何编写代码的示例。如果您在他的文章中注意到,他从未发布过它的基准,而是发布了其他工作代码的基准,例如计算平均值和并行查找最小最大值。如果您使用通用合并排序、快速排序、使用 Join Fork Pool 的 Dougs 合并排序以及我使用快速排序 Join Fork Pool 编写的基准,以下是基准。您会看到合并排序最适合 N 为 100 或更少。 1000 到 10000 的快速排序,如果您有 100000 或更高,则使用加入分叉池的快速排序会击败其余的。这些测试是随机数数组运行 30 次以创建每个数据点的平均值,并且在具有大约 2 gigs ram 的四核上运行。下面我有快速排序的代码。这主要表明,除非您尝试对非常大的数组进行排序,否则您应该放弃尝试改进代码排序算法,因为并行算法在小 N 上运行非常慢。

    Merge Sort
    10  7.51E-06
    100 1.34E-04
    1000    0.003286269
    10000   0.023988694
    100000  0.022994328
    1000000 0.329776132
    
    
    Quick Sort
    5.13E-05
    1.60E-04
    7.20E-04
    9.61E-04
    0.01949271
    0.32528383
    
    
    Merge TP
    1.87E-04
    6.41E-04
    0.003704411
    0.014830678
    0.019474009
    0.19581768
    
    Quick TP
    2.28E-04
    4.40E-04
    0.002716065
    0.003115251
    0.014046681
    0.157845389
    
    import jsr166y.ForkJoinPool;
    import jsr166y.RecursiveAction;
    
    //  derived from
    //  http://www.cs.princeton.edu/introcs/42sort/QuickSort.java.html
    //  Copyright © 2007, Robert Sedgewick and Kevin Wayne.
    //  Modified for Join Fork by me hastily. 
    public class QuickSort {
    
        Comparable array[];
        static int limiter = 10000;
    
        public QuickSort(Comparable array[]) {
            this.array = array;
        }
    
        public void sort(ForkJoinPool pool) {
            RecursiveAction start = new Partition(0, array.length - 1);        
            pool.invoke(start);
        }
    
        class Partition extends RecursiveAction {
    
            int left;
            int right;
    
            Partition(int left, int right) {
                this.left = left;
                this.right = right;
            }
    
            public int size() {
                return right - left;
            }
    
            @SuppressWarnings("empty-statement")
            //void partitionTask(int left, int right) {
            protected void compute() {
                int i = left, j = right;
                Comparable tmp;
                Comparable pivot = array[(left + right) / 2];
    
                while (i <= j) {
                    while (array[i].compareTo(pivot) < 0) {
                        i++;
                    }
                    while (array[j].compareTo(pivot) > 0) {
                        j--;
                    }
    
                    if (i <= j) {
                        tmp = array[i];
                        array[i] = array[j];
                        array[j] = tmp;
                        i++;
                        j--;
                    }
                }
    
    
                Partition leftTask = null;
                Partition rightTask = null;
    
                if (left < i - 1) {
                    leftTask = new Partition(left, i - 1);
                }
                if (i < right) {
                    rightTask = new Partition(i, right);
                }
    
                if (size() > limiter) {
                    if (leftTask != null && rightTask != null) {
                        invokeAll(leftTask, rightTask);
                    } else if (leftTask != null) {
                        invokeAll(leftTask);
                    } else if (rightTask != null) {
                        invokeAll(rightTask);
                    }
                }else{
                    if (leftTask != null) {
                        leftTask.compute();
                    }
                    if (rightTask != null) {
                        rightTask.compute();
                    }
                }
            }
        }
    }
    

    【讨论】:

    • 这是可能的(假设 CPU 绑定问题和足够的核心/硬件线程用于亲和力):-)(我更正了否决票)。之所以可能是因为排序 canshould 将当前操作“大小”考虑在内,以决定是否应该实际发生并行操作。这类似于在叶子附近切换到“简单排序”。应该通过剖析和分析来收集应该发生转换的确切尺寸。
    【解决方案4】:

    刚刚编写了上面的 MergeSort,性能很差。

    代码块引用“coInvoke(left, right);”但是没有引用 this 并用 invokeAll(left, right); 替换它

    测试代码是:

    MergeSort mysort = new MyMergeSort(array,0,array.length);
    ForkJoinPool threadPool = new ForkJoinPool();
    threadPool.invoke(mysort);
    

    但由于性能不佳不得不停止。

    我看到上面的文章已经快一年了,也许现在情况已经改变了。

    我发现替代文章中的代码可以工作:http://blog.quibb.org/2010/03/jsr-166-the-java-forkjoin-framework/

    【讨论】:

      【解决方案5】:

      您可能确实考虑过这一点,但它可能有助于从更高的层次看待具体问题,例如,如果您不只对一个数组或列表进行排序,那么使用传统方法同时对单个集合进行排序可能会容易得多算法而不是尝试同时对单个集合进行排序。

      【讨论】:

        【解决方案6】:

        过去几天我自己也遇到了多线程排序问题。正如on this caltech slide 解释的那样,您可以通过简单地对分而治之方法的每个步骤进行多线程处理,而明显的线程数量(划分数量)是有限的。我猜这是因为虽然您可以使用机器的所有 64 个内核在 64 个线程上运行 64 个分区,但 4 个分区只能在 4 个线程上运行,2 对 2,1 对 1 等等。所以对于许多级别您的机器未充分利用的递归。

        昨晚我想到了一个可能对我自己的工作有用的解决方案,所以我会在这里发布。

        如果,您的排序函数的第一个标准是基于最大大小 s 的整数,无论​​是实际整数还是字符串中的字符,这样该整数或字符就完全定义了排序的*别,然后我认为有一个非常快速(且简单)的解决方案。只需使用该初始整数将您的排序问题划分为 s 个较小的排序问题,然后使用您选择的标准单线程排序算法对这些问题进行排序。我认为,可以一次性完成对 s 类的划分。 s 独立排序后没有合并问题,因为你已经知道 1 类中的所有内容都在 2 类之前排序,以此类推。

        示例:如果您希望基于 strcmp() 进行排序,则使用字符串中的第一个字符将数据分成 256 个类,然后在下一个可用线程上对每个类进行排序,直到它们全部完成。

        这种方法充分利用了所有可用的内核,直到问题解决,我认为它很容易实现。虽然我还没有实现它,所以它可能有我还没有找到的问题。它显然不适用于浮点排序,并且对于大 s 效率低下。它的性能还很大程度上取决于用于定义类的整数/字符的熵。

        这可能是 F* Steeg 的建议,但我明确表示,在某些情况下,您可以从较大的排序创建多个较小的排序。

        【讨论】:

          【解决方案7】:
          import java.util.Arrays;
          import java.util.concurrent.ForkJoinPool;
          import java.util.concurrent.RecursiveTask;
          
          public class IQ1 {
              public static void main(String[] args) {
                  // Get number of available processors
                  int numberOfProcessors = Runtime.getRuntime().availableProcessors();
                  System.out.println("Number of processors : " + numberOfProcessors);
                  // Input data, it can be anything e.g. log records, file records etc
                  long[][] input = new long[][]{
                        { 5, 8, 9, 14, 20 },
                        { 17, 56, 59, 80, 102 },
                        { 2, 4, 7, 11, 15 },
                        { 34, 37, 39, 45, 50 }
                      };
          
                  /* A special thread pool designed to work with fork-and-join task splitting
                   * The pool size is going to be based on number of cores available 
                   */
                  ForkJoinPool pool = new ForkJoinPool(numberOfProcessors);
                  long[] result = pool.invoke(new Merger(input,  0, input.length));
          
                  System.out.println(Arrays.toString(result));
              }
              /* Recursive task which returns the result
               * An instance of this will be used by the ForkJoinPool to start working on the problem
               * Each thread from the pool will call the compute and the problem size will reduce in each call
               */
              static class Merger extends RecursiveTask<long[]>{
                  long[][] input;
                  int low;
                  int high;
          
                  Merger(long[][] input, int low, int high){
                      this.input = input;
                      this.low = low;
                      this.high = high;
                  }
          
                  @Override
                  protected long[] compute() {            
                      long[] result = merge();
                      return result;
                  }
          
                  // Merge
                  private long[] merge(){
                      long[] result = new long[input.length * input[0].length];
                      int i=0;
                      int j=0;
                      int k=0;
                      if(high - low < 2){
                          return input[0];
                      }
                      // base case
                      if(high - low == 2){
                          long[] a = input[low];
                          long[] b = input[high-1];
                          result = mergeTwoSortedArrays(a, b);
                      }
                      else{
                          // divide the problem into smaller problems
                          int mid = low + (high - low) / 2;
                          Merger first = new Merger(input, low, mid);
                          Merger second = new Merger(input, mid, high);
                          first.fork();
                          long[] secondResult = second.compute();
                          long[] firstResult = first.join();
          
                          result = mergeTwoSortedArrays(firstResult, secondResult);
                      }
          
                      return result;
                  }
          
                  // method to merge two sorted arrays
                  private long[] mergeTwoSortedArrays(long[] a, long[] b){
                      long[] result = new long[a.length + b.length];
                      int i=0;
                      int j=0;
                      int k=0;
                          while(i<a.length && j<b.length){
                              if(a[i] < b[j]){
                                  result[k] = a[i];
                                  i++;
                              } else{
                                  result[k] = b[j];
                                  j++;
                              }
                              k++;
                          }
          
                          while(i<a.length){
                              result[k] = a[i];
                              i++;
                              k++;
                          }
          
                          while(j<b.length){
                              result[k] = b[j];
                              j++;
                              k++;
                          }
          
                  return result;
              }
              }
          }
          

          【讨论】:

            【解决方案8】:

            对于合并排序,最方便的多线程范式是 fork-join 范式。这是从 Java 8 及更高版本提供的。以下代码演示了使用 fork-join 的合并排序。

            import java.util.*;
            import java.util.concurrent.*;
            
            public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
                private List<N> elements;
            
                public MergeSort(List<N> elements) {
                    this.elements = new ArrayList<>(elements);
                }
            
                @Override
                protected List<N> compute() {
                    if(this.elements.size() <= 1)
                        return this.elements;
                    else {
                        final int pivot = this.elements.size() / 2;
                        MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
                        MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));
            
                        leftTask.fork();
                        rightTask.fork();
            
                        List<N> left = leftTask.join();
                        List<N> right = rightTask.join();
            
                        return merge(left, right);
                    }
                }
            
                private List<N> merge(List<N> left, List<N> right) {
                    List<N> sorted = new ArrayList<>();
                    while(!left.isEmpty() || !right.isEmpty()) {
                        if(left.isEmpty())
                            sorted.add(right.remove(0));
                        else if(right.isEmpty())
                            sorted.add(left.remove(0));
                        else {
                            if( left.get(0).compareTo(right.get(0)) < 0 )
                                sorted.add(left.remove(0));
                            else
                                sorted.add(right.remove(0));
                        }
                    }
            
                    return sorted;
                }
            
                public static void main(String[] args) {
                    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
                    List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(Arrays.asList(7,2,9,10,1)));
                    System.out.println("result: " + result);
                }
            }
            

            虽然不太直接,但以下代码变体消除了 ArrayList 的过度复制。最初的未排序列表只创建一次,对 sublist 的调用不需要自己执行任何复制。在我们每次算法分叉时复制数组列表之前。此外,现在,当合并列表而不是创建一个新列表并在其中复制值时,我们每次重用左侧列表并将我们的值插入其中。通过避免额外的复制步骤,我们提高了性能。我们在这里使用 LinkedList,因为与 ArrayList 相比,插入相当便宜。我们还消除了对 remove 的调用,这在 ArrayList 上也可能很昂贵。

            import java.util.*;
            import java.util.concurrent.*;
            
            public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
                private List<N> elements;
            
                public MergeSort(List<N> elements) {
                    this.elements = elements;
                }
            
                @Override
                protected List<N> compute() {
                    if(this.elements.size() <= 1)
                        return new LinkedList<>(this.elements);
                    else {
                        final int pivot = this.elements.size() / 2;
                        MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
                        MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));
            
                        leftTask.fork();
                        rightTask.fork();
            
                        List<N> left = leftTask.join();
                        List<N> right = rightTask.join();
            
                        return merge(left, right);
                    }
                }
            
                private List<N> merge(List<N> left, List<N> right) {
                    int leftIndex = 0;
                    int rightIndex = 0;
                    while(leftIndex < left.size() || rightIndex < right.size()) {
                        if(leftIndex >= left.size())
                            left.add(leftIndex++, right.get(rightIndex++));
                        else if(rightIndex >= right.size())
                            return left;
                        else {
                            if( left.get(leftIndex).compareTo(right.get(rightIndex)) < 0 )
                                leftIndex++;
                            else
                                left.add(leftIndex++, right.get(rightIndex++));
                        }
                    }
            
                    return left;
                }
            
                public static void main(String[] args) {
                    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
                    List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(Arrays.asList(7,2,9,-7,777777,10,1)));
                    System.out.println("result: " + result);
                }
            }
            

            我们还可以通过使用迭代器而不是在执行合并时直接调用 get 来进一步改进代码。其原因是通过索引获取 LinkedList 的时间性能较差(线性),因此通过使用迭代器,我们消除了在每次获取时内部迭代链表所导致的减速。迭代器上对 next 的调用是常数时间,而不是调用获取的线性时间。以下代码被修改为使用迭代器。

            import java.util.*;
            import java.util.concurrent.*;
            
            public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
                private List<N> elements;
            
                public MergeSort(List<N> elements) {
                    this.elements = elements;
                }
            
                @Override
                protected List<N> compute() {
                    if(this.elements.size() <= 1)
                        return new LinkedList<>(this.elements);
                    else {
                        final int pivot = this.elements.size() / 2;
                        MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
                        MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));
            
                        leftTask.fork();
                        rightTask.fork();
            
                        List<N> left = leftTask.join();
                        List<N> right = rightTask.join();
            
                        return merge(left, right);
                    }
                }
            
                private List<N> merge(List<N> left, List<N> right) {
                    ListIterator<N> leftIter = left.listIterator();
                    ListIterator<N> rightIter = right.listIterator();
                    while(leftIter.hasNext() || rightIter.hasNext()) {
                        if(!leftIter.hasNext()) {
                            leftIter.add(rightIter.next());
                            rightIter.remove();
                        }
                        else if(!rightIter.hasNext())
                            return left;
                        else {
                            N rightElement = rightIter.next();
                            if( leftIter.next().compareTo(rightElement) < 0 )
                                rightIter.previous();
                            else {
                                leftIter.previous();
                                leftIter.add(rightElement);
                            }
                        }
                    }
            
                    return left;
                }
            
                public static void main(String[] args) {
                    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
                    List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(Arrays.asList(7,2,9,-7,777777,10,1)));
                    System.out.println("result: " + result);
                }
            }
            

            最后是最复杂的代码版本,此迭代使用完全就地操作。仅创建初始的 ArrayList,并且从未创建任何其他集合。因此,逻辑特别难以遵循(所以我把它留到最后)。但应该尽可能接近理想的实现。

            import java.util.*;
            import java.util.concurrent.*;
            
            public class MergeSort<N extends Comparable<N>> extends RecursiveTask<List<N>> {
                private List<N> elements;
            
                public MergeSort(List<N> elements) {
                    this.elements = elements;
                }
            
                @Override
                protected List<N> compute() {
                    if(this.elements.size() <= 1)
                        return this.elements;
                    else {
                        final int pivot = this.elements.size() / 2;
                        MergeSort<N> leftTask = new MergeSort<N>(this.elements.subList(0, pivot));
                        MergeSort<N> rightTask = new MergeSort<N>(this.elements.subList(pivot, this.elements.size()));
            
                        leftTask.fork();
                        rightTask.fork();
            
                        List<N> left = leftTask.join();
                        List<N> right = rightTask.join();
            
                        merge(left, right);
                        return this.elements;
                    }
                }
            
                private void merge(List<N> left, List<N> right) {
                    int leftIndex = 0;
                    int rightIndex = 0;
                    while(leftIndex < left.size() ) {
                        if(rightIndex == 0) {
                            if( left.get(leftIndex).compareTo(right.get(rightIndex)) > 0 ) {
                                swap(left, leftIndex++, right, rightIndex++);
                            } else {
                                leftIndex++;
                            }
                        } else {
                            if(rightIndex >= right.size()) {
                                if(right.get(0).compareTo(left.get(left.size() - 1)) < 0 )
                                    merge(left, right);
                                else
                                    return;
                            }
                            else if( right.get(0).compareTo(right.get(rightIndex)) < 0 ) {
                                swap(left, leftIndex++, right, 0);
                            } else {
                                swap(left, leftIndex++, right, rightIndex++);
                            }
                        }
                    }
            
                    if(rightIndex < right.size() && rightIndex != 0)
                        merge(right.subList(0, rightIndex), right.subList(rightIndex, right.size()));
                }
            
                private void swap(List<N> left, int leftIndex, List<N> right, int rightIndex) {
                    //N leftElement = left.get(leftIndex);
                    left.set(leftIndex, right.set(rightIndex, left.get(leftIndex)));
                }
            
                public static void main(String[] args) {
                    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
                    List<Integer> result = forkJoinPool.invoke(new MergeSort<Integer>(new ArrayList<>(Arrays.asList(5,9,8,7,6,1,2,3,4))));
                    System.out.println("result: " + result);
                }
            }
            

            【讨论】:

              【解决方案9】:

              为什么您认为并行排序会有所帮助?我认为大多数排序是 i/o 绑定的,而不是处理。除非您的比较进行大量计算,否则不太可能加快速度。

              【讨论】: