【问题标题】:How to properly suspend threads?如何正确挂起线程?
【发布时间】:2014-07-20 20:24:47
【问题描述】:

在现有多线程应用程序的上下文中,我想对其进行修改以能够挂起线程。该应用程序由 3 个工作线程组成,它们使用 pthread_barrier 在“锁定步骤”中工作,如下所示:

Thread 1   Thread 2   Thread 3

while(1)   while(1)    while(1)
   |          |          |
   |          |          |
   |          |          |
   |          |          |
 barrier     barrier   barrier

这段代码一切正常。我现在添加一个用于控制其他 3 个线程的第 4 个线程,我需要从中暂停/恢复 3 个工作线程。现在我尝试使用一个全局stop 标志和一个由控制线程编写并在屏障之后由工作线程读取的条件变量。

Thread 1     Thread 2       Thread 3        Thread 4

while(1)     while(1)        while(1)        wait for user input to suspend 
   |            |              |             mutex_lock
   |            |              |             stop = 1
   |            |              |             mutex_unlock
   |            |              |             wait for user input to resume 
   |            |              |             mutex_lock
   |            |              |             stop = 0
   |            |              |             cond_broadcast()
   |            |              |             mutex_unlock
 barrier       barrier       barrier
 mutex_lock    mutex_lock    mutex_lock
 if(stop)      if(stop)      if(stop)
  cond_wait()   cond_wait()   cond_wait()
 mutex_unlock  mutex_unlock  mutex_unlock

这个解决方案的问题是它有时会死锁,具体取决于线程的调度和线程 1、2 和 3 的工作长度。因此我想知道如何成功同步这 4 个线程以能够挂起/从控制一恢复工作线程?

【问题讨论】:

  • 你能增加包含线程 4 的障碍吗?
  • 理论上是的,但我不希望这样做,因为它需要我的控制线程像工作线程一样频繁地唤醒,与要求暂停和恢复的用户输入相比,锁定步骤可能非常短.
  • 一个强大的解决方案是用自定义屏障实现替换屏障和停止条件,这允许第四个线程暂停三个。它只需要一个互斥锁、一个条件变量和几个变量。如果用“自制”替换屏障机制在理论上是可以接受的,我想我可以为你准备一个示例实现作为答案——当然,除非其他人提出更好的选择。

标签: c pthreads


【解决方案1】:

我相信gmch's answer 应该可以解决原来的问题。但是,并非所有 pthread 实现都包含 pthread_barrier_t 和相关函数(因为它们是 POSIX 线程规范的可选部分),所以这是我在对原始问题的评论中提到的自定义屏障实现。

(请注意,还有其他方法可以在正常操作期间异步挂起/恢复线程,并且无需线程本身的合作。一种实现方法是使用一个或两个实时信号,以及一个信号处理程序阻塞sigsuspend(),等待补充的“继续”信号。控制线程必须使用pthread_kill()pthread_sigqueue() 向涉及的每个线程发送暂停和继续信号。线程受到的影响最小;除了可能@ 987654333@ 来自阻塞系统调用的错误(因为信号传递会中断阻塞系统调用),线程只是没有任何进展——就好像它们没有被安排一段时间一样。因此,不应该有任何问题关于线程在稍微不同的时间暂停和继续。如果您对此方法感兴趣,请发表评论,我也可以尝试展示该方法的示例实现。)

也许这对需要可暂停自定义屏障实现的其他人有用,或者作为他们自己的自定义屏障的基础。

编辑添加DRAINING 模式,当线程预期退出时。在您的工作循环中,使用do { ... } while (!barrier_wait(&barrier));

barrier.h

#ifndef   BARRIER_H
#define   BARRIER_H
#include <pthread.h>
#include <errno.h>

typedef enum {
    INVALID = -1,
    RUNNING = 0,
    PAUSED = 1,
    DRAINING = 2
} barrier_state;

typedef struct {
    pthread_mutex_t     mutex;
    pthread_cond_t      cond;
    barrier_state       state;
    int                 threads;    /* Number of participants */
    int                 waiting;    /* Number of participants waiting */
} barrier;

