【问题标题】:Signal handling in OpenMP parallel programOpenMP 并行程序中的信号处理
【发布时间】:2011-12-29 14:31:05
【问题描述】:

我有一个使用 POSIX 计时器的程序 (timer_create())。本质上,程序设置了一个计时器并开始执行一些冗长的(可能是无限的)计算。当计时器到期并调用信号处理程序时,处理程序会打印已计算出的最佳结果并退出程序。

我考虑使用 OpenMP 并行进行计算,因为它应该会加快速度。

在 pthreads 中,有一些特殊的功能,例如为我的线程设置信号掩码。 OpenMP 是否提供这种控制,还是我必须接受信号可以传递到 OpenMP 创建的任何线程的事实?

另外,如果我当前处于代码的并行部分并且调用了我的处理程序,它是否仍然可以安全地终止应用程序 (exit(0);) 并执行诸如锁定 OpenMP 锁之类的操作?

【问题讨论】:

  • 也许,这可以通过使用一个抓住出口的块来解决?
  • OpenMP 规范不包含“信号”一词。

标签: c pthreads signals openmp signal-handling


【解决方案1】:

OpenMP 3.1 标准对信号只字未提。

据我所知,Linux/UNIX 上每个流行的 OpenMP 实现都是基于 pthread 的,因此 OpenMP 线程是 pthread 的线程。并且适用 pthread 和信号的通用规则。

OpenMP 是否提供这样的控制

没有任何特定的控制;但是你可以尝试使用 pthread 的控制。唯一的问题是要知道使用了多少 OpenMP 线程以及在哪里放置控制语句。

信号可以传递给 OpenMP 创建的任何线程?

默认情况下,是的,它将被传递到任何线程。

我的处理程序被调用,

关于信号处理程序的常规规则仍然适用。信号处理程序中允许的函数列在http://pubs.opengroup.org/onlinepubs/009695399/functions/xsh_chap02_04.html(在页面末尾)

并且printf 是不允许的(write 是)。如果您知道在信号发出时 printf 没有被任何线程使用(例如,您在并行区域中没有 printf),则可以使用 printf。

它仍然可以安全地杀死应用程序吗 (exit(0);)

可以:处理程序允许abort()_exit()

当任何线程执行exitabort 时,Linux/Unix 将终止所有线程。

并执行诸如锁定 OpenMP 锁之类的操作?

你不应该这样做,但是如果你知道这个锁在信号处理程序运行时不会被锁定,你可以尝试这样做。

!!更新

有一个采用信令到 OpenMP http://www.cs.colostate.edu/~cs675/OpenMPvsThreads.pdf 的示例(“OpenMP 与 C/C++ 中的线程”)。简而言之:在处理程序中设置一个标志,并在每个第 N 次循环迭代的每个线程中添加对该标志的检查。

使基于信号的异常机制适应并行区域

在 C/C++ 中更常见的事情 Fortran 应用程序的应用程序是 该程序使用复杂的用户界面。 Genehunter 是一个简单的例子,用户 可能会中断一个家谱的计算 通过按 control-C 使其可以继续 临床数据库中的下一个家谱 疾病。提前终止在处理 类似 C++ 的异常的串行版本 涉及信号处理程序、setjump 的机制, 并且 longjump.OpenMP 不允许非结构化控制 流以跨越平行构造边界。我们 修改了 OpenMP 中的异常处理 通过将中断处理程序更改为版本 轮询机制。捕捉到的线程 control-C 信号设置一个共享标志。所有线程 检查循环开始时的标志 调用例程 has_hit_interrupt( ) 如果已设置,则跳过迭代。当循环 结束,主人检查标志,可以很容易地 执行长跳以完成 异常退出(参见图 1。)

