我相信gmch's answer 应该可以解决原来的问题。但是,并非所有 pthread 实现都包含 pthread_barrier_t 和相关函数(因为它们是 POSIX 线程规范的可选部分),所以这是我在对原始问题的评论中提到的自定义屏障实现。
(请注意,还有其他方法可以在正常操作期间异步挂起/恢复线程,并且无需线程本身的合作。一种实现方法是使用一个或两个实时信号,以及一个信号处理程序阻塞sigsuspend(),等待补充的“继续”信号。控制线程必须使用pthread_kill() 或pthread_sigqueue() 向涉及的每个线程发送暂停和继续信号。线程受到的影响最小;除了可能@ 987654333@ 来自阻塞系统调用的错误(因为信号传递会中断阻塞系统调用),线程只是没有任何进展——就好像它们没有被安排一段时间一样。因此,不应该有任何问题关于线程在稍微不同的时间暂停和继续。如果您对此方法感兴趣,请发表评论,我也可以尝试展示该方法的示例实现。)
也许这对需要可暂停自定义屏障实现的其他人有用,或者作为他们自己的自定义屏障的基础。
编辑添加DRAINING 模式,当线程预期退出时。在您的工作循环中,使用do { ... } while (!barrier_wait(&barrier));
barrier.h:
#ifndef BARRIER_H
#define BARRIER_H
#include <pthread.h>
#include <errno.h>
typedef enum {
INVALID = -1,
RUNNING = 0,
PAUSED = 1,
DRAINING = 2
} barrier_state;
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
barrier_state state;
int threads; /* Number of participants */
int waiting; /* Number of participants waiting */
} barrier;
/** barrier_drain() - Mark barrier so that threads will know to exit
* @b: pointer to barrier
* @ids: pthread_t's for the threads to wait on, or NULL
* @retvals: return values from the threads, or NULL
* This function marks the barrier such that all threads arriving
* at it will return ETIMEDOUT.
* If @ids is specified, the threads will be joined.
* Returns 0 if successful, errno error code otherwise.
*/
static int barrier_drain(barrier *const b, pthread_t *const ids, void **const retvals)
{
int result, threads;
void *retval;
if (!b || b->threads < 0)
return errno = EINVAL;
result = pthread_mutex_lock(&b->mutex);
if (result)
return errno = result;
b->state = DRAINING;
pthread_cond_broadcast(&b->cond);
threads = b->threads;
b->threads = 0;
pthread_mutex_unlock(&b->mutex);
while (threads-->0) {
result = pthread_join(ids[threads], &retval);
if (result)
return errno = result;
if (retvals)
retvals[threads] = retval;
}
return errno = 0;
}
/** barrier_pause() - Mark barrier to pause threads in the barrier
* @b: pointer to barrier
* This function marks the barrier such that all threads arriving
* in it will wait in the barrier, until barrier_continue() is
* called on it. If barrier_continue() is called before all threads
* have arrived on the barrier, the barrier will operate normally;
* i.e. the threads will continue only when all threads have arrived
* at the barrier.
* Returns 0 if successful, errno error code otherwise.
*/
static int barrier_pause(barrier *const b)
{
int result;
if (!b || b->threads < 1)
return errno = EINVAL;
result = pthread_mutex_lock(&b->mutex);
if (result)
return errno = result;
if (b->state != PAUSED && b->state != RUNNING) {
pthread_mutex_unlock(&b->mutex);
return errno = EPERM;
}
b->state = PAUSED;
pthread_mutex_unlock(&b->mutex);
return errno = 0;
}
/** barrier_continue() - Unpause barrier
* @b: Pointer to barrier
* This function lets the barrier operate normally.
* If all threads are already waiting in the barrier,
* it lets them proceed immediately. Otherwise, the
* threads will continue when all threads have arrived
* at the barrier.
* Returns 0 if success, errno error code otherwise.
*/
static int barrier_continue(barrier *const b)
{
int result;
if (!b || b->threads < 0)
return errno = EINVAL;
result = pthread_mutex_lock(&b->mutex);
if (result)
return errno = result;
if (b->state != PAUSED) {
pthread_mutex_unlock(&b->mutex);
return errno = EPERM;
}
b->state = RUNNING;
if (b->waiting >= b->threads)
pthread_cond_broadcast(&b->cond);
pthread_mutex_unlock(&b->mutex);
return errno = 0;
}
/** barrier_wait() - Wait on the barrier
* @b: Pointer to barrier
* Each thread participating in the barrier
* must call this function.
* Callers will block (wait) in this function,
* until all threads have arrived.
* If the barrier is paused, the threads will
* wait until barrier_continue() is called on
* the barrier, otherwise they will continue
* when the final thread arrives to the barrier.
* Returns 0 if success, errno error code otherwise.
* Returns ETIMEDOUT if the thread should exit.
*/
static int barrier_wait(barrier *const b)
{
int result;
if (!b || b->threads < 0)
return errno = EINVAL;
result = pthread_mutex_lock(&b->mutex);
if (result)
return errno =result;
if (b->state == INVALID) {
pthread_mutex_unlock(&b->mutex);
return errno = EPERM;
} else
if (b->state == DRAINING) {
pthread_mutex_unlock(&b->mutex);
return errno = ETIMEDOUT;
}
b->waiting++;
if (b->state == RUNNING && b->waiting >= b->threads)
pthread_cond_broadcast(&b->cond);
else
pthread_cond_wait(&b->cond, &b->mutex);
b->waiting--;
pthread_mutex_unlock(&b->mutex);
return errno = 0;
}
/** barrier_destroy() - Destroy a previously initialized barrier
* @b: Pointer to barrier
* Returns zero if success, errno error code otherwise.
*/
static int barrier_destroy(barrier *const b)
{
int result;
if (!b || b->threads < 0)
return errno = EINVAL;
b->state = INVALID;
b->threads = -1;
b->waiting = -1;
result = pthread_cond_destroy(&b->cond);
if (result)
return errno = result;
result = pthread_mutex_destroy(&b->mutex);
if (result)
return errno = result;
return errno = 0;
}
/** barrier_init() - Initialize a barrier
* @b: Pointer to barrier
* @threads: Number of threads to participate in barrier
* Returns 0 if success, errno error code otherwise.
*/
static int barrier_init(barrier *const b, const int threads)
{
int result;
if (!b || threads < 1)
return errno = EINVAL;
result = pthread_mutex_init(&b->mutex, NULL);
if (result)
return errno = result;
result = pthread_cond_init(&b->cond, NULL);
if (result)
return errno = result;
b->state = RUNNING;
b->threads = threads;
b->waiting = 0;
return errno = 0;
}
#endif /* BARRIER_H */
逻辑很简单。在屏障中等待的所有线程都等待cond 条件变量。如果屏障正常运行(state==RUNNING),最终到达屏障的线程将在条件变量上广播而不是等待,从而唤醒所有其他线程。
如果屏障被暂停(state==PAUSED),即使最后到达屏障的线程也会等待条件变量。
当barrier_pause() 被调用时,屏障状态变为暂停。可能有零个或多个线程在条件变量上等待,这没关系:只有到达屏障的最终线程具有特殊作用,并且该线程还不能到达。 (如果有,它早就清空了屏障。)
当调用barrier_continue() 时,屏障状态变为正常(state==RUNNING)。如果所有线程都在等待条件变量,则通过在条件变量上广播来释放它们。否则,最终到达屏障的线程将在条件变量上广播,并正常释放等待的线程。
请注意barrier_pause() 和barrier_continue() 不等待屏障变满或耗尽。它只在互斥体上阻塞,并且函数一次只能保持很短的时间。 (换句话说,它们可能会阻塞一小段时间,但不会等待屏障到达任何特定情况。)
如果屏障正在耗尽 (state==DRAINING),到达屏障的线程会立即返回 errno==ETIMEDOUT。为简单起见,所有屏障函数现在都无条件设置 errno(如果成功则为 0,如果错误则为 errno 代码,如果正在耗尽则 ETIMEDOUT)。
mutex 保护屏障字段,以便一次只有一个线程可以访问这些字段。特别是,由于互斥锁,只有一个线程可以同时到达屏障。
存在一种复杂的情况:使用屏障的循环体可能很短,或者可能有太多线程,以至于线程甚至在上一次迭代的所有线程都离开之前就开始到达屏障的下一次迭代它。
根据POSIX.1-2004, pthread_cond_broadcast()“应解除当前阻塞在指定条件变量上的所有线程”。尽管它们的唤醒是连续的——因为每个人都会依次获取互斥锁——但只有在调用pthread_cond_broadcast() 时被阻塞的线程才会被唤醒。
因此,如果实现在条件变量方面遵循 POSIX 语义,则唤醒线程可以(甚至立即!)重新等待条件变量,等待下一个广播或信号:“旧”和“新”等待者是单独的集合。这个用例实际上非常典型,我听说过的所有 POSIX 实现都允许这样做——它们不会唤醒在最后一个pthread_cond_broadcast() 之后开始等待条件变量的线程。
如果我们可以依赖 POSIX 条件变量唤醒语义,这意味着上述屏障实现应该可靠地工作,包括在线程到达屏障的情况下(用于下一次迭代),甚至在所有线程之前(来自上一次迭代) ) 已经离开了障碍。
(请注意,已知的“spurious wakeups” 问题仅影响pthread_cond_signal();即当调用pthread_cond_signal() 时,可能会唤醒多个线程。这里,我们使用@ 唤醒所有线程987654354@。我们只依靠它唤醒当前的服务员,而不是任何未来的服务员。)
这是一个 POSIX.1-2001 实现,用于异步挂起和恢复线程,无需目标线程的任何合作。
这使用两个信号,一个用于挂起线程,另一个用于恢复它。为了获得最大的兼容性,我没有使用 GNU C 扩展或 POSIX.1b 实时信号。两个信号都保存和恢复errno,这样对挂起线程的影响就会最小。
但是,请注意,man 7 signal,“信号处理程序中断系统调用和库函数”中列出的函数部分,在 “之后,以下接口永远不会重新启动被信号处理程序中断” 段落,将在暂停/恢复时返回 errno==EINTR。这意味着您将不得不使用传统的do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); 循环,而不仅仅是result = FUNCTION(...);。
suspend_threads() 和 resume_threads() 调用不是同步的。线程将在函数调用返回之前或之后的某个时间暂停/恢复。此外,从进程本身发送的挂起和恢复信号可能影响线程;这取决于内核是否使用目标线程之一来传递此类信号。 (这种方法不能忽略其他进程发送的信号。)
测试表明,在实践中,这种挂起/恢复功能非常可靠,假设没有外部干扰(通过从另一个进程发送目标线程捕获的信号)。但是,它不是很健壮,对其操作的保证也很少,但对于某些实现来说可能就足够了。
suspend-resume.h:
#ifndef SUSPEND_RESUME_H
#define SUSPEND_RESUME_H
#if !defined(_POSIX_C_SOURCE) && !defined(POSIX_SOURCE)
#error This requires POSIX support (define _POSIX_C_SOURCE).
#endif
#include <signal.h>
#include <errno.h>
#include <pthread.h>
#define SUSPEND_SIGNAL SIGUSR1
#define RESUME_SIGNAL SIGUSR2
/* Resume signal handler.
*/
static void resume_handler(int signum, siginfo_t *info, void *context)
{
/* The delivery of the resume signal is the key point.
* The actual signal handler does nothing. */
return;
}
/* Suspend signal handler.
*/
static void suspend_handler(int signum, siginfo_t *info, void *context)
{
sigset_t resumeset;
int saved_errno;
if (!info || info->si_signo != SUSPEND_SIGNAL)
return;
/* Save errno to keep it unchanged in the interrupted thread. */
saved_errno = errno;
/* Block until suspend or resume signal received. */
sigfillset(&resumeset);
sigdelset(&resumeset, SUSPEND_SIGNAL);
sigdelset(&resumeset, RESUME_SIGNAL);
sigsuspend(&resumeset);
/* Restore errno. */
errno = saved_errno;
}
/* Install signal handlers.
*/
static int init_suspend_resume(void)
{
struct sigaction act;
sigemptyset(&act.sa_mask);
sigaddset(&act.sa_mask, SUSPEND_SIGNAL);
sigaddset(&act.sa_mask, RESUME_SIGNAL);
act.sa_flags = SA_RESTART | SA_SIGINFO;
act.sa_sigaction = resume_handler;
if (sigaction(RESUME_SIGNAL, &act, NULL))
return errno;
act.sa_sigaction = suspend_handler;
if (sigaction(SUSPEND_SIGNAL, &act, NULL))
return errno;
return 0;
}
/* Suspend one or more threads.
*/
static int suspend_threads(const pthread_t *const identifier, const int count)
{
int i, result, retval = 0;
if (!identifier || count < 1)
return errno = EINVAL;
for (i = 0; i < count; i++) {
result = pthread_kill(identifier[i], SUSPEND_SIGNAL);
if (result && !retval)
retval = result;
}
return errno = retval;
}
/* Resume one or more threads.
*/
static int resume_threads(const pthread_t *const identifier, const int count)
{
int i, result, retval = 0;
if (!identifier || count < 1)
return errno = EINVAL;
for (i = 0; i < count; i++) {
result = pthread_kill(identifier[i], RESUME_SIGNAL);
if (result && !retval)
retval = result;
}
return errno = retval;
}
#endif /* SUSPEND_RESUME_H */
问题?