【问题标题】:a simple boss-worker model using pthreads使用 pthreads 的简单老板-工人模型
【发布时间】:2023-12-10 17:21:02
【问题描述】:

我是一名业余程序员,正在尝试使用 pthreads,看看多线程程序可以在多大程度上提高我正在处理的相当长的计算的效率。计算通过一个 std::list 对象运行,弹出列表的第一个元素,并将其分配给一个线程,用它计算一些东西。该程序跟踪活动线程,并确保始终有一定数量的活动线程在运行。一旦列表为空,程序对结果数据进行排序,转储数据文件并终止。

程序的多线程版本目前不工作。它在列表中获得 20 或 40 或 200 个左右的元素(取决于我给它的列表)和段错误。似乎段错误发生在列表的特定元素上,这意味着它们不会以任何方式随机出现。

但是 奇怪的是,如果我用调试符号编译并通过 gdb 运行程序,程序不会出现段错误。它运行完美。当然,慢慢地,但它运行并按照我期望的方式完成所有事情。

在考虑了大家的建议一段时间后,使用(除其他外)valgrind 的工具来尝试解决正在发生的事情。我注意到下面的简化代码(在 std 库或 pthread 库之外没有任何调用)会给 helgrind 带来麻烦,这可能是我问题的根源。所以这里只是简化的代码,以及 hlgrind 的抱怨。

#include <cstdlib>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <list>
#include <iostream>
#include <signal.h>
#include <sys/select.h>

struct thread_detail {
 pthread_t *threadID; 
 unsigned long num;
};

pthread_mutex_t coutLock;

void *ThreadToSpawn(void *threadarg)
{
   struct thread_detail *my_data;
   my_data = (struct thread_detail *) threadarg;
   int taskid = my_data->num;

   struct timeval timeout;
   for (unsigned long i=0; i < 10; i++)
    { 
     timeout.tv_sec = 0;  timeout.tv_usec = 500000; // half-second 
     select( 0, NULL, NULL, NULL, & timeout );
     pthread_mutex_lock(&coutLock);
     std::cout << taskid << " "; std::cout.flush();
     pthread_mutex_unlock(&coutLock);
    }
   pthread_exit(NULL);
}


int main (int argc, char *argv[])
{
  unsigned long comp_DONE=0; 
  unsigned long comp_START=0;
  unsigned long ms_LAG=10000; // microsecond lag between polling of threads

  // set-up the mutexes
  pthread_mutex_init( &coutLock, NULL );

  if (argc != 3) { std::cout << "Program requires two arguments: (1) number of threads to use,"
                               " and (2) tasks to accomplish. \n"; exit(1); }
  unsigned long NUM_THREADS(atoi( argv[1] ));
  unsigned long comp_TODO(atoi(argv[2]));
  std::cout << "Program will have " << NUM_THREADS << " threads. \n";
  std::list < thread_detail > thread_table;

   while (comp_DONE != comp_TODO) // main loop to set-up and track threads
    {
     // poll stack of computations to see if any have finished, 
     // extract data and remove completed ones from stack
     std::list < thread_detail >::iterator i(thread_table.begin());
     while (i!=thread_table.end())
      {
       if (pthread_kill(*i->threadID,0)!=0) // thread is dead
        { // if there was relevant info in *i we'd extract it here
         if (pthread_join(*i->threadID, NULL)!=0) { std::cout << "Thread join error!\n"; exit(1); }
         pthread_mutex_lock(&coutLock);
         std::cout << i->num << " done. "; std::cout.flush();
         pthread_mutex_unlock(&coutLock);
         delete i->threadID;
         thread_table.erase(i++);  
         comp_DONE++;
        }
       else (i++);
      }
     // if list not full, toss another on the pile
     while ( (thread_table.size() < NUM_THREADS) && (comp_TODO > comp_START) )
      {
        pthread_t *tId( new pthread_t );
        thread_detail Y; Y.threadID=tId; Y.num=comp_START;
        thread_table.push_back(Y);
        int rc( pthread_create( tId, NULL, ThreadToSpawn, (void *)(&(thread_table.back() )) ) );
        if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); }
        pthread_mutex_lock(&coutLock);
       std::cout << comp_START << " start. "; std::cout.flush();
        pthread_mutex_unlock(&coutLock);
        comp_START++;
      }

     // wait a specified amount of time
     struct timeval timeout;
     timeout.tv_sec = 0;  timeout.tv_usec = ms_LAG; 
     select( 0, NULL, NULL, NULL, & timeout );
    } // the big while loop

   pthread_exit(NULL);
}

Helgrind 输出