/** barrier_drain() - Mark barrier so that threads will know to exit
 * @b: pointer to barrier
 * @ids: pthread_t's for the threads to wait on, or NULL
 * @retvals: return values from the threads, or NULL
 * This function marks the barrier such that all threads arriving
 * at it will return ETIMEDOUT.
 * If @ids is specified, the threads will be joined.
 * Returns 0 if successful, errno error code otherwise.
*/
static int barrier_drain(barrier *const b, pthread_t *const ids, void **const retvals)
{
    int   result, threads;
    void *retval;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno = result;

    b->state = DRAINING;
    pthread_cond_broadcast(&b->cond);

    threads = b->threads;
    b->threads = 0;

    pthread_mutex_unlock(&b->mutex);

    while (threads-->0) {
        result = pthread_join(ids[threads], &retval);
        if (result)
            return errno = result;
        if (retvals)
            retvals[threads] = retval;
    }

    return errno = 0;
}            

/** barrier_pause() - Mark barrier to pause threads in the barrier
 * @b: pointer to barrier
 * This function marks the barrier such that all threads arriving
 * in it will wait in the barrier, until barrier_continue() is
 * called on it. If barrier_continue() is called before all threads
 * have arrived on the barrier, the barrier will operate normally;
 * i.e. the threads will continue only when all threads have arrived
 * at the barrier.
 * Returns 0 if successful, errno error code otherwise.
*/
static int barrier_pause(barrier *const b)
{
    int result;

    if (!b || b->threads < 1)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno = result;

    if (b->state != PAUSED && b->state != RUNNING) {
        pthread_mutex_unlock(&b->mutex);
        return errno = EPERM;
    }

    b->state = PAUSED;
    pthread_mutex_unlock(&b->mutex);
    return errno = 0;
}

/** barrier_continue() - Unpause barrier
 * @b: Pointer to barrier
 * This function lets the barrier operate normally.
 * If all threads are already waiting in the barrier,
 * it lets them proceed immediately. Otherwise, the
 * threads will continue when all threads have arrived
 * at the barrier.
 * Returns 0 if success, errno error code otherwise.
*/
static int barrier_continue(barrier *const b)
{
    int result;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno = result;

    if (b->state != PAUSED) {
        pthread_mutex_unlock(&b->mutex);
        return errno = EPERM;
    }

    b->state = RUNNING;

    if (b->waiting >= b->threads)
        pthread_cond_broadcast(&b->cond);

    pthread_mutex_unlock(&b->mutex);

    return errno = 0;
}

/** barrier_wait() - Wait on the barrier
 * @b: Pointer to barrier
 * Each thread participating in the barrier
 * must call this function.
 * Callers will block (wait) in this function,
 * until all threads have arrived.
 * If the barrier is paused, the threads will
 * wait until barrier_continue() is called on
 * the barrier, otherwise they will continue
 * when the final thread arrives to the barrier.
 * Returns 0 if success, errno error code otherwise.
 * Returns ETIMEDOUT if the thread should exit.
*/
static int barrier_wait(barrier *const b)
{
    int result;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    result = pthread_mutex_lock(&b->mutex);
    if (result)
        return errno =result;

    if (b->state == INVALID) {
        pthread_mutex_unlock(&b->mutex);
        return errno = EPERM;
    } else
    if (b->state == DRAINING) {
        pthread_mutex_unlock(&b->mutex);
        return errno = ETIMEDOUT;
    }

    b->waiting++;

    if (b->state == RUNNING && b->waiting >= b->threads)
        pthread_cond_broadcast(&b->cond);
    else
        pthread_cond_wait(&b->cond, &b->mutex);

    b->waiting--;
    pthread_mutex_unlock(&b->mutex);

    return errno = 0;
}

/** barrier_destroy() - Destroy a previously initialized barrier
 * @b: Pointer to barrier
 * Returns zero if success, errno error code otherwise.
*/
static int barrier_destroy(barrier *const b)
{
    int result;

    if (!b || b->threads < 0)
        return errno = EINVAL;

    b->state = INVALID;
    b->threads = -1;
    b->waiting = -1;

    result = pthread_cond_destroy(&b->cond);
    if (result)
        return errno = result;

    result = pthread_mutex_destroy(&b->mutex);
    if (result)
        return errno = result;

    return errno = 0;
}

