【问题标题】:threads in Java and computationJava中的线程和计算
【发布时间】:2019-11-11 16:02:48
【问题描述】:

我是java新手,我正在尝试编写一个带有两个参数的程序:

  1. 我们必须将素数相加的数字
  2. 我们必须执行此操作的线程数

所以我使用了一个名为 Eratosthene 的方法,它存储了一个 boolean 数组,如果一个数字是素数,我们将其标记为真,然后我们标记所有的倍数这个数字是假的。

我尝试将我的数组划分为每个线程的子数组,并在每个子数组中进行操作,最后将子数组的所有结果相加。

但我不知道我在哪里做错了:有时程序并没有给出好的结果。

这是我的代码:

SumPrime.java

import java.util.*;
import java.util.concurrent.*;

public class SumPrimes {

    private boolean array[];
    private int numberOfWorkers;
    private Semaphore allFinished;

    public SumPrimes(int num, int threads){
        array = new boolean[num];
        numberOfWorkers = threads;
        for (int i = 2; i < num; i++)
            array[i] = true;
    }

    private class SumParallel extends Thread {
        int min;
        int max;
        long sum;

        SumParallel(int min, int max){
            this.min = min;
            this.max = max;
            sum = 0;
        }

        public void run() {
            for (int i = min; i < max; i++) {
                if (array[i]) {
                    for (int j = min; j*i < array.length; j++) {
                        array[i*j] = false;
                    }
                    sum += i;
                }
            }
            allFinished.release();
        }

        public long getSum() {
            return sum;
        }
    }

    public void SumInParallel() {
        allFinished = new Semaphore(0);

        List<SumParallel> workers = new ArrayList<SumParallel>();
        int lengthOfOneWorker = array.length / numberOfWorkers;
        for (int i = 0; i < numberOfWorkers; i++) {
            int start = i * lengthOfOneWorker;
            int end = (i+1) * lengthOfOneWorker;

            if (i == numberOfWorkers - 1)
                end = array.length;
            SumParallel worker = new SumParallel(start, end);
            workers.add(worker);
            worker.start();
        }

        try {
            allFinished.acquire(numberOfWorkers);
        } catch (InterruptedException ignored) {}

        int sum = 0;
        for (SumParallel w : workers){
            sum += w.getSum();
        }

        System.out.println("The sum of prime numbers is: " + sum);
    }

    public static void main(String[] args) {
        int limitNum = Integer.parseInt(args[0]);
        int threadNum = Integer.parseInt(args[1]);
        SumPrimes sum_primes = new SumPrimes(limitNum, threadNum);
        sum_primes.SumInParallel();
    }
}

你可以这样运行程序:

java SumPrimes 1000 3

我愿意接受任何改进我的代码的建议。

【问题讨论】:

  • 无关:您应该使用CountDownLatch,而不是Semaphore。参见例如CountDownLatch vs. Semaphore.
  • 如果您提供一个产生错误值的运行示例以及实际值应该是什么,这将有所帮助。
  • @JosephLarson 例如,如果我这样做 java SumPrimes 200 4 真正的答案是 4227 并且如果我使用此命令多次运行我的程序,有时我会得到很好的答案,但有时我会得到答案要么离好结果太远,要么离好结果太近
  • 这就是所谓的竞争条件,这就是为什么多线程编程会很困难,因为当你做错时结果会有所不同,而且它可能恰好看起来是正确的的时间,所以你甚至可能不知道(一开始)。为什么这里有比赛条件?因为第一个线程更新了其他线程使用的数组值。
  • @Andreas 我把你发送的链接加红了,但我看不出有理由不使用Semaphore。你能解释一下为什么我应该使用CountDownLatch吗?

标签: java multithreading sum primes


【解决方案1】:

您需要完全重新考虑线程的逻辑。

各个线程不能访问array的同一范围,例如如果线程具有min = 100max = 150,则只能使用和/或更改 100 到 149(含)范围内的元素。

您的代码:

