【问题标题】:Parallizing the calculation of an integral对积分的计算进行并行化
【发布时间】:2020-10-10 02:11:18
【问题描述】:

这里我有一段代码,它是一个计算函数积分的函数。在代码中,function() 被定义为要集成的函数。

我正在学习并行编程,我需要并行编写这段代码。原始程序是顺序的,因为每次迭代都会对另一个处理器执行一次发送操作。为了使其并行化,我想要实现的是,每个循环迭代 3 个发送操作都被执行到其他 3 个可用处理器。假设有 1 个处理器负责划分任务(等级 = 0),另外 3 个处理器负责执行实际计算。

请注意,这是一大段代码,但我还包括了 cmets 以使其更清晰:

顺序代码:

    if (myRank == 0)
    {
        // I am the controller, distribute the work
        for (step = 0; step < maxSteps; step++)
        {
            x[0] = x_start + stepSize*step;
            x[1] = x_start + stepSize*(step+1);
            nextRank = step % (numProcs-1) + 1;
            // Send the work
            MPI_Send(x, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD);
            // Receive the result
            MPI_Recv(y, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD,
                MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y[0]+y[1]);
        }
        // Signal workers to stop by sending empty messages with tag TAG_END
        for (nextRank = 1; nextRank < numProcs; nextRank++)
            MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
    }
    else
    {
        while (1)
        {
            // I am a worker, wait for work

            // Receive the left and right points of the trapezoid and compute
            // the corresponding function values. If the tag is TAG_END, don't
            // compute but exit.
            MPI_Recv(x, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
                &status);
            if (status.MPI_TAG == TAG_END) break;
            y[0] = f(x[0]);
            y[1] = f(x[1]);
            // Send back the computed result
            MPI_Send(y, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    return sum;
}

为了使其并行化,我对它进行了硬编码,以明确我的工作。我以 3 步进行循环增量。我添加了新数组来存储 x 和 y 值。我所做的是首先收集特定数组中的 x 值。然后我将每个 x 值数组发送到一个新的处理器。然后我执行另一个函数来获取 y 值。然后我将它们发送回处理器(rank = 0)以添加所有“集成切片”。

尝试并行化的代码

 if (myRank == 0)
    {
        // I am the controller, distribute the work
        for (step = 0; step < maxSteps; step+3)
        {
            x1[0] = x_start + stepSize*step;
            x1[1] = x_start + stepSize*(step+1);
            x2[0] = x_start + stepSize*(step+1);
            x2[1] = x_start + stepSize*((step+1)+1);
            x3[0] = x_start + stepSize*(step+2);
            x3[1] = x_start + stepSize*((step+1)+2);
            nextRank = step % (numProcs-1) + 1;
            // Send the work
            MPI_Send(x1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD);
            MPI_Send(x2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD);
            MPI_Send(x3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD);
            // Receive the result
            MPI_Recv(y1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y1[0]+y1[1]);
            MPI_Recv(y2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y2[0]+y2[1]);
            MPI_Recv(y3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y3[0]+y3[1]);
        }
        // Signal workers to stop by sending empty messages with tag TAG_END
        for (nextRank = 1; nextRank < numProcs; nextRank++)
            MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
    }
    else if (myRank = 1)
    {
        while (1)
        {
            MPI_Recv(x1, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == TAG_END) break;
            y1[0] = func(x1[0]);
            y1[1] = func(x1[1]);
            // Send back the computed result
            MPI_Send(y1, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    
    else if (myRank = 2)
    {
        while (1)
        {
            MPI_Recv(x2, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == TAG_END) break;
            y2[0] = func(x2[0]);
            y2[1] = func(x2[1]);
            // Send back the computed result
            MPI_Send(y2, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    
    else if (myRank = 3)
    {
        while (1)
        {
            MPI_Recv(x3, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == TAG_END) break;
            y3[0] = func(x3[0]);
            y3[1] = func(x3[1]);
            // Send back the computed result
            MPI_Send(y3, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    return sum;
}

问题是我不再得到输出。恐怕我造成了僵局,但我无法发现在哪里。我可以得到有关此方法的反馈吗?

来源:https://doc.itc.rwth-aachen.de/display/VE/PPCES+2012

【问题讨论】:

  • = 是 C 中的赋值运算符。使用 == 来检查相等性。
  • 能否请您满足我的好奇心,让我知道您从哪里获得原始代码?我之所以问,是因为它可以追溯到 2012 年,并且来自 PPCES 2012 的 MPI 部分 - 亚琛工业大学集群用户的年度教程。大多数代码逻辑来自我之前的其他人,但代码清理和长 cmets 绝对是我的......顺便说一句,您的问题的答案是应该在 PPCES 网站上提供的解决方案。这同样适用于您关于 MPI 的其他一些问题 :-)
  • 很高兴听到其他人发现 PPCES 的材料很有用。我希望您的讲师给 RWTH 的人一个帽子提示,因为他们通过将所有材料提供给互联网上的任何人,为社区提供了一项出色的服务。
  • 原代码的问题是每次一个工作项被发送到一个worker rank,控制器在发送另一个工作项之前等待返回结果。这与顺序执行没有什么不同,只是计算被卸载到不同的等级。您需要对其进行修改,以允许多个等级同时处理工作项,并且不会留下空闲间隙。在每次迭代中将任务发送到所有 rank 只会让事情稍微好一点,因为更快的 rank 仍然需要等待当前迭代中最慢的 rank。
  • 这种“controller-worker”模式(以前​​称为“master-slave”)的典型实现是控制器将最初的任务发送给所有worker,然后进入一个循环进行监听任何工人等级的结果。一旦收到结果,控制器就会检查是否有更多工作项要处理,如果有,它会向接收结果的同一个工作人员发送一个新的工作项。如果没有更多工作,控制器会发送停止消息。

标签: c math parallel-processing mpi


【解决方案1】:

如果您想从拥有 8 个内核中获利(这只是一个示例),您可以做的最好的事情(也是最简单的)是将您的积分区间分成八部分(您可以任意划分,给出每个线程的工作量相同,这取决于您),然后独立计算每个线程中的每个积分(使用与一个线程相同的循环)。

这种方法不会改变你原来的计算,并使计算完全相互独立(所以根本没有资源争用)

最后你只需要把八个积分相加就可以得到你想要的结果。

如果您正在考虑展开循环以实现更多并行性,那么您最好相信您的编译器,它能够并行使用他的优化器来从普通 CPU 目前拥有的超过 32 个寄存器中获利,并且你很可能不会做得更好。

这里建议的方法将您的积分转换为 8 种不同的积分计算,每种都有不同的参数和不同的值,并且一个线程中的微积分不依赖于其他线程的微积分,因此,即使在基于管道的线程核心中,您将永远不必重新排序或使指令复杂化,因为很容易将另一个线程的指令添加到管道中以避免产生气泡。如果你有 8 个内核,实际上超过 8 个线程计算某事并不代表任何有利的任务。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-12-09
    • 2018-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多