/** barrier_init() - Initialize a barrier
 * @b: Pointer to barrier
 * @threads: Number of threads to participate in barrier
 * Returns 0 if success, errno error code otherwise.
*/
static int barrier_init(barrier *const b, const int threads)
{
    int result;

    if (!b || threads < 1)
        return errno = EINVAL;

    result = pthread_mutex_init(&b->mutex, NULL);
    if (result)
        return errno = result;

    result = pthread_cond_init(&b->cond, NULL);
    if (result)
        return errno = result;

    b->state = RUNNING;
    b->threads = threads;
    b->waiting = 0;

    return errno = 0;
}

#endif /* BARRIER_H */

逻辑很简单。在屏障中等待的所有线程都等待cond 条件变量。如果屏障正常运行(state==RUNNING),最终到达屏障的线程将在条件变量上广播而不是等待,从而唤醒所有其他线程。

如果屏障被暂停(state==PAUSED),即使最后到达屏障的线程也会等待条件变量。

barrier_pause() 被调用时,屏障状态变为暂停。可能有零个或多个线程在条件变量上等待,这没关系:只有到达屏障的最终线程具有特殊作用,并且该线程还不能到达。 (如果有,它早就清空了屏障。)

当调用barrier_continue() 时,屏障状态变为正常(state==RUNNING)。如果所有线程都在等待条件变量,则通过在条件变量上广播来释放它们。否则,最终到达屏障的线程将在条件变量上广播,并正常释放等待的线程。

请注意barrier_pause()barrier_continue() 不等待屏障变满或耗尽。它只在互斥体上阻塞,并且函数一次只能保持很短的时间。 (换句话说,它们可能会阻塞一小段时间,但不会等待屏障到达任何特定情况。)

如果屏障正在耗尽 (state==DRAINING),到达屏障的线程会立即返回 errno==ETIMEDOUT。为简单起见,所有屏障函数现在都无条件设置 errno(如果成功则为 0,如果错误则为 errno 代码,如果正在耗尽则 ETIMEDOUT)。

mutex 保护屏障字段,以便一次只有一个线程可以访问这些字段。特别是,由于互斥锁,只有一个线程可以同时到达屏障。

存在一种复杂的情况:使用屏障的循环体可能很短,或者可能有太多线程,以至于线程甚至在上一次迭代的所有线程都离开之前就开始到达屏障的下一次迭代它。

根据POSIX.1-2004, pthread_cond_broadcast()“应解除当前阻塞在指定条件变量上的所有线程”。尽管它们的唤醒是连续的——因为每个人都会依次获取互斥锁——但只有在调用pthread_cond_broadcast() 时被阻塞的线程才会被唤醒。

因此,如果实现在条件变量方面遵循 POSIX 语义,则唤醒线程可以(甚至立即!)重新等待条件变量,等待下一个广播或信号:“旧”和“新”等待者是单独的集合。这个用例实际上非常典型,我听说过的所有 POSIX 实现都允许这样做——它们不会唤醒在最后一个pthread_cond_broadcast() 之后开始等待条件变量的线程。

如果我们可以依赖 POSIX 条件变量唤醒语义,这意味着上述屏障实现应该可靠地工作,包括在线程到达屏障的情况下(用于下一次迭代),甚至在所有线程之前(来自上一次迭代) ) 已经离开了障碍。

(请注意,已知的“spurious wakeups” 问题仅影响pthread_cond_signal();即当调用pthread_cond_signal() 时,可能会唤醒多个线程。这里,我们使用@ 唤醒所有线程987654354@。我们只依靠它唤醒当前的服务员,而不是任何未来的服务员。)


这是一个 POSIX.1-2001 实现,用于异步挂起和恢复线程,无需目标线程的任何合作。

这使用两个信号,一个用于挂起线程,另一个用于恢复它。为了获得最大的兼容性,我没有使用 GNU C 扩展或 POSIX.1b 实时信号。两个信号都保存和恢复errno,这样对挂起线程的影响就会最小。

但是,请注意,man 7 signal“信号处理程序中断系统调用和库函数”中列出的函数部分,在 “之后,以下接口永远不会重新启动被信号处理程序中断” 段落,将在暂停/恢复时返回 errno==EINTR。这意味着您将不得不使用传统的do { result = FUNCTION(...); } while (result == -1 &amp;&amp; errno == EINTR); 循环,而不仅仅是result = FUNCTION(...);

