这有点晚了,但希望这个示例代码可以帮助其他处于类似位置的人!
正如 osgx 所提到的,OpenMP 对信号问题保持沉默,但由于 OpenMP 通常在 POSIX 系统上使用 pthread 实现,我们可以使用 pthread 信号方法。
对于使用 OpenMP 的繁重计算,很可能只有少数几个位置可以安全地停止计算。因此,对于想要获得过早结果的情况,我们可以使用同步信号处理来安全地执行此操作。另一个优点是这让我们可以接受来自特定 OpenMP 线程的信号(在下面的示例代码中,我们选择主线程)。捕捉到信号后,我们只需设置一个标志,指示计算应该停止。然后,每个线程应确保在方便时定期检查此标志,然后结束其工作负载份额。
通过使用这种同步方法,我们允许计算优雅地退出并且对算法的更改非常小。另一方面,所需的信号处理程序方法可能不合适,因为可能难以将每个线程的当前工作状态整理成连贯的结果。不过,同步方法的一个缺点是计算可能需要相当长的时间才能停止。
信号检测仪由三部分组成:
- 阻塞相关信号。这应该在
omp parallel 区域之外完成,以便每个 OpenMP 线程 (pthread) 都将继承相同的阻塞行为。
- 从主线程轮询所需信号。可以为此使用
sigtimedwait,但某些系统(例如 MacOS)不支持此功能。更便携,我们可以使用sigpending 来轮询任何阻塞的信号,然后在使用sigwait 同步接受它们之前仔细检查阻塞的信号是否是我们所期望的(应该立即返回,除非其他部分程序正在创建竞争条件)。我们终于设置了相关标志。
- 我们应该在最后移除我们的信号掩码(可以选择最后一次检查信号)。
有一些重要的性能注意事项和注意事项:
- 假设每个内部循环迭代都很小,执行信号检查系统调用的成本很高。在示例代码中,我们仅每 1000 万次(每线程)迭代检查一次信号,这可能对应于几秒钟的 wall time。
-
omp for 循环不能脱离1,因此您必须在剩余的迭代中旋转或使用更基本的 OpenMP 原语重写循环。常规循环(例如外部并行循环的内部循环)可以拆分出来就好了。
- 如果只有主线程可以检查信号,那么这可能会在主线程在其他线程之前完成的程序中产生问题。在这种情况下,这些其他线程将是不间断的。为了解决这个问题,您可以在每个线程完成其工作负载时“传递信号检查的指挥棒”,或者可以强制主线程继续运行和轮询,直到所有其他线程完成2。
- 在某些架构(例如 NUMA HPC)上,检查“全局”信号标志的时间可能非常昂贵,因此在决定何时何地检查或操作标志时要小心。例如,对于自旋循环部分,可能希望在标志变为 true 时将其本地缓存。
示例代码如下:
#include <signal.h>
void calculate() {
_Bool signalled = false;
int sigcaught;
size_t steps_tot = 0;
// block signals of interest (SIGINT and SIGTERM here)
sigset_t oldmask, newmask, sigpend;
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
sigaddset(&newmask, SIGTERM);
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
#pragma omp parallel
{
int rank = omp_get_thread_num();
size_t steps = 0;
// keep improving result forever, unless signalled
while (!signalled) {
#pragma omp for
for (size_t i = 0; i < 10000; i++) {
// we can't break from an omp for loop...
// instead, spin away the rest of the iterations
if (signalled) continue;
for (size_t j = 0; j < 1000000; j++, steps++) {
// ***
// heavy computation...
// ***
// check for signal every 10 million steps
if (steps % 10000000 == 0) {
// master thread; poll for signal
if (rank == 0) {
sigpending(&sigpend);
if (sigismember(&sigpend, SIGINT) || sigismember(&sigpend, SIGTERM)) {
if (sigwait(&newmask, &sigcaught) == 0) {
printf("Interrupted by %d...\n", sigcaught);
signalled = true;
}
}
}
// all threads; stop computing
if (signalled) break;
}
}
}
}
#pragma omp atomic
steps_tot += steps;
}
printf("The result is ... after %zu steps\n", steps_tot);
// optional cleanup
sigprocmask(SIG_SETMASK, &oldmask, NULL);
}
如果使用 C++,您可能会发现以下类很有用...
#include <signal.h>
#include <vector>
class Unterminable {
sigset_t oldmask, newmask;
std::vector<int> signals;
public:
Unterminable(std::vector<int> signals) : signals(signals) {
sigemptyset(&newmask);
for (int signal : signals)
sigaddset(&newmask, signal);
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
}
Unterminable() : Unterminable({SIGINT, SIGTERM}) {}
// this can be made more efficient by using sigandset,
// but sigandset is not particularly portable
int poll() {
sigset_t sigpend;
sigpending(&sigpend);
for (int signal : signals) {
if (sigismember(&sigpend, signal)) {
int sigret;
if (sigwait(&newmask, &sigret) == 0)
return sigret;
break;
}
}
return -1;
}
~Unterminable() {
sigprocmask(SIG_SETMASK, &oldmask, NULL);
}
};
calculate() 的阻塞部分可以替换为Unterminable unterm();,信号检查部分可以替换为if ((sigcaught = unterm.poll()) > 0) {...}。当unterm 超出范围时,会自动执行信号解除阻塞。
1 这并非完全正确。 OpenMP 支持以cancellation points 的形式执行“并行中断”的有限支持。如果您选择在并行循环中使用取消点,请确保您确切知道隐式取消点的位置,以确保您的计算数据在取消时保持一致。
2 就我个人而言,我会统计有多少线程完成了 for 循环,如果主线程在没有捕获信号的情况下完成了循环,它会一直轮询信号,直到其中任一它捕获一个信号或所有线程完成循环。为此,请确保将 for 循环标记为 nowait。