【问题标题】:Multithreaded merge sort, adding additional threads多线程归并排序,增加额外线程
【发布时间】:2016-01-03 19:26:53
【问题描述】:

我在java中的多线程合并排序算法中遇到了一个问题。

我应该通过将原始数组划分为subArrays,将代码修改为3、4、5、6、7、8线程合并排序。目前它有 2 个subArrays。 如何将原始数组拆分为 3、4、5、6、7、8 subArrays 以实现我的目标? 此外,我应该多写一些方法,因为mergeSort 方法现在调用lefthalfrighthalf 方法。所以对于 3,4,5,6,7,8 线程我应该写额外的方法。 我该如何处理?

two_threaded_merge_sort.java

public class two_threaded_merge_sort {

public static void finalMerge(int[] a, int[] b) {
    int[] result = new int[a.length + b.length];
    int i=0; 
    int j=0; 
    int r=0;
    while (i < a.length && j < b.length) {
        if (a[i] <= b[j]) {
            result[r]=a[i];
            i++;
            r++;
        } else {
            result[r]=b[j];
            j++;
            r++;
        }
        if (i==a.length) {
            while (j<b.length) {
                result[r]=b[j];
                r++;
                j++;
            }
        }
        if (j==b.length) {
            while (i<a.length) {
                result[r]=a[i];
                r++;
                i++;
            }
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    Random rand = new Random();
    int[] original = new int[9000000];
    for (int i=0; i<original.length; i++) {
        original[i] = rand.nextInt(1000);
    }

    long startTime = System.currentTimeMillis();
    int[] subArr1 = new int[original.length/2];
    int[] subArr2 = new int[original.length - original.length/2];
    System.arraycopy(original, 0, subArr1, 0, original.length/2);
    System.arraycopy(original, original.length/2, subArr2, 0, original.length - original.length/2);

    Worker runner1 = new Worker(subArr1);
    Worker runner2 = new Worker(subArr2);
    runner1.start();
    runner2.start();
    runner1.join();
    runner2.join();
    finalMerge (runner1.getInternal(), runner2.getInternal());
    long stopTime = System.currentTimeMillis();
    long elapsedTime = stopTime - startTime;
    System.out.println("2-thread MergeSort takes: " + (float)elapsedTime/1000 + " seconds");
}

}

Worker.java

class Worker extends Thread {
private int[] internal;

public int[] getInternal() {
    return internal;
}

public void mergeSort(int[] array) {
    if (array.length > 1) {
        int[] left = leftHalf(array);
        int[] right = rightHalf(array);

        mergeSort(left);
        mergeSort(right);

        merge(array, left, right);
    }
}

public int[] leftHalf(int[] array) {
    int size1 = array.length / 2;
    int[] left = new int[size1];
    for (int i = 0; i < size1; i++) {
        left[i] = array[i];
    }
    return left;
}

public int[] rightHalf(int[] array) {
    int size1 = array.length / 2;
    int size2 = array.length - size1;
    int[] right = new int[size2];
    for (int i = 0; i < size2; i++) {
        right[i] = array[i + size1];
    }
    return right;
}

public void merge(int[] result, int[] left, int[] right) {
    int i1 = 0;   
    int i2 = 0;   

    for (int i = 0; i < result.length; i++) {
        if (i2 >= right.length || (i1 < left.length && left[i1] <= right[i2])) {
            result[i] = left[i1];   
            i1++;
        } else {
            result[i] = right[i2];   
            i2++;
        }
    }
}

Worker(int[] arr) {
    internal = arr;
}

public void run() {
    mergeSort(internal);
}
}

非常感谢!

【问题讨论】:

    标签: java multithreading algorithm sorting


    【解决方案1】:

    需要有一个排序函数将数组分成 k 个部分,然后创建 k 个线程对每个部分进行排序,使用自上而下或自下而上的方法,(自下而上会稍微快一些),并等待所有线程完成。

    此时有 k 个已排序的部分。这些可以使用 k 路合并(复杂)一次合并,或者一次合并一对部分(2 路合并),可能使用多个线程,但此时进程可能是内存带宽有限的,所以多线程可能没有多大帮助。

    当将数组分成 k 部分时,可以使用类似这样的方法来保持大小相似:

    int r = n % k;
    int s = n / k;
    int t;
    for each part{
        t = r ? 1 : 0;
        r -= t;
        size = s + t;
    }
    

    int r = n % k;
    int s = n / k + 1;
    while(r--){
       next part size = s; // n / k + 1
    }
    s -= 1;
    while not done{
       next part size = s; // n / k
    }
    

    【讨论】:

      【解决方案2】:

      在我看来,您的辛勤工作已经完成。现在您必须使用线程数对算法进行参数化。

      你的算法有两个部分

      1. 拆分工作。
      2. 合并 k 部分。

      还有两个组件:

      1. 主要算法
      2. 工人。

      关于线程

      在我看来,Start/join 方法在这种情况下没有用,因为直到所有线程都完成后才能开始最后的合并。我更喜欢“2 路合并”(@rcgldr 答案)和线程池(ExecutorService)。 您必须小心线程同步和共享内存。

      总结一下,我提出了一点不同的解决方案:

      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Random;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ExecutorService;
      
      public class MultithreadedMergeSort {
      
          private int[] array;
          private int numThreads;
          private List<int[]> sortedFragments;
      
          private MultithreadedMergeSort(int numThreads, int[] array) {
              this.numThreads = numThreads;
              this.array = array;
          }
      
          // Basic algorithm: it sort recursively a fragment
          private static void recursiveMergeSort(int[] array, int begin, int end) {
              if (end - begin > 1) {
                  int middle = (begin + end) / 2;
                  recursiveMergeSort(array, begin, middle);
                  recursiveMergeSort(array, middle, end);
                  merge(array, begin, middle, end);
              }
          }
      
          // Basic algorithm: it merges two consecutives sorted fragments
          private static void merge(int[] array, int begin, int middle, int end) {
              int[] firstPart = Arrays.copyOfRange(array, begin, middle);
              int i = 0;
              int j = middle;
              int k = begin;
              while (i < firstPart.length && j < end) {
                  if (firstPart[i] <= array[j]) {
                      array[k++] = firstPart[i++];
                  } else {
                      array[k++] = array[j++];
                  }
              }
              if (i < firstPart.length) {
                  System.arraycopy(firstPart, i, array, k, firstPart.length - i);
              }
          }
      
          public static void sort(int[] array, int numThreads) throws InterruptedException {
              if (array != null && array.length > 1) {
                  if (numThreads > 1) {
                      new MultithreadedMergeSort(numThreads, array).mergeSort();
                  } else {
                      recursiveMergeSort(array, 0, array.length);
                  }
              }
          }
      
          private synchronized void mergeSort() throws InterruptedException {
              // A thread pool
              ExecutorService executors = Executors.newFixedThreadPool(numThreads);
              this.sortedFragments = new ArrayList<>(numThreads - 1);
              int begin = 0;
              int end = 0;
      
              // it split the work
              for (int i = 1; i <= (numThreads - 1); i++) {
                  begin = end;
                  end = (array.length * i) / (numThreads - 1);
                  // sending the work to worker
                  executors.execute(new MergeSortWorker(begin, end));
              }
              // this is waiting until work is done
              wait();
      
              // shutdown the thread pool.
              executors.shutdown();
          }
      
          private synchronized int[] notifyFragmentSorted(int begin, int end) {
              if (begin > 0 || end < array.length) {
                  // the array is not completely sorted
      
                  Iterator<int[]> it = sortedFragments.iterator();
                  // searching a previous or next fragment
                  while (it.hasNext()) {
                      int[] f = it.next();
                      if (f[1] == begin || f[0] == end) {
                          // It found a previous/next fragment
                          it.remove();
                          return f;
                      }
                  }
                  sortedFragments.add(new int[]{begin, end});
              } else {
                  // the array is sorted
                  notify();
              }
              return null;
          }
      
          private class MergeSortWorker implements Runnable {
      
              int begin;
              int end;
      
              public MergeSortWorker(int begin, int end) {
                  this.begin = begin;
                  this.end = end;
              }
      
              @Override
              public void run() {
                  // Sort a fragment
                  recursiveMergeSort(array, begin, end);
                  // notify the sorted fragment
                  int[] nearFragment = notifyFragmentSorted(begin, end);
      
                  while (nearFragment != null) {
                      // there's more work: merge two consecutives sorted fragments, (begin, end) and nearFragment
                      int middle;
                      if (nearFragment[0] < begin) {
                          middle = begin;
                          begin = nearFragment[0];
                      } else {
                          middle = nearFragment[0];
                          end = nearFragment[1];
                      }
                      merge(array, begin, middle, end);
                      nearFragment = notifyFragmentSorted(begin, end);
                  }
              }
          }
      
          public static void main(String[] args) throws InterruptedException {
              int numThreads = 5;
      
              Random rand = new Random();
              int[] original = new int[9000000];
              for (int i = 0; i < original.length; i++) {
                  original[i] = rand.nextInt(1000);
              }
      
              long startTime = System.currentTimeMillis();
      
              MultithreadedMergeSort.sort(original, numThreads);
      
              long stopTime = System.currentTimeMillis();
              long elapsedTime = stopTime - startTime;
              // warning: Take care with microbenchmarks
              System.out.println(numThreads + "-thread MergeSort takes: " + (float) elapsedTime / 1000 + " seconds");
          }
      }
      

      【讨论】:

      • 非常感谢您的回答。但是有什么方法可以将一个数组平均分成 4 和 6 和 8 部分?
      • 如何在控制台上打印上述代码的结果?以 20 个数组元素为例。谢谢
      • @Zevs 例如:System.out.print(original[0]); for (int i = 1; i &lt; original.length; i++) {System.out.print(", "+original[i]);}
      猜你喜欢
      • 1970-01-01
      • 2011-01-13
      • 1970-01-01
      • 1970-01-01
      • 2011-02-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-21
      相关资源
      最近更新 更多