suspend_threads()resume_threads() 调用不是同步的。线程将在函数调用返回之前或之后的某个时间暂停/恢复。此外,从进程本身发送的挂起和恢复信号可能影响线程;这取决于内核是否使用目标线程之一来传递此类信号。 (这种方法不能忽略其他进程发送的信号。)

测试表明,在实践中,这种挂起/恢复功能非常可靠,假设没有外部干扰(通过从另一个进程发送目标线程捕获的信号)。但是,它不是很健壮,对其操作的保证也很少,但对于某些实现来说可能就足够了。

suspend-resume.h

#ifndef   SUSPEND_RESUME_H
#define   SUSPEND_RESUME_H

#if !defined(_POSIX_C_SOURCE) && !defined(POSIX_SOURCE)
#error This requires POSIX support (define _POSIX_C_SOURCE).
#endif

#include <signal.h>
#include <errno.h>
#include <pthread.h>

#define   SUSPEND_SIGNAL  SIGUSR1
#define   RESUME_SIGNAL   SIGUSR2

/* Resume signal handler.
*/
static void resume_handler(int signum, siginfo_t *info, void *context)
{
    /* The delivery of the resume signal is the key point.
     * The actual signal handler does nothing. */
    return;
}

/* Suspend signal handler.
*/
static void suspend_handler(int signum, siginfo_t *info, void *context)
{
    sigset_t  resumeset;
    int       saved_errno;

    if (!info || info->si_signo != SUSPEND_SIGNAL)
        return;

    /* Save errno to keep it unchanged in the interrupted thread. */
    saved_errno = errno;

    /* Block until suspend or resume signal received. */
    sigfillset(&resumeset);
    sigdelset(&resumeset, SUSPEND_SIGNAL);
    sigdelset(&resumeset, RESUME_SIGNAL);
    sigsuspend(&resumeset);

    /* Restore errno. */
    errno = saved_errno; 
}

/* Install signal handlers.
*/
static int init_suspend_resume(void)
{
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    sigaddset(&act.sa_mask, SUSPEND_SIGNAL);
    sigaddset(&act.sa_mask, RESUME_SIGNAL);
    act.sa_flags = SA_RESTART | SA_SIGINFO;

    act.sa_sigaction = resume_handler;
    if (sigaction(RESUME_SIGNAL, &act, NULL))
        return errno;

    act.sa_sigaction = suspend_handler;
    if (sigaction(SUSPEND_SIGNAL, &act, NULL))
        return errno;

    return 0;
}

/* Suspend one or more threads.
*/
static int suspend_threads(const pthread_t *const identifier, const int count)
{
    int i, result, retval = 0;

    if (!identifier || count < 1)
        return errno = EINVAL;

    for (i = 0; i < count; i++) {
        result = pthread_kill(identifier[i], SUSPEND_SIGNAL);
        if (result && !retval)
            retval = result;
    }

    return errno = retval;
}

/* Resume one or more threads.
*/
static int resume_threads(const pthread_t *const identifier, const int count)
{
    int i, result, retval = 0;

    if (!identifier || count < 1)
        return errno = EINVAL;

    for (i = 0; i < count; i++) {
        result = pthread_kill(identifier[i], RESUME_SIGNAL);
        if (result && !retval)
            retval = result;
    }

    return errno = retval;
}

#endif /* SUSPEND_RESUME_H */

问题?

【讨论】:

  • 感谢您的完整回答!我想知道这样的障碍在性能方面与 glibc 实现相比如何?关于信号解决方案,您将如何解决 gmch 的 skuanr 回答中提到的问题?
  • 此外,我希望能够从第四个线程结束三个线程;我该怎么做?
  • @ManuelSelva:C 库实现往往非常相似,尽管它们倾向于直接使用内核原语(Linux 中的futexes)。我没有对这些进行任何基准测试,但如果有任何有意义的性能差异,我会感到非常惊讶。至于让工作线程退出,有线程取消之类的选项(通过pthread_cancel() 等)。我会编辑障碍,看看变化。 (如果barrier_wait() 返回非零值,则让工作人员退出。)
  • @ManuelSelva:我添加了一个基于信号的示例方法。简单地说,只要线程在信号处理程序内部运行,不修改线程变量(例如errno),线程就会看到暂停,就好像它只是暂时没有调度一样。某些函数确实返回 EINTR 错误,因此存在一些正常/预期的副作用。我有一个非常健壮的想法,在挂起时为每个线程使用一个套接字对,但它需要 GNU C 扩展。
  • 再次感谢。对于排水功能,我实施了与您建议的解决方案相同的解决方案,因为我无法让 pthread_cancel 工作。您能否也提供具有此功能的解决方案?注意:无论如何,谢谢,回答已接受
