1 前言

      一个后台实时处理的业务平台,通常我们会根据数据的输入与输出,依据时间轴进行分解成不同阶段或不同粒度的逻辑任务,而每一个待处理的数据我们称为任务或者消息。任务之间的关系可以分为两类:a 上下游父子关系,b 可以并行运行的兄弟关系。具有上下游关系的任务集合具有逻辑或数据依赖关系,即上游任务执行完后,才能执行下游任务;具有兄弟关系的任务间逻辑上互不影响,可以并行运行。

      无论是上面任一情况的业务场景,我们都需要一种管理类,其职责:管理着一堆线程及其待执行的同类型任务集合。线程会等待去执行喂给它的任务,当任务集合大于线程集合的个数时,任务会在队列排队等待;而当线程集合个数大于任务集合时,线程会挂起处于阻塞等待状态,执行器也相应地处于不饱和状态。在jdk里面有现成的管理类ThreadPoolExecutor,那么在c++里面看看类似的实现吧: 

2 任务与任务池

2.1任务

 无论是消息或业务数据,可以抽象地表达为:

      struct data_pair
      {

         char *data;

         int len;

      }

2.2 任务池

     任务的缓存用队列表达:

     std::queue<data_pair*> _queue; 

2.3 任务提交入口

  int CQueueThread::writeData(void *data, int len)

    {

        if (data == NULL || len <= 0) {
            return EXIT_FAILURE;
        }

        data_pair *item = new data_pair();
        item->data = (char*) malloc(len);
        assert(item->data != NULL);
        memcpy(item->data, data, len);
        item->len = len;       
        _mutex.lock();
        _queue.push(item);
        _mutex.signal();

        _mutex.unlock();

        return EXIT_SUCCESS;
    }

3线程池   

3.1 线程封装

    c++里面类似jdk里面Thread类的封装CThread          

{

class CThread {

 

public:

    /**

     * 构造函数

     */

    CThread() {

        tid = 0;

        pid = 0;

    }

 

    /**

     * 起一个线程,开始运行

     */

    bool start(Runnable *r, void *a) {

        runnable = r;

        args = a;

        return 0 == pthread_create(&tid, NULL, CThread::hook, this);

    }

 

    /**

     * 等待线程退出

     */

    void join() {

        if (tid) {

            pthread_join(tid, NULL);

            tid = 0;

            pid = 0;

        }

    }

 

    /**

     * 得到Runnable对象

     *

     * @return Runnable

     */

    Runnable *getRunnable() {

        return runnable;

    }

 

    /**

     * 得到回调参数

     *

     * @return args

     */

    void *getArgs() {

        return args;

    }

   

    /***

     * 得到线程的进程ID

     */

    int getpid() {

        return pid;

    }

 

    /**

     * 线程的回调函数

     *

     */

 

    static void *hook(void *arg) {

        CThread *thread = (CThread*) arg;

        thread->pid = gettid();

 

        if (thread->getRunnable()) {

            thread->getRunnable()->run(thread, thread->getArgs());

        }

 

        return (void*) NULL;

    }

   

private:   

    /**

     * 得到tid号

     */

    #ifdef _syscall0

    static _syscall0(pid_t,gettid)

    #else

    static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));}

    #endif

 

private:

    pthread_t tid;      // pthread_self() id

    int pid;            // 线程的进程ID

    Runnable *runnable;

    void *args;

}; 

}
View Code

相关文章:

  • 2022-02-05
  • 2022-01-18
  • 2021-05-21
  • 2021-08-05
  • 2022-12-23
  • 2022-01-08
  • 2021-07-08
  • 2021-08-11
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2021-07-09
  • 2021-08-28
  • 2021-05-30
  • 2021-08-19
  • 2022-12-23
相关资源
相似解决方案