【问题标题】:Can I bundle two MPI messages?我可以捆绑两个 MPI 消息吗?
【发布时间】:2012-09-23 23:05:12
【问题描述】:

我正在尝试进行无序的多对一通信。基本上我有多个相同大小的浮点数组,由整数 id 标识。

每条消息应如下所示:

<int id><float array data>

在接收端,它确切地知道有多少个数组,因此设置了准确的接收数。收到消息后,它会解析 id 并将数据放入正确的位置。问题是消息可以从任何其他进程发送到接收进程。 (例如,生产者有一个工作队列结构,并处理队列中可用的任何 id。)

由于 MPI 只保证 P2P 的订单交付,我不能轻易地将整数 id 和 FP 数据放在两条消息中,否则接收方可能无法将 id 与数据匹配。 MPI 也不允许一次发送两种类型的数据。

我只能想到两种方法。

1) Receiver 有一个大小为 m (source[m]) 的数组,m 是发送节点的数量。发送者首先发送 id,然后是数据。接收者收到来自发送者 i 的整数消息后,将 id 保存到 source[i]。从发送者 i 接收到 FP 数组后,它会检查 source[i],获取 id,并将数据移动到正确的位置。它之所以有效,是因为 MPI 保证了有序的 P2P 通信。它要求接收者保存每个发送者的状态信息。更糟糕的是,如果单个发送进程可以在数据之前发送两个 id(例如多线程),则此机制将不起作用。

2) 将 id 和 FP 视为字节,并将它们复制到发送缓冲区中。将它们作为 MPI_CHAR 发送,接收器将它们转换回整数和 FP 数组。然后我需要支付将内容复制到发送方字节缓冲区的额外费用。随着 MPI 进程中线程数量的增加,总临时缓冲区也会增加。

它们都不是完美的解决方案。我不想在进程中锁定任何东西。不知道大家有没有更好的建议。

编辑:代码将在具有 infiniband 的共享集群上运行。机器将随机分配。所以我认为 TCP 套接字在这里不能帮助我。此外,IPoIB 看起来很昂贵。我确实需要完整的 40Gbps 通信速度,并让 CPU 进行计算。