【解决方案2】:

为了保持线程同步,您应该将stop 的测试放在屏障之前。如果在一个或多个工作线程到达屏障时设置了标志,则它们将被保留,直到其他(S)从条件中释放。


在下面的cmets交换之后添加...

通过检查障碍后的停止标志,比赛开始了。在屏障之后,工作线程立即依次检查标志。如果标志在一个或多个线程检查后设置,但在下一个线程之前,一些线程将错过停止,并继续循环到屏障 - 所以工作线程现在 out of同步

通过检查屏障前的停止标志,仍然存在竞争,但不会导致工作线程不同步。如果在一个或多个线程检查后立即设置标志,则错过它的线程继续,并在屏障处停止。任何看到停止标志的线程都会在该条件下停止,当条件发出信号时,它们将继续前进到屏障,所有工作线程继续同步。

换一种说法……通过屏障之后的检查,所有工作线程在它们被屏障同步后需要看到停止标志的相同状态,如果它们要保持同步 - - 这是不可能的。通过屏障前的检查,只有一个工作线程需要看到停止标志才能同步停止它们——这很简单。

从代码展示的草图来看,尚不清楚为什么会出现死锁。移动检查循环不会改变这一点,但报告的死锁可能是由于工作线程不同步造成的。


单独和 FWIW,一般人会这样写:

while (...reason to wait...)
  pthread_cond_wait(...) ;

而不是:

if (...reason to wait...)
  pthread_cond_wait(...) ;

这主要是因为pthread_cond_signal() 可以(标准允许它)唤醒多个线程,并且在这种情况下pthread_cond_broadcast 正在被使用...但是if 一直在敲响警钟。

【讨论】:

  • 如果其中一个线程已经到达屏障,而第四个线程启用stop怎么办?我认为顺序(先停止或障碍)并不重要;当第四个线程启用stop时,总会有一个或多个线程已经到达障碍的窗口。
  • 不管有多少已经到达屏障,那些线程都会停止。如果只有一个击中停止标志,它将按条件停止,其他人继续在障碍物处等待。当停止标志被清除并且条件被触发时,该线程前进到屏障,并且所有三个都继续。
  • 在这种情况下,stopbarrier 之间的顺序无关紧要,不是吗?那么如何更改订单才能解决 OP 的问题呢? (为了清楚起见,我个人假设整个图片比显示的要复杂得多;显示的图表不应该死锁。)
  • 考虑如果在所有线程都从屏障释放之后设置了停止标志,但其中一个通过了停止检查会发生什么。现在有两个线程在等待条件,但一个线程跑到障碍物上——三个线程不再同步。
  • 没错。如果外部线程启用stop 当一个但不是所有线程都到达障碍时,您建议的序列也会发生同样的情况。这就是为什么我不认为改变障碍和停止之间的顺序是一个解决方案。
【解决方案3】:

您可以使用信号处理程序来暂停和恢复线程,具体取决于传递给线程的信号。编写两个自定义信号处理程序:一个用于挂起(SIGUSR1)和恢复(SIGUSR2)。因此,当您想暂停线程时,只需向该线程发送 SIGUSR1 信号。类似地,为了恢复挂起的线程,使用 pthread_kill 将 SIGUSR2 发送到该线程。

【讨论】:

  • 您如何确保发送和接收“挂起”信号的方式是所有工作线程都在同一周期挂起(并因此保持同步)?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-03-09
  • 2014-08-29
  • 2020-03-13
  • 1970-01-01
  • 2021-03-11
  • 1970-01-01
  • 2016-06-16
相关资源
最近更新 更多