【问题标题】:guidance with input filter for TBB Pipeline libraryTBB 管道库的输入过滤器指南
【发布时间】:2014-04-30 09:47:53
【问题描述】:

在我之前的问题中,我使用 C++ (Linux) 和输入、转换和输出过滤器实现了一个 TBB 管道:

incorrect output with TBB pipeline

输入过滤器正在从文本文件中读取数据(C 结构)并传递给转换过滤器。变换过滤器正在更新数据并将其传递给输出过滤器。输出过滤器将其存储回光盘上。一个简单的读写应用程序。

现在,我正在寻找创建一个 MAIN 应用程序。 MAIN 应用程序将在开始时创建一个带有三个过滤器的 TBB 管道:

  1. InputFilter : 从主应用程序接收数据/C 结构并将其传递。
  2. TransformFilter:做一些处理。
  3. OutputFilter : 接收它并将信息返回给主应用程序。

    一开始,InputFilter 不会做任何事情,因为 data/C 结构将为空。所以它会循环或等待。

    MAIN 应用程序将从文件中读取数据并将信息传递给 InputFilter(如果需要)。 InputFilter 然后将处理它并将其传递给下一个过滤器,依此类推。所以不同的是:

    输入由 MAIN 应用程序控制,而不是在 InputFilter 中(就像我之前所做的那样)。一种方法是通过引用 InputFilter 来传递 data/C 结构,然后通过 MAIN 应用程序对其进行更新。但问题是:

    控件永远不会从 InputFilter 返回到 MAIN 应用程序。任何帮助将不胜感激! !

【问题讨论】:

    标签: c++ multithreading parallel-processing intel tbb


    【解决方案1】:

    我已经修改了thread_bound_filter documentation 页面中的示例,以使第一个过滤器成为线程绑定的。它工作正常,我认为这是您需要的:

    #include <iostream>
    #include "tbb/compat/thread"
    #include "tbb/pipeline.h"
    
    using namespace tbb;
    using namespace std;
    
    char InputString[] = "abcdefg\n";
    
    class InputFilter: public thread_bound_filter {
        char* my_ptr;
    public:
        void* operator()(void*) {
            if (*my_ptr)
                return my_ptr++;
            else
                return NULL;
        }
        InputFilter()
        : thread_bound_filter( serial_in_order ), my_ptr(InputString)
        {}
    };
    
    class OutputFilter: public filter {
    public:
        void* operator()(void* item) {
            std::cout << *(char*)item;
            return NULL;
        }
        OutputFilter() : filter(serial_in_order) {}
    };
    
    void RunPipeline(pipeline* p) {
        p->run(8);
    }
    
    int main() {
        // Construct the pipeline
        InputFilter f;
        OutputFilter g;
        pipeline p;
        p.add_filter(f);
        p.add_filter(g);
    
        // Another thread initiates execution of the pipeline
        thread t(RunPipeline, &p);
    
        // Process the thread_bound_filter with the current thread.
        while (f.process_item()!=thread_bound_filter::end_of_stream)
            continue;
    
        // Wait for pipeline to finish on the other thread.
        t.join();
    
        return 0;
    }
    

    当然,您可能想要添加更多过滤器,更改它们的类型(第一个必须是串行的除外),更改令牌数量,使用task::enqueue() 而不是显式线程,将process_item() 替换为try_process_item()为了避免在令牌数量超过时被阻塞。但总体思路是一样的,您可以将控制权返回给处理过滤器的线程。

    【讨论】:

    • 嗨。非常感谢我有一些想法。我对线程概念有点陌生。我尝试了代码并且它正在工作。如果你能帮助我更多请。我无法真正理解 abt 显式线程和任务入队的使用。我想从主应用程序一个一个地发送一个字符到输入过滤器。你能帮我解决这个问题吗?将不胜感激。 .
    • 线程绑定过滤器的思想是只有 [try_]process_item() 调用它。因此,当您从main() 调用 f.process_item() 时,您最终会在同一线程上调用 InputFilter。因此,该示例按照您所说的进行 - 它从主线程一个一个地发送字符(每个 process_item() 调用一个)到由其他线程处理的下一个过滤器。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-29
    • 2020-10-24
    • 2013-03-14
    • 2018-10-29
    • 1970-01-01
    相关资源
    最近更新 更多