【讨论】:

    【解决方案2】:

    这有点晚了,但希望这个示例代码可以帮助其他处于类似位置的人!


    正如 osgx 所提到的,OpenMP 对信号问题保持沉默,但由于 OpenMP 通常在 POSIX 系统上使用 pthread 实现,我们可以使用 pthread 信号方法。

    对于使用 OpenMP 的繁重计算,很可能只有少数几个位置可以安全地停止计算。因此,对于想要获得过早结果的情况,我们可以使用同步信号处理来安全地执行此操作。另一个优点是这让我们可以接受来自特定 OpenMP 线程的信号(在下面的示例代码中,我们选择主线程)。捕捉到信号后,我们只需设置一个标志,指示计算应该停止。然后,每个线程应确保在方便时定期检查此标志,然后结束其工作负载份额。

    通过使用这种同步方法,我们允许计算优雅地退出并且对算法的更改非常小。另一方面,所需的信号处理程序方法可能不合适,因为可能难以将每个线程的当前工作状态整理成连贯的结果。不过,同步方法的一个缺点是计算可能需要相当长的时间才能停止。

    信号检测仪由三部分组成:

    • 阻塞相关信号。这应该在omp parallel 区域之外完成,以便每个 OpenMP 线程 (pthread) 都将继承相同的阻塞行为。
    • 从主线程轮询所需信号。可以为此使用sigtimedwait,但某些系统(例如 MacOS)不支持此功能。更便携,我们可以使用sigpending 来轮询任何阻塞的信号,然后在使用sigwait 同步接受它们之前仔细检查阻塞的信号是否是我们所期望的(应该立即返回,除非其他部分程序正在创建竞争条件)。我们终于设置了相关标志。
    • 我们应该在最后移除我们的信号掩码(可以选择最后一次检查信号)。

    有一些重要的性能注意事项和注意事项:

    • 假设每个内部循环迭代都很小,执行信号检查系统调用的成本很高。在示例代码中,我们仅每 1000 万次(每线程)迭代检查一次信号,这可能对应于几秒钟的 wall time。
    • omp for 循环不能脱离1,因此您必须在剩余的迭代中旋转或使用更基本的 OpenMP 原语重写循环。常规循环(例如外部并行循环的内部循环)可以拆分出来就好了。
    • 如果只有主线程可以检查信号,那么这可能会在主线程在其他线程之前完成的程序中产生问题。在这种情况下,这些其他线程将是不间断的。为了解决这个问题,您可以在每个线程完成其工作负载时“传递信号检查的指挥棒”,或者可以强制主线程继续运行和轮询,直到所有其他线程完成2
    • 在某些架构(例如 NUMA HPC)上,检查“全局”信号标志的时间可能非常昂贵,因此在决定何时何地检查或操作标志时要小心。例如,对于自旋循环部分,可能希望在标志变为 true 时将其本地缓存。

    示例代码如下:

    #include <signal.h>
    
    void calculate() {
        _Bool signalled = false;
        int sigcaught;
        size_t steps_tot = 0;
    
        // block signals of interest (SIGINT and SIGTERM here)
        sigset_t oldmask, newmask, sigpend;
        sigemptyset(&newmask);
        sigaddset(&newmask, SIGINT);
        sigaddset(&newmask, SIGTERM);
        sigprocmask(SIG_BLOCK, &newmask, &oldmask);
    
        #pragma omp parallel
        {
            int rank = omp_get_thread_num();
            size_t steps = 0;
    
            // keep improving result forever, unless signalled
            while (!signalled) {
                #pragma omp for
                for (size_t i = 0; i < 10000; i++) {
                    // we can't break from an omp for loop...
                    // instead, spin away the rest of the iterations
                    if (signalled) continue;
    
                    for (size_t j = 0; j < 1000000; j++, steps++) {
                        // ***
                        // heavy computation...
                        // ***
    
                        // check for signal every 10 million steps
                        if (steps % 10000000 == 0) {
    
                            // master thread; poll for signal
                            if (rank == 0) {
                                sigpending(&sigpend);
                                if (sigismember(&sigpend, SIGINT) || sigismember(&sigpend, SIGTERM)) {
                                    if (sigwait(&newmask, &sigcaught) == 0) {
                                        printf("Interrupted by %d...\n", sigcaught);
                                        signalled = true;
                                    }
                                }
                            }
    
                            // all threads; stop computing
                            if (signalled) break;
                        }
                    }
                }
            }
    
            #pragma omp atomic
            steps_tot += steps;
        }
    
        printf("The result is ... after %zu steps\n", steps_tot);
    
        // optional cleanup
        sigprocmask(SIG_SETMASK, &oldmask, NULL);
    }
    

    如果使用 C++,您可能会发现以下类很有用...

    #include <signal.h>
    #include <vector>
    
    class Unterminable {
        sigset_t oldmask, newmask;
        std::vector<int> signals;
    
    public:
        Unterminable(std::vector<int> signals) : signals(signals) {
            sigemptyset(&newmask);
            for (int signal : signals)
                sigaddset(&newmask, signal);
            sigprocmask(SIG_BLOCK, &newmask, &oldmask);
        }
    
        Unterminable() : Unterminable({SIGINT, SIGTERM}) {}
    
        // this can be made more efficient by using sigandset,
        // but sigandset is not particularly portable
        int poll() {
            sigset_t sigpend;
            sigpending(&sigpend);
            for (int signal : signals) {
                if (sigismember(&sigpend, signal)) {
                    int sigret;
                    if (sigwait(&newmask, &sigret) == 0)
                        return sigret;
                    break;
                }
            }
            return -1;
        }
    
        ~Unterminable() {
            sigprocmask(SIG_SETMASK, &oldmask, NULL);
        }
    };
    

    calculate() 的阻塞部分可以替换为Unterminable unterm();,信号检查部分可以替换为if ((sigcaught = unterm.poll()) &gt; 0) {...}。当unterm 超出范围时,会自动执行信号解除阻塞。


    1 这并非完全正确。 OpenMP 支持以cancellation points 的形式执行“并行中断”的有限支持。如果您选择在并行循环中使用取消点,请确保您确切知道隐式取消点的位置,以确保您的计算数据在取消时保持一致。

    2 就我个人而言,我会统计有多少线程完成了 for 循环,如果主线程在没有捕获信号的情况下完成了循环,它会一直轮询信号,直到其中任一它捕获一个信号或所有线程完成循环。为此,请确保将 for 循环标记为 nowait

    【讨论】:

    • 值得注意的是,OpenMP 可以在并行区域之间保持线程处于活动状态,因此必须在第一个信号之前阻塞信号。这让我想知道,是否可以保证 OpenMP 在遇到第一个并行区域之前以及在信号被阻塞之前不会预先生成线程?这将阻止继承正确的 sigmask。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多