for (int i = min; i < max; i++) {
    if (array[i]) {
        for (int j = min; j*i < array.length; j++) {
            array[i*j] = false;

i = 100, j = 100 开头,这就是i*j = 10000。如果数组真的那么大,这意味着您访问array[10000],但这是不允许。当然,数组并没有那么大,所以代码什么都不做

啊,你说,第一个线程有min = 0max = 50,所以它会将值从索引0(0*0)更改为2401(49*49),并且由于数组小于那个,它将更新整个数组,但这是不允许

现在,再想想吧。

如果范围是min = 100, max = 150,那么您需要首先清除该范围内的所有偶数,然后清除所有可被 3 整除的数字,然后所有...等等,但仅限于 该范围。

我会让你重新思考逻辑。


更新

要将Sieve of Eratosthenes 应用于某个范围,我们需要素数直到该范围最大值的平方根。

如果范围是min = 150, max = 200,那么maxPrime = sqrt(200) = 14,所以我们需要从2到14(含)的素数,那么我们可以更新范围150-199。

假设我们首先更新 array 以找到 2-14 范围内的所有素数,我们可以使用它来迭代目标范围 (150-199) 内这些素数的倍数。为此,我们需要从大于等于 min 的素数的最小倍数开始,因此我们需要将 min 向上舍入到 prime 的下一个倍数。

使用整数数学,到round up to next multiple,我们计算:

lower = (min + prime - 1) / prime * prime

这给了我们主要的逻辑:

maxPrime = (int) Math.sqrt(max);
for (int prime = 2; prime <= maxPrime; prime++) {
    if (array[prime]) {
        int lower = (min + prime - 1) / prime * prime;
        for (int i = lower; i < max; i += prime)
            array[i] = false

我们还应该让每个线程负责首先设置范围内的所有布尔值,以便该部分也成为多线程。

主逻辑现在必须首先在主线程中找到 2-sqrt(N) 范围内的素数,然后在线程之间分割剩余的范围。

这是我的尝试:

public static long sumPrimes(int n, int threadCount) {
    // Find and sum the "seed" primes needed by the threads
    int maxSeedPrime = (int) Math.sqrt(n + 2); // extra to be sure no "float errors" occur
    boolean[] seedPrime = new boolean[maxSeedPrime + 1];
    AtomicLong totalSum = new AtomicLong(sumPrimes(seedPrime, seedPrime, 0, maxSeedPrime));

    // Split remaining into ranges and start threads to calculate sums
    Thread[] threads = new Thread[threadCount];
    for (int t = 0, rangeMin = maxSeedPrime + 1; t < threadCount; t++) {
        int min = rangeMin;
        int max = min + (n - min + 1) / (threadCount - t) - 1;
        threads[t] = new Thread(() ->
            totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
        );
        threads[t].start();
        rangeMin = max + 1;
    }

    // Wait for threads to end
    for (int t = 0; t < threadCount; t++) {
        try {
            threads[t].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Return the calculated sum
    return totalSum.get();
}
private static long sumPrimes(boolean[] seedPrime, boolean[] rangePrime, int min, int max/*inclusive*/) {
    // Initialize range
    for (int i = Math.max(min, 2); i <= max; i++) {
        rangePrime[i - min] = true;
    }

    // Mark non-primes in range
    int maxPrime = (int) Math.sqrt(max + 1); // extra to be sure no "float errors" occur
    for (int prime = 2; prime <= maxPrime; prime++) {
        if (seedPrime[prime]) {
            int minMultiple = (min + prime - 1) / prime * prime;
            if (minMultiple <= prime)
                minMultiple = prime * 2;
            for (int multiple = minMultiple; multiple <= max ; multiple += prime) {
                rangePrime[multiple - min] = false;
            }
        }
    }

    // Sum the primes
    long sum = 0;
    for (int prime = min; prime <= max; prime++) {
        if (rangePrime[prime - min]) {
            sum += prime;
        }
    }
    return sum;
}

测试

public static void main(String[] args) {
    test(1000, 3);
    test(100000000, 4);
}
public static void test(int n, int threadCount) {
    long start = System.nanoTime();
    long sum = sumPrimes(n, threadCount);
    long end = System.nanoTime();
    System.out.printf("sumPrimes(%,d, %d) = %,d (%.9f seconds)%n",
                      n, threadCount, sum, (end - start) / 1e9);
}

输出

sumPrimes(1,000, 3) = 76,127 (0.005595600 seconds)
sumPrimes(100,000,000, 4) = 279,209,790,387,276 (0.686881000 seconds)

更新 2

上面的代码使用了 lambda 表达式:

threads[t] = new Thread(() ->
    totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);

如果您不想使用 lambda 表达式,例如因此它将在 Java 7 上运行,您可以改用匿名类:

threads[t] = new Thread() {
    @Override
    public void run() {
        totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max));
    }
};

【讨论】:

  • 但是在第二个循环中我说如果i*j &lt; array.length。这意味着我总是跑到范围内,而且我永远不会超过范围,不是吗?
  • @Mohammadreza 说数组有 200 个元素,你有 4 个线程。然后第三个线程进入处理范围 100-149(含),外部循环将在该范围内迭代 i。但是j 将从 100 开始迭代,而j*i 从 10000 开始,这不是 &lt;= 200,因此 j 循环根本不会迭代
  • 经过一些思考和一些测试,我总是在我开始的地方,我真的不知道如何解决这个问题。因为我想做的是去修改其他子数组。我不想只停留在每个线程的每个子数组的范围内。我希望每个线程也修改其他线程的子数组
  • 您的观察结果对于通用素数检验是正确的,因为它足以将除数运行到所讨论数字的平方根。然而这里sum+=i; 部分很重要,所以i 必须运行到max,而这个内部for 对大数没有任何作用,这可以通过额外的if 来强调,但是检查只是一个额外的步骤,没有任何好处。
  • 感谢您更新的答案。我想知道在这个版本中是每个线程只修改它的子数组,还是每个线程都修改比他更大的其他线程的子数组?
【解决方案2】:

多线程通常也意味着您想要更快地制作某些东西。因此,首先回顾一下您的初始设计并使其在单线程上更快可能是值得的。那么这是一个要击败的目标。此外,为了在不编写精炼基准的情况下比较运行时间,您需要一个“可见”长度的运行时间。
在我的机器上,使用“设置”

int max = 1_000_000_000;
boolean sieve[] = new boolean[max];
long sum = 0; // will be 24739512092254535 at the end

您的原始代码,

for(int i=2;i<max;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }

运行 24-28 秒。正如@Andreas 帖子下面的 cmets 中所讨论的,后来在里面(是的,现在我看到它被接受了,大部分讨论都消失了),内部循环做了很多额外的检查(因为它一直做一个比较,即使它实际上不会启动)。因此,外部循环可以分为两部分:首先筛选和求和(直到 max 的最后一个“未知”除数,不超过其平方根),然后对其余部分求和:

int maxunique=(int)Math.sqrt(max);
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;
        sum+=i;
    }
for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

这个在我的机器上运行 14-16 秒。显着的收获,还没有涉及线程。

然后是线程,if(!sieve[i]) 的问题:在计算总和时,这种检查不能在小于i 的低素数的内部循环超过i 之前发生,所以sieve[i]真的告诉它是否是素数。因为例如如果一个线程像for(int i=4;i&lt;10001;i+=2)sieve[i]=true;一样运行,而另一个线程同时检查sieve[10000],它仍然会是false,而10000会被误认为是一个素数。
第一次尝试可能是在一个线程上进行筛选(它的外循环“仅”是max 的平方根),然后并行求和:

for(int i=2;i<=maxunique;i++)
    if(!sieve[i])
        for(int j=i*2;j<max;j+=i)
            sieve[j]=true;

int numt=4;
Thread sumt[]=new Thread[numt];
long sums[]=new long[numt];
for(int i=0;i<numt;i++) {
    long ii=i;
    Thread t=sumt[i]=new Thread(new Runnable() {
        public void run() {
            int from=(int)Math.max(ii*max/numt,2);
            int to=(int)Math.min((ii+1)*max/numt,max);
            long sum=0;
            for(int i=from;i<to;i++)
                if(!sieve[i])
                    sum+=i;
            sums[(int)ii]=sum;
        }
    });
    t.start();
}

for(int i=0;i<sumt.length;i++) {
    sumt[i].join();
    sum+=sums[i];
}

这有点简洁,所有线程(我有 4 个核心)检查相同数量的候选者,结果更快。有时几乎是一秒钟,但大部分时间大约是一半(~0.4 ... ~0.8 秒)。所以这个真的不值得努力,筛分循环是这里真正耗时的部分。

人们可以决定允许冗余工作,并为筛子中遇到的每个素数启动一个线程,即使它不是实际的素数,只是还没有被勾选:

List<Thread> threads=new ArrayList<>();
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        int ii=i;
        Thread t=new Thread(new Runnable() {
            public void run() {
                for(int j=ii*2;j<max;j+=ii)
                    sieve[j]=true;
            }
        });
        t.start();
        threads.add(t);
    }
//System.out.println(threads.size());
for(int i=0;i<threads.size();i++)
    threads.get(i).join();

for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

评论的println() 会告诉(在我的机器上)创建了 3500-3700 个线程(而如果有人在原始循环中放置一个计数器,结果证明 3401 将是最小值,会遇到许多素数在单线程筛环中)。虽然超调不是灾难性的,但线程数相当高,增益也不是太高,尽管它比之前的尝试更明显:运行时间为 10-11 秒(当然可以降低一半更多秒,通过使用并行求和循环)。
人们可以通过关闭循环来解决一些冗余工作,当它们被证明是过滤非素数时:

for(int j=ii*2;j<max && !sieve[ii];j+=ii)

这个其实有点效果,对我来说运行时间是8.6-10.1秒。

由于创建 3401 个线程并不比创建 3700 个线程更疯狂,因此限制它们的数量可能是一个好主意,这就是更容易向Threads 挥手告别的地方。虽然在技术上可以计算它们,但有各种内置基础架构可以为我们做到这一点。
Executors 可以帮助将线程数限制为固定数量 (newFixedThreadPool()),或者更好,到可用的 CPU 数量 (newWorkStealingPool()):

ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<Object>(es);

int count=0;

for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        int ii=i;
        count++;
        ecs.submit(new Callable<Object>() {
            public Object call() throws Exception {
                // if(!sieve[ii])
                for(int j=ii*2;j<max /**/ && !sieve[ii] /**/;j+=ii)
                    sieve[j]=true;
                return null;
            }
        });
    }
System.out.println(count);
while(count-->0)
    ecs.take();
es.shutdown();
long sum=0;

for(int i=2;i<max;i++)
    if(!sieve[i])
        sum+=i;

这样它会产生与前一种相似的结果(8.6-10.5s)。但是,对于低 CPU 数量(4 核),交换条件会导致一些加速(取消注释 if 并在循环中注释相同的条件,在 /**/ 之间),因为任务按其提交顺序运行,因此大多数冗余循环可以在一开始就退出,从而使重复检查浪费时间。然后对我来说是 8.5-9.3s,击败了直接线程尝试的最佳和最差时间。但是,如果您的 CPU 数量很高(我也在一个超级计算节点上运行它,根据 Runtime.availableProcessors() 提供了 32 个内核),任务将重叠更多,并且非欺骗版本(所以总是做检查的那个)是会更快。

如果你想要一个小的加速,具有相当好的可读性,你可以并行化内部循环(Threads 也可以,只是非常乏味),使用流:

long sum=0;
for(int i=2;i<=maxunique;i++)
    if(!sieve[i]) {
        sum+=i;
        int ii=i;
        IntStream.range(1, (max-1)/i).parallel().forEach(
            j -> sieve[ii+j*ii]=true);
    }

for(int i=maxunique+1;i<max;i++)
    if(!sieve[i])
        sum+=i;

这个非常像最初的优化循环对,并且仍然有一些速度,对我来说是 9.4-10.0 秒。所以它比其他的慢(约 10% 左右),但它要简单得多。


更新:
  1. 我修复了一系列错误:xy&lt;maxuniques 现在是 xy&lt;=maxuniques。虽然它不/幸运地没有影响巨大的结果,但它确实在max=9 这样简单的情况下失败了(当maxunique=3xy&lt;3 循环时,9 将保持素数,总和是 26 而不是 17 )。嗯。也修复了几个延续循环(所以它们现在从maxunique+1 继续)。

  2. 创建无限数量的子任务困扰着我,幸运的是发现了一个倒置设计,我们不检查是否达到sqrt(max)(即maxunique),但我们知道如果我们有完成对低于某个limit 的数字的筛选,我们可以继续检查直到limit*limit 的数字,因为在范围内(limit ... limit*limit)内仍然是质数的东西实际上是质数(我们仍然可以请记住,此上限以maxunique 为界。因此可以并行筛选。

基础算法,仅用于检查(单线程):

int limit=2;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            for(int j=i*2;j<max;j+=i)
                sieve[j]=true;
        }
    limit=upper;
} while(limit<=maxunique);

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

由于某种原因,它比原来的双循环变体稍慢(13.8-14.5 秒 vs 13.7-14.0 秒,最少/最多 20 次运行),但无论如何我对并行化很感兴趣。
可能是由于素数分布不均匀,使用并行流效果不佳(我认为它只是将工作预先划分为看似相等的部分),但基于Executor 的方法效果很好:

ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<>(es);

int limit=2;
int count=0;
do {
    int upper=Math.min(maxunique+1,limit*limit);
    for(int i=limit;i<upper;i++)
        if(!sieve[i]) {
            sum+=i;
            int ii=i;
            count++;
            ecs.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    for(int j=ii*2;j<max;j+=ii)
                        sieve[j]=true;
                    return null;
                }
            });
        }
    while(count>0) {
        count--;
        ecs.take();
    }
    limit=upper;
} while(limit<=maxunique);