==2849== Helgrind, a thread error detector
==2849== Copyright (C) 2007-2009, and GNU GPL'd, by OpenWorks LLP et al.
==2849== Using Valgrind-3.6.0.SVN-Debian and LibVEX; rerun with -h for copyright info
==2849== Command: ./thread2 2 6
==2849== 
Program will have 2 threads. 
==2849== Thread #2 was created
==2849==    at 0x64276BE: clone (clone.S:77)
==2849==    by 0x555E172: pthread_create@@GLIBC_2.2.5 (createthread.c:75)
==2849==    by 0x4C2D42C: pthread_create_WRK (hg_intercepts.c:230)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
==2849== Thread #1 is the program's root thread
==2849== 
==2849== Possible data race during write of size 8 at 0x7feffffe0 by thread #2
==2849==    at 0x4C2D54C: mythread_wrapper (hg_intercepts.c:200)
==2849==  This conflicts with a previous read of size 8 by thread #1
==2849==    at 0x4C2D440: pthread_create_WRK (hg_intercepts.c:235)
==2849==    by 0x4C2D4CF: pthread_create@* (hg_intercepts.c:257)
==2849==    by 0x401374: main (in /home/rybu/prog/regina/exercise/thread2)
==2849== 
 [0 start.]  [1 start.] 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1  [0 done.]  [1 done.]  [2 start.]  [3 start.] 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3 2 3  [2 done.]  [3 done.]  [4 start.]  [5 start.] 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5 4 5  [4 done.]  [5 done.] ==2849== 
==2849== For counts of detected and suppressed errors, rerun with: -v
==2849== Use --history-level=approx or =none to gain increased speed, at
==2849== the cost of reduced accuracy of conflicting-access information
==2849== ERROR SUMMARY: 6 errors from 1 contexts (suppressed: 675 from 37)

大概我以不正确的方式使用 pthreads,但我并不清楚我做错了什么。此外,我不确定如何制作 helmgrind 输出。早些时候 helgrind 抱怨说,因为我没有在由于其他原因代码知道已死的线程上调用 pthread_join。添加 pthread_join 处理了这些投诉。

在线阅读各种 pthread 教程后,我发现像上面的代码一样,进行如此多的线程创建和销毁可能毫无意义。让 N 个线程同时运行可能更有效,并使用互斥锁和共享内存在“BOSS”线程和“WORKER”线程之间传递数据,只在程序结束时杀死 WORKER 线程一次。所以这是我最终必须调整的东西,但是上面的代码有什么明显的问题吗?

编辑:我越来越频繁地注意到一些关键字。我试图创建的东西的术语显然是一个线程池。此外,对此标准实现有各种建议,例如在 boost 库中有 boost::threadpool、boost::task、boost::thread。其中一些似乎只是提议。我在这里遇到人们提到you can combine ASIO and boost::thread 来完成我正在寻找的东西的线程。同样有一个消息队列类。

嗯,所以我似乎只是在探讨当今许多人都在思考的一个话题,但它似乎是一种萌芽,就像 OOP 在 1989 年之类的那样。

【问题讨论】:

  • 代码过多/代码不足 - 尝试将其减少到仍然存在问题的最小样本,您不妨在过程中找到错误。
  • 尝试修复 helgrind 识别的所有种族,然后重试。
  • 您现在在该测试程序中看到的大多数比赛是因为您试图同时访问 stdio (std::cout) - 在 stdout 调用周围加一些锁以清除它。跨度>
  • 谢谢。这消除了除第一个 helmgrind 投诉之外的所有内容。我经常看到这种情况,即使在其他人的代码中似乎也很受欢迎,例如 Daniel Robbins 的代码:ibm.com/developerworks/linux/library/l-posix3/…

标签: gdb pthreads valgrind


【解决方案1】:

尝试启用核心转储 (ulimit -c unlimited),然后在不使用 gdb 的情况下运行您的程序。当它崩溃时,它应该留下一个核心文件,然后您可以使用 gdb 打开并开始调查 (gdb &lt;executable-file&gt; &lt;core-file&gt;)。

【讨论】:

  • @Ryan:回溯命令bt 将让您查看您的代码的哪一行触发了段错误。
  • @Ryan:该类的实例是否在线程之间共享?它应该是线程安全的吗?如果没有,您需要在该类的操作周围添加一些锁定。
  • 不,实例不共享。这些类在单独的线程中初始化,不知道其他线程的存在。
【解决方案2】:

关于顶部,您使用了多少个线程?我在顶部输出中没有看到 DATA,但在使用线程时看到了虚拟列气球。我的理解(也许我应该要求确定)是每个线程都有自己可能使用的内存空间。该内存实际上并没有被使用,它只是在需要时可用,这就是为什么这个数字可以变得相当高而不会真正引起问题。就其本身而言,记忆可能不是灾难性的。您应该查看 DATA 利用率是否与您正在使用的线程数成线性关系。

