[Question title]: Why does MPI_Waitall return before MPI_Irecv gets the data it needs?
[Posted]: 2016-02-17 00:00:32
[Question]:

I am using MPI to solve the Laplace equation on an unstructured, partitioned mesh. My plan is to first finish sending and receiving data between neighboring partitions, and then do the computation on each processor. MPI_Waitall is used to wait for all the MPI_Isend() and MPI_Irecv() calls to complete, but the problem is that every processor passes MPI_Waitall and then gets stuck reading the received buffer data, because no processor has actually received any data (the flags from MPI_Testall are all 0). As I understand it, MPI_Irecv should have received the data before MPI_Waitall returns.

double **sbuf = calloc(partition->ptn_nnbr[my_id], sizeof(double *));
double **rbuf = calloc(partition->ptn_nnbr[my_id], sizeof(double *));
for (i = 0; i < partition->ptn_nnbr[my_id]; i++)
{
    //rbuf[i] = calloc(partition->ptn_cnt[my_id][k1], sizeof(double));
    rbuf[i] = calloc(MAX_nnode, sizeof(double));
    sbuf[i] = calloc(MAX_nnode, sizeof(double));
}

nrm = 1;            // nrm = max(abs(r[i])), i = 1..n
iter = 0;
printf("Entering jacobi; iterations = %d, error norm = %e\n", iter, nrm);
while (nrm > TOL && iter<4 ){
    init_boundary_conditions_ptn(x_ptn, mesh, my_id, partition);

    iter++;     
    int req_idx= 0;     
    int idx = 0;
    MPI_Request *request = (MPI_Request *) calloc(2 * partition->ptn_nnbr[my_id], sizeof(MPI_Request));
    MPI_Status *status = calloc(2 * partition->ptn_nnbr[my_id], sizeof(MPI_Status));
    int *flag = calloc(2 * partition->ptn_nnbr[my_id], sizeof(int));
    for (k1 = 0; k1 < partition->nptn; k1++)
    {
        if (partition->ptn_list[my_id][k1] != NULL)
        {               
            for (i = 0; i < partition->ptn_cnt[k1][my_id]; i++)
            {
                sbuf[idx][i] = x_ptn->val[partition->ptn_list[k1][my_id][i] - partition->ptn[my_id] + 1];
            }
            MPI_Isend(sbuf[idx], partition->ptn_cnt[k1][my_id], MPI_DOUBLE, k1, TAG, MPI_COMM_WORLD, &request[req_idx]);
            //printf("isend done from nbr %d for partition %d \n", k1, my_id);
            req_idx++;              
            idx++;
        }
    }

    idx = 0;
    for (k1 = 0; k1 < partition->nptn; k1++)
    {
        if (partition->ptn_list[my_id][k1] != NULL)
        {
            MPI_Irecv(rbuf[idx], partition->ptn_cnt[my_id][k1], MPI_DOUBLE, k1, TAG, MPI_COMM_WORLD, &request[req_idx]);
            //printf("irecv done from nbr %d for partition %d \n", k1, my_id);
            req_idx++;

            idx++;
        }
    }

    printf("partition %d is waiting \n", my_id);
    MPI_Testall(2 * partition->ptn_nnbr[my_id],request,flag, status);
    for (i = 0; i < 2 * partition->ptn_nnbr[my_id]; i++)
    {
        printf("flag[%d] is %d from partition %d\n", i, flag[i], my_id);
    }

    MPI_Waitall(2 * partition->ptn_nnbr[my_id], request, status);
    printf("partition %d pass MPI_Wait \n", my_id);

    for (k1 = 0; k1 < partition->nptn; k1++)
    {
        if (partition->ptn_list[my_id][k1] != NULL)
        {
            MPI_Probe(k1, TAG, MPI_COMM_WORLD, status1);
            MPI_Get_count(status1, MPI_DOUBLE, &count);
            printf("count is %d from nbr %d \n", count, k1);
            for (i = 0; i < count; i++)
            {
                x->val[partition->ptn_list[my_id][k1][i]] = rbuf[idx][i];
            }
        }
    }    

    //printf("exchange complete from partition %d\n", my_id);

    jacobi_step_csr_matrix(A_ptn, x, b_ptn, y_ptn);     // y = inv(D)*(b + (D-A)*x), D = diag(A)

    copy_vector(y_ptn, x_ptn);
    MPI_Gatherv(x_ptn->val, x_ptn->n, MPI_DOUBLE, x->val, x_count, x_dis, MPI_DOUBLE,0, MPI_COMM_WORLD);
    if (my_id == 0)
    {
        init_boundary_conditions(x, mesh, partition->perm);
        matvec_csr_matrix(A, x, r);     // r = A*x
        sxapy(b, -1.0, r);          // r = b - r
        zero_boundary_conditions(r, mesh, partition->perm);
        nrm = norm_inf(r);
    }
    MPI_Bcast(&nrm, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    printf("nrm is %f from partition %d in iter %d \n", nrm, my_id, iter);
    free(request);
    free(status);
    free(flag);
}

The output is:

Processor 0 start Jacobi 
MAx_node is 2 from partition 0 
Entering jacobi; iterations = 0, error norm = 1.000000e+00
Processor 2 start Jacobi 
MAx_node is 2 from partition 2 
Entering jacobi; iterations = 0, error norm = 1.000000e+00
Processor 3 start Jacobi 
MAx_node is 2 from partition 3 
Entering jacobi; iterations = 0, error norm = 1.000000e+00
Processor 1 start Jacobi 
MAx_node is 2 from partition 1 
Entering jacobi; iterations = 0, error norm = 1.000000e+00
partition 3 is waiting 
flag[0] is 0 from partition 3
flag[1] is 0 from partition 3
flag[2] is 0 from partition 3
flag[3] is 0 from partition 3
partition 3 pass MPI_Wait 
partition 0 is waiting 
flag[0] is 0 from partition 0
flag[1] is 0 from partition 0
flag[2] is 0 from partition 0
flag[3] is 0 from partition 0
partition 0 pass MPI_Wait 
partition 2 is waiting 
flag[0] is 0 from partition 2
flag[1] is 0 from partition 2
flag[2] is 0 from partition 2
flag[3] is 0 from partition 2
partition 2 pass MPI_Wait 
partition 1 is waiting 
flag[0] is 0 from partition 1
flag[1] is 0 from partition 1
flag[2] is 0 from partition 1
flag[3] is 0 from partition 1
partition 1 pass MPI_Wait 

[Comments]:

    Tags: mpi hpc


    [Solution #1]:

    It seems to me that your understanding of non-blocking communication in MPI is somewhat fuzzy. First of all, you are using the wrong test call. MPI_Testall outputs a single scalar completion flag that tells whether all of the requests were complete at the moment MPI_Testall was called. Had you used MPI_Testsome instead, you would have noticed that only some of the requests (or, more likely, none of them) complete. The MPI standard allows implementations to postpone the progression of non-blocking operations and to progress them only during certain calls. Completion is only guaranteed:

    • after a call to MPI_Wait{all|some|any}, which simply does not return before the request(s) complete; or
    • after MPI_Test{all|some|any} returns a true completion flag. There is no guarantee that a single call to MPI_Test... will result in completion; the test functions should be called repeatedly until the flag indicates that the requests have completed.

    For performance reasons, most MPI libraries are single-threaded, i.e. there is no background thread that progresses non-blocking calls, except on some specific architectures that implement progression in hardware. Calls into the MPI library therefore have to be made regularly for non-blocking communication to actually take place, and your expectation that all non-blocking requests should already have completed by the time MPI_Testall is called is simply wrong.

    Besides that, your program gets stuck in MPI_Probe. That is a blocking call and must be made before the message is received, not after. MPI_Irecv has already received the message, and the probe call is waiting for another message that will never arrive. Do not call MPI_Probe; pass the relevant element of the status array to MPI_Get_count instead.

    One final note: you pass 2 * partition->ptn_nnbr[my_id] as the number of requests. Make sure that this value actually matches the count accumulated in req_idx, otherwise your program will crash. Inactive requests must be set to MPI_REQUEST_NULL, and neither Open MPI nor MPICH uses NULL (which is what calloc(3) leaves in your array) for inactive requests. You should pass req_idx as the number of requests.

    [Discussion]:
