【发布时间】:2014-09-02 18:42:53
【问题描述】:
我正在学习 MPI-2/MPI-3 中引入的 MPI 单面通信,并遇到了这个online course page about MPI_Accumulate:
MPI_Accumulate 允许调用者将移动到目标进程的数据与目标进程中已有的数据合并，例如把数据累加到目标进程上。同样的功能也可以这样实现：先用 MPI_Get 取回数据（随后进行同步），在调用方执行求和运算，再用 MPI_Put 把更新后的数据发回目标进程。Accumulate 简化了这一繁琐的流程……
但只有有限数量的运算可以与MPI_Accumulate 一起使用(最大值、最小值、总和、乘积等),并且不允许使用用户定义的运算。我想知道如何使用MPI_Get、sync、op 和MPI_Put 来实现上述混乱。是否有任何 C/C++ 教程或工作代码示例?
谢谢
为了进行测试,我改编了SO question 中的一段代码,其中使用单侧通信来创建一个整数计数器,该计数器在 MPI 进程之间保持同步。使用MPI_Accumulate 标记的目标问题行。
代码按原样编译并在大约 15 秒内返回。但是当我尝试用问题行之后的注释块中所示的等效基本操作序列替换MPI_Accumulate时,编译的程序无限期挂起。
谁能帮忙解释一下出了什么问题,以及
在这种情况下替换MPI_Accumulate 的正确方法是什么?
附：我用如下命令编译代码：g++ -std=c++11 -I.. mpistest.cpp -lmpi
并用如下命令执行二进制文件：mpiexec -n 4 a.exe
代码:
//adpated from https://stackoverflow.com/questions/4948788/
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <thread>
#include <chrono>
//State for a shared counter implemented with MPI one-sided communication.
//One designated rank (hostrank) owns the exposed array; every rank keeps
//a handle to the collective window plus its own local bookkeeping.
struct mpi_counter_t {
MPI_Win win; //RMA window exposing hostvals (created collectively on MPI_COMM_WORLD)
int hostrank; //id of the process that host values to be exposed to all processes
int rank; //process id
int size; //number of processes
int val; //this process's accumulated local contribution to the counter
int *hostvals; //host rank only: one int slot per process; NULL on all other ranks
};
struct mpi_counter_t *create_counter(int hostrank) {
struct mpi_counter_t *count;
count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
count->hostrank = hostrank;
MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
MPI_Comm_size(MPI_COMM_WORLD, &(count->size));
if (count->rank == hostrank) {
MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->hostvals));
for (int i=0; i<count->size; i++) count->hostvals[i] = 0;
MPI_Win_create(count->hostvals, count->size * sizeof(int), sizeof(int),
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
else {
count->hostvals = NULL;
MPI_Win_create(count->hostvals, 0, 1,
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
count -> val = 0;
return count;
}
int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc( count->size * sizeof(int) );
int val;
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
for (int i=0; i<count->size; i++) {
if (i == count->rank) {
MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,count->win); //Problem line: increment hostvals[i] on host
/* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? Currently, the following causes the program to hang.
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_fence(0,count->win);
vals[i] += increment;
MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_fence(0,count->win);
//*/
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}
MPI_Win_unlock(0, count->win);
//do op part of MPI_Accumulate's work on count->rank
count->val += increment;
vals[count->rank] = count->val;
//return the sum of vals
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];
free(vals);
return val;
}
//Release every resource owned by *count and null out the caller's pointer.
//Collective: all ranks must call this together (MPI_Win_free is collective).
void delete_counter(struct mpi_counter_t **count) {
    struct mpi_counter_t *c = *count;

    if (c->rank == c->hostrank) {
        MPI_Free_mem(c->hostvals); //buffer came from MPI_Alloc_mem, not malloc
    }
    MPI_Win_free(&c->win);
    free(c);
    *count = NULL;
}
//Dump the per-rank slot values; only the host rank prints, the rest return.
void print_counter(struct mpi_counter_t *count) {
    if (count->rank != count->hostrank)
        return;
    for (int i = 0; i < count->size; i++)
        printf("%2d ", count->hostvals[i]);
    puts("");
}
//Driver: each rank repeatedly claims the next work item from the shared
//counter (hosted on rank 0) until WORKITEMS items have been handed out,
//then the host prints how many items each rank processed.
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    const int WORKITEMS = 50;
    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srand(rank); //per-rank deterministic "work" durations

    struct mpi_counter_t *c = create_counter(0);

    for (int result = 0; result < WORKITEMS; ) {
        result = increment_counter(c, 1);
        if (result > WORKITEMS) {
            printf("%d done\n", rank);
        } else {
            printf("%d working on item %d...\n", rank, result);
            std::this_thread::sleep_for(std::chrono::seconds(rand() % 2));
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
    MPI_Finalize();
    return 0;
}
还有一个问题,我应该在这里使用MPI_Win_fence 而不是锁吗?
--编辑--
我在increment_counter 中使用锁定/解锁如下,程序运行但行为奇怪。在最终的打印输出中,主节点完成所有工作。还是一头雾水。
// OP's edited variant: MPI_Accumulate replaced by a Get / local-add / Put
// sequence bracketed with lock/unlock.  It runs but produces wrong results,
// for two reasons visible below:
//  1. The MPI_Win_unlock after the Get ends the exclusive access epoch, so
//     between the Get and the Put another rank can update the same slot --
//     a classic non-atomic read-modify-write that loses updates.
//  2. The MPI_Put is issued OUTSIDE any lock epoch (the window is unlocked
//     just before it and only re-locked just after), which is erroneous
//     under passive-target RMA rules.
// NOTE(review): this is why the printout shows rank 0 "doing all the work".
int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc( count->size * sizeof(int) );
int val;
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
for (int i=0; i<count->size; i++) {
if (i == count->rank) {
//MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,count->win); //Problem line: increment hostvals[i] on host
///* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? reports that 0 does all the work
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
// ends the epoch: the Get completes here, but exclusivity is gone
MPI_Win_unlock(0, count->win);
vals[i] += increment; // add happens with NO lock held -> lost updates
// erroneous: Put issued outside any access epoch on this window
MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
//*/
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}
MPI_Win_unlock(0, count->win);
//do op part of MPI_Accumulate's work on count->rank
count->val += increment;
vals[count->rank] = count->val;
//return the sum of vals
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];
free(vals);
return val;
}
【问题讨论】:
-
锁比栅栏（栅栏是集体调用）粒度更细，所以最好尽可能使用锁。但要小心嵌套同步！你在一个已经加锁的区域内调用了栅栏（fence），所以程序会挂起。另请注意，在该问答之后发布的 MPI-3 极大地改进了单边通信的例程和语义……
-
@JonathanDursi 谢谢。我试过锁定/解锁。它仍然没有给出正确的结果。请参阅编辑。
-
MPI-3.0 标准改进了例程和语义。现有的实现没有:)
-
如果这种情况太复杂无法解释,我问了一个更简单的情况stackoverflow.com/questions/24728083 ,其中只有一个“全局”值用于记录进程之间的“最小值”值。即使在更简单的情况下,我也会遇到一些同步问题。
标签: c++ mpi communication mpi-rma