【问题讨论】:

    标签: c++ c cluster-computing mpi openmpi


    【解决方案1】:

    您可以在接收函数中指定MPI_ANY_SOURCE 作为源排名,然后使用它们的标签对消息进行排序,这比创建自定义消息更容易。这是一个简化的示例:

    #include <stdio.h>
    #include "mpi.h"
    
    int main() {
        MPI_Init(NULL,NULL);
        int rank=0;
        int size=1;
        MPI_Comm_rank(MPI_COMM_WORLD,&rank);
        MPI_Comm_size(MPI_COMM_WORLD,&size);
    
        // Receiver is the last node for simplicity in the arrays
        if (rank == size-1) {
            // Receiver has size-1 slots
            float data[size-1];
            MPI_Request request[size-1];
    
            // Use tags to sort receives
            for (int tag=0;tag<size-1;++tag){
                printf("Receiver for id %d\n",tag);
                // Non-blocking receive
                MPI_Irecv(data+tag,1,MPI_FLOAT,
                          MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&request[tag]);
            }
    
            // Wait for all requests to complete
            printf("Waiting...\n");
            MPI_Waitall(size-1,request,MPI_STATUSES_IGNORE);
            for (size_t i=0;i<size-1;++i){
                printf("%f\n",data[i]);
            }
        } else {
            // Producer
            int id = rank;
            float data = rank;
            printf("Sending {%d}{%f}\n",id,data);
            MPI_Send(&data,1,MPI_FLOAT,size-1,id,MPI_COMM_WORLD);
        }
    
        return MPI_Finalize();
    }
    

    【讨论】:

    • 这实际上是我当前的实现,唯一的区别是我使用 Isend 作为生产者,数据总量为 GB 量级,大约 3000 次发送。如果发送 id 真的乱序,它会奇怪地死锁,其中一些发送无法继续。如果我按照接收者设置标签的顺序同步发送者,那么死锁就会消失。从 doc 来看,任何一种方式都不应该死锁,因为每个标签只有一个匹配的发送/接收。这就是我想将消息打包并尝试 ANY_SOURCE 和 ANY_TAG 实现的原因。
    • 我确信死锁来自 MPI 发送/接收,因为代码没有任何其他同步机制。 Isend/irecv 不消耗内部缓冲区空间。标签在发送者和接收者之间完美匹配。事实上,只有当发送者发送严重乱序的东西时,才会出现僵局。我没有足够的关于 openMPI 实现的知识来推断这种违反 MPI 规范的行为。
    • 请注意,MPI 标准要求可接受的标签值从 0MPI_TAG_UBMPI_TAG_UB至少 32767。一些实现为MPI_TAG_UB(例如 2^31-1)但其他人没有。如果使用超过 32768 个数组 ID,则使用标签来标识数组的代码将不可移植。
    【解决方案2】:

    正如有人已经写过的,您可以使用MPI_ANY_SOURCE 接收来自任何来源的信息。要在一次发送中发送两种不同类型的数据,您可以使用 derived datatype

    #include <stdio.h>
    #include <stdlib.h>
    #include "mpi.h"
    
    #define asize 10
    
    typedef struct data_ {
      int   id;
      float array[asize];
    } data;
    
    int main() {
    
      MPI_Init(NULL,NULL);
    
      int rank = -1;
      int size = -1;
      MPI_Comm_rank(MPI_COMM_WORLD,&rank);
      MPI_Comm_size(MPI_COMM_WORLD,&size);
    
      data buffer;    
     // Define and commit a new datatype
      int          blocklength [2];
      MPI_Aint     displacement[2];
      MPI_Datatype datatypes   [2];
      MPI_Datatype mpi_tdata;
    
      MPI_Aint     startid,startarray;
      MPI_Get_address(&(buffer.id),&startid);
      MPI_Get_address(&(buffer.array[0]),&startarray);
    
      blocklength [0] = 1;
      blocklength [1] = asize;
      displacement[0] = 0;
      displacement[1] = startarray - startid;
      datatypes   [0] = MPI_INT;
      datatypes   [1] = MPI_FLOAT;
    
      MPI_Type_create_struct(2,blocklength,displacement,datatypes,&mpi_tdata);
      MPI_Type_commit(&mpi_tdata);
    
      if (rank == 0) {
        int        count = 0;
        MPI_Status status;
    
        while (count < size-1 ) {
          // Non-blocking receive
          printf("Receiving message %d\n",count);
          MPI_Recv(&buffer,1,mpi_tdata,MPI_ANY_SOURCE,0,MPI_COMM_WORLD,&status);
          printf("Message tag %d, first entry %g\n",buffer.id,buffer.array[0]);
          // Counting the received messages 
          count++;
        }
    
      } else {
        // Initialize buffer to be sent
        buffer.id = rank;
        for (int ii = 0; ii < size; ii++) {
          buffer.array[ii] = 10*rank + ii;
        }
        // Send buffer
        MPI_Send(&buffer,1,mpi_tdata,0,0,MPI_COMM_WORLD);
      }
    
      MPI_Type_free(&mpi_tdata);
    
      MPI_Finalize();
      return 0;
    }
    

    【讨论】:

    • 我同意使用 MPI_ANY_SOURCE 和标签“应该”工作,但它的行为不像我的代码中指定的那样。这次我会尝试我自己的数据类型。谢谢!
    • 您提出的建议是高度不可移植的,并且在异构环境中不起作用。一个可移植的解决方案是注册一个具有两个字段的 MPI 结构类型,一个是MPI_INT 类型和1 的块长度,另一个是MPI_FLOAT 类型和asize 的块长度。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多