【发布时间】:2022-01-01 23:23:35
【问题描述】:
在我的应用程序中,与 MPI 单向通信 (RMA) 相比,我通过 TBB 流程图实现了我所谓的“稀疏向量减少”。该算法的核心部分如下所示:
auto &reduce = m_g_R.add<function_node<ReductionJob, ReductionJob>>(
serial,
[=, &reduced_bi](ReductionJob rj) noexcept
{
const auto r = std::get<0>(rj);
auto *buffer = std::get<1>(rj)->data.data();
auto &mask = std::get<1>(rj)->mask;
if (m_R_comms[r] != MPI_COMM_NULL)
{
const size_t n = reduced_bi.dim(r);
MPI_Win win;
MPI_Win_create(
buffer,
r == mr ? n * sizeof(T) : 0,
sizeof(T),
MPI_INFO_NULL,
m_R_comms[r],
&win
);
if (n > 0 && r != mr)
{
MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
size_t i = 0;
do
{
while (i < n && !mask[i]) ++i;
size_t base = i;
while (i < n && mask[i]) ++i;
if (i > base) MPI_Accumulate(
buffer + base, i - base, MpiType<T>::type,
0,
base, i - base, MpiType<T>::type,
MPI_SUM,
win
);
}
while (i < n);
MPI_Win_unlock(0, win);
}
MPI_Win_free(&win);
}
return rj;
}
);
这是针对参与计算的每个等级 r 执行的,reduced_bi.dim(r) 指定每个等级拥有多少元素。 mr 是当前等级,并且以这样一种方式创建通信器,即目标进程是它们每个的根。 buffer 是 T = double 的数组(通常),mask 是 std::vector<bool> 标识哪些元素非零。循环的组合将通信拆分为非零元素块。
这通常可以正常工作并且结果是正确的,与我之前使用MPI_Reduce 的实现相同。但是,将此节点的并发级别设置为serial 似乎很重要,这表明最多有一个并行 TBB 任务(因此最多有一个线程)执行此代码。
我想将其设置为 unlimited 以提高性能,而且确实在我的笔记本电脑上运行 MPICH 3.4.1 的小型作业可以正常工作。然而,在我真正想要运行计算的集群上,使用 OpenMPI 4.1.1,它运行了一段时间,然后因段错误和涉及一堆 UCX 函数的回溯而崩溃。
我现在想知道,是否不允许有多个线程并行调用这样的 RMA 操作(在我的笔记本电脑上它只是偶然工作),还是我遇到了集群上的错误/限制?从文档中我没有直接看到我想做的事情不受支持。
当然,MPI 是用MPI_THREAD_MULTIPLE 初始化的,我再次重复上面发布的 sn-p 工作正常,只有当我更改 serial --> unlimited 以启用并发执行时,我才会遇到问题在集群上。
在回复下面的 Victor Eijkhout 评论时,这里有一个完整的示例程序,可以重现该问题。这在我的笔记本电脑上运行良好(专门使用 mpirun -n 16 进行了测试),但是当我以 16 个等级(分布在 4 个集群节点上)运行它时,它在集群上崩溃。
#include <iostream>
#include <vector>
#include <thread>
#include <mpi.h>
int main(void)
{
int requested = MPI_THREAD_MULTIPLE, provided;
MPI_Init_thread(nullptr, nullptr, requested, &provided);
if (provided != requested)
{
std::cerr << "Failed to initialize MPI with full thread support!"
<< std::endl;
exit(1);
}
int mr, nr;
MPI_Comm_rank(MPI_COMM_WORLD, &mr);
MPI_Comm_size(MPI_COMM_WORLD, &nr);
const size_t dim = 1024;
const size_t repeat = 100;
std::vector<double> send(dim, static_cast<double>(mr) + 1.0);
std::vector<double> recv(dim, 0.0);
MPI_Win win;
MPI_Win_create(
recv.data(),
recv.size() * sizeof(double),
sizeof(double),
MPI_INFO_NULL,
MPI_COMM_WORLD,
&win
);
std::vector<std::thread> threads;
for (size_t i = 0; i < repeat; ++i)
{
threads.clear();
threads.reserve(nr);
for (int r = 0; r < nr; ++r) if (r != mr)
{
threads.emplace_back([r, &send, &win]
{
MPI_Win_lock(MPI_LOCK_SHARED, r, 0, win);
for (size_t i = 0; i < dim; ++i) MPI_Accumulate(
send.data() + i, 1, MPI_DOUBLE,
r,
i, 1, MPI_DOUBLE,
MPI_SUM,
win
);
MPI_Win_unlock(r, win);
});
}
for (auto &t : threads) t.join();
MPI_Barrier(MPI_COMM_WORLD);
if (mr == 0) std::cout << recv.front() << std::endl;
}
MPI_Win_free(&win);
MPI_Finalize();
}
注意:我在这里故意使用纯线程来避免不必要的依赖。它应该与-lpthread 链接。
我在集群上遇到的具体错误是这样的,使用 OpenMPI 4.1.1:
*** An error occurred in MPI_Accumulate
*** reported by process [1829189442,11]
*** on win ucx window 3
*** MPI_ERR_RMA_SYNC: error executing rma sync
*** MPI_ERRORS_ARE_FATAL (processes in this win will now abort,
*** and potentially your MPI job)
来自ompi_info的可能相关部分:
Open MPI: 4.1.1
Open MPI repo revision: v4.1.1
Open MPI release date: Apr 24, 2021
Thread support: posix (MPI_THREAD_MULTIPLE: yes, OPAL support: yes, OMPI progress: no, Event lib: yes)
已用 UCX/1.10.1 编译。
【问题讨论】:
-
不知道是不是每个线程都创建了窗口内存的问题。那不应该是每个进程吗?
-
哦,这是在任何地方明确说明的吗? MPI_Win_create 的 OpenMPI 文档说“每个进程都指定一个现有内存的窗口,它向 comm 组中的进程公开给 RMA 访问。”我认为这可以理解为“每个进程一个窗口”,也可以理解为“每个通信器一个窗口”。 MPI_Win_create 的 MPICH 文档说“这个例程是线程安全的。这意味着这个例程可以被多个线程安全地使用,而不需要任何用户提供的线程锁。”
-
Windows 在一个通信器上是集体的,因此通信器中的每个进程都需要进行一些创建调用。您似乎让每个 thread 进行创建调用。我会将 create 调用移到您的 lambda 之外。无论如何,您可能想多次使用 Windows,对吧?线程安全仅指使用窗口,而不是创建。
-
在这种情况下,流程图拓扑(未在 sn-p 中显示)实际上确保每个通信器只有一个窗口,并且所有进程以相同的顺序执行窗口创建。但是,是的,理想情况下,我想在前面创建所有窗口。
-
事实上,我有一个以这种方式实现的版本,但它也与 OpenMPI 崩溃,给了我 MPI_ERR_WIN。我的理论是,可能存在一个约束,要求创建和使用窗口仅限于同一个线程,但当时可能还有其他问题。如果我稍微改变我的算法并将 MPI_Accumulate 也用于 rank-local 更新,我现在意识到实际上有一种方法只使用一个通信器和窗口每个进程,所以我会尝试那个版本。
标签: c++ multithreading mpi openmpi