es.shutdown();

for(int i=limit;i<max;i++)
    if(!sieve[i])
        sum+=i;

对于低 CPU 计数的环境,这是迄今为止最快的一个(7.4-9.0 秒与“无限线程数”的 8.7-9.9 秒和另一个 Executor 的 8.5-9.2 秒相比-基于一个)。然而,一开始它运行的并行任务数量很少(当limit=2 时,它只启动两个并行循环,用于 2 和 3),最重要的是,这些是运行时间最长的循环(步数最小) ,因此在 CPU 计数较高的环境中,它仅落后于基于 Executor 的原始环境,2.9-3.6 秒与 2.7-3.2 秒)。
当然,可以在一开始就实施单独的加速,明确收集必要数量的素数以使可用内核饱和,然后切换到这种基于limit 的方法,然后结果可能会击败其他方法,不管核心数。不过我觉得我现在可以抵挡住诱惑。

【讨论】:

  • 非常感谢您的深入解释。你提出了一些好的观点。
【解决方案3】:

我认为你的问题是这段代码:

   public void run() {
        for (int i = min; i < max; i++) {
            if (array[i]) {
                for (int j = min; j*i < array.length; j++) {
                    array[i*j] = false;
                }
                sum += i;
            }
        }
        allFinished.release();
    }

想象一下您后来的一个线程,在列表末尾附近工作。第一项不是素数,但识别它不是素数的工作还没有完成——它来自不同的线程,并且该线程刚刚开始。所以你相信这个值是素数(它还没有被标记为非素数)并相应地工作。

如果你提供一个产生不好结果的例子,我们可以很容易地测试这个理论。

【讨论】:

  • 例如,您可以多次运行java SumPrimes 200 4 并查看答案。正确的结果必须是 4227,如果你多次运行这个命令,你会看到它给了你正确的答案,但有时,它给你的答案是错误的
  • 听起来像是一种竞争条件,这就是我试图在回答中解释的内容。
  • 我不认为您可以将数组拆分为多个部分并按照您的方式进行测试。我认为你能做的最好的事情就是分开标记错误,即使那样,你也不能比你的子线程标记错误更快。
猜你喜欢
  • 2016-09-06
  • 2021-07-08
  • 2016-08-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-02
  • 2018-05-17
相关资源
最近更新 更多