关于 gdb。正如您所指出的,gdb 不会修复您的代码,但如果您正在破坏内存,它可能会在您发生错误的地方移动。如果损坏发生在您不会返回或您已经发布并且从未尝试重用的区域中,那么问题的症状就会消失。直到您需要在某个关键区域演示或使用您的代码时才离开。

此外,您还需要查看 helgrind,它是 valgrind 的一部分。如果您遇到锁定问题,这种事情就是它的面包和黄油:

Helgrind 是一个 Valgrind 工具,用于检测使用 POSIX pthreads 线程原语的 C、C++ 和 Fortran 程序中的同步错误。

只要做:

valgrind --tool=helgrind {your program}

【讨论】:

  • 奇怪的是,无论我让程序使用多少线程,它都会发生。 1、2、10。它总是在膨胀。
  • 感谢 helgrind 的提示,如果有的话,了解所有这些调试工具真是太好了。它为我提供了大量关于各种“可能的数据竞争”情况的丰富数据。
  • 不幸的是,当 helgrind 运行时,segfault 不会发生。我将编辑我的原始帖子以在应用程序崩溃的情况下显示 helmind 输出,如果 helgrind 没有运行。
  • 好吧,我发现大部分 valgrind 输出只是由于 valgrind 没有正确读取我的代码,因为我没有费心调用 pthread_join(),因为我已经知道相关线程已经死了。通过这种修改,valgrind 只抱怨一次。在上面引用的输出中,该投诉与 valgrind 的第一个投诉相同。你知道它为什么抱怨吗?它仍然在相同的地方出现段错误。
  • 感谢您的提示。在对其大惊小怪之后,看起来 helgrind 并没有提供非常可靠的输出——我尝试过针对 helgrind 运行其他人的代码,它还提供了许多似乎实际上并不存在的数据竞争。所以看来问题是我的线程池调用的代码不是 pthread 安全的。这意味着我有一些工作要做!
【解决方案3】:

你确定它是完整的代码吗?我看不到您在哪里创建线程或从哪里调用 BuildKCData。

你应该在 pthread_kill() 之后有一个内存屏障,尽管我怀疑它在这种情况下会有所不同。

编辑:您混淆了顺序执行和缓存一致性。

缓存一致性: x86(当前)保证对齐的 4 字节访问是原子的,因此线程 A 中的 a[0]=123 和线程 B 中的 a[1]=456 将起作用 — 线程 C 最终会看到“ 123,456 英寸。周围有各种缓存一致性协议,但我相信它大约是一个 MRSW 锁。

乱序执行: x86 不保证读取的顺序(可能还有写入;关于 linux 内核中是否需要 sfence 存在争议)。这可以让 CPU 更有效地预取数据,但这意味着线程 A 中的 a[0]=123,a[1] 和线程 B 中的 a[1]=456,a[0] 都可能返回 0,因为 a[1] 的获取可能发生在 a[0] 的加载之前。解决此问题的一般方法有两种:

  • 仅当您持有锁时才能访问共享数据。特别是不要在锁之外读取共享数据。这是否意味着每个条目的锁或整个数组的锁取决于您,以及您认为锁争用可能是什么样的(提示:通常不是很大)。
  • 在需要按顺序排列的事物之间设置内存屏障。这很难做到(pthread 甚至没有内存屏障;pthread_barrier 更像是一个同步点)。

虽然内存障碍是最近的趋势,但锁定更容易正确(我持有锁,因此不允许其他人更改我脚下的数据)。内存障碍在某些圈子里风靡一时,但还有很多事情要做(我希望这个读取是原子的我希望其他线程可以原子地写入我希望其他线程使用屏障哦,是的,我也需要使用屏障)。

如果锁定的速度太慢,那么减少争用将比用障碍替换锁定并希望你做对了更有效。

【讨论】:

  • “内存屏障”是什么意思?我不确定你在说什么——如果你告诉我你的意思,我很乐意尝试。正在调用 BuildKCData,它位于以 comp_START++ 结尾的 while 循环中。从上到下阅读,这是 std::cout
  • 哦,现在我明白你的意思了。存在某种格式问题。代码在那里,但我认为尖括号在代码 sn-p 中被奇怪地处理。我看看能不能解决这个问题。
  • 似乎模板调用、小于符号等的尖括号导致代码格式混乱。我或多或少地修复了它,但你必须想象像“leq”之类的东西意味着“
  • 如果您只是缩进四个空格(或单击“代码”按钮),它的格式应该正确。
  • 许多“现代”架构不保证内存读/写按指定的顺序发生;您需要屏障操作来确保排序(在 x86 上,lfence/sfence/mfence 指令)。锁通常也是内存屏障(例如,在 Java 中,synchronized 块的开头保证是读屏障,结尾是写屏障)。还有臭名昭著的 Opteron E 错误(谷歌“opeteron lfence”),大概是support.amd.com/us/Processor_TechDocs/25759.pdf的勘误表 147@
最近更新 更多