【问题标题】:how to abort execution of a node and its children in tbb flowgraph如何在 tbb 流程图中中止节点及其子节点的执行
【发布时间】:2014-01-15 21:03:55
【问题描述】:

我目前正在测试 tbb 的流程图功能。 为了使用它,我必须能够中止图中某个节点的执行,包括所有依赖它的子节点,但让其他不依赖它的子节点执行。 从主体中抛出异常或调用 task::cancel_group_execution() 会中止所有节点的执行。

#include <cstdio>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    void operator()( continue_msg ) const
    {   if (my_name == "B")
            tbb::task::self().group()->cancel_group_execution();
        else
        {   sleep(1);
            printf("%s\n", my_name.c_str());
        }
    }
};

int main()
{
    graph g;

    broadcast_node< continue_msg > start(g);
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( continue_msg() );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}

【问题讨论】:

  • 在示例代码中,您希望节点 A、E 被处理而节点 B、C、D 被跳过?
  • 正确,除了 B 没有真正被跳过而是失败了。

标签: c++ multithreading tbb tbb-flow-graph


【解决方案1】:

如果您希望能够取消部分图表的执行,则需要使用 task_group_contexts。添加以下内容:

#include "tbb/task.h"

并将您的主程序更改为以下内容:

int main()
{
    tbb::task_group_context tgc1;
    tbb::task_group_context tgc2;
    graph g1(tgc1);
    graph g2(tgc2);
    printf("Constructing graph\n");
    broadcast_node< continue_msg > start(g1);
    continue_node<continue_msg> a( g1, body("A"));
    continue_node<continue_msg> b( g2, body("B"));
    continue_node<continue_msg> c( g2, body("C"));
    continue_node<continue_msg> d( g2, body("D"));
    continue_node<continue_msg> e( g1, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i ) {
        try
        {   
            printf("broadcasting graph %d\n", i);
            start.try_put( continue_msg() );
            g1.wait_for_all();
            g2.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
        g1.wait_for_all();
        g1.reset();
        g2.reset();
    }
    return 0;
}

每个 task_group_context 都是(默认)父上下文的子上下文。取消 g2 不会影响 g1。如果 B 抛出而不是取消,您的 catch 将确保异常不会传递给父级。如果您没有捕获异常,父上下文也将被取消,A 和 E 的上下文也将被取消。

请注意,您必须等待两个图表完成。此外,您必须reset() 图表来重置continue_nodes' 计数器。实际上,在抛出并捕获异常的情况下,并不能保证在catch(...)完成后g1就完成了,所以你需要在try/catch之外做一个g1.wait_for_all()。我编辑了代码以显示这一点。

您可以使用 continue_msg 的输入和 continue_msg 的单个输出将 B 设为 multifunction_node,而不是使用取消来停止部分计算:

typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;

struct mf_body {
    std::string my_name;
    mf_body(const char *name) : my_name(name) {}
    void operator()(continue_msg, mf_type::output_ports_type &op) {
        if(my_name == "B") {
            printf("%s returning without sending\n", my_name.c_str());
            return;
        }
        sleep(1);
        get<0>(op).try_put(continue_msg());
        return;
    }
};

然后你创建节点 B:

mf_type b( g, unlimited, mf_body("B"));

从 B 到 C 的边会这样设置:

make_edge( output_port<0>(b), c ); 

在这种情况下,您不需要将图拆分为两个子图。如果节点 B 已取消,则它会返回而不将 continue_msg 转发给其后继节点。如果节点 B 不转发消息,节点 C 将不会执行,因为它需要两个continue_msgs 才能启动。之后您仍然需要重置图表,以重置 C 的计数。

multifunction_node 的优点是您可以选择是否转发消息。这里需要注意的是,带有continue_msg 输入的multifunction_node 不像continue_nodecontinue_node 需要的continue_msgs 与它的前辈一样多(加上构造时的初始化值)。multifunction_node 主体在收到continue_msg 时执行,无论它有多少前辈。因此,对于您的图表,您不能只制作所有节点multifunction_nodes

【讨论】:

    【解决方案2】:

    您可以用bool 代替continue_msg 来表示中止状态。 每个process_node 接收前驱节点状态并在可用时处理任务,并将更新的中止状态发送到后继节点。

    struct body
    {   std::string my_name;
        body( const char *name ) : my_name(name)
        {
        }
        bool operator()( bool avail ) const
        {   if (!avail)
               printf("%s skipped\n", my_name.c_str());
            else
                if (my_name == "B")
                {   printf("%s fail\n", my_name.c_str());
                    avail = false;  // fail task
                }
                else
                {   sleep(1);
                    printf("%s\n", my_name.c_str());
                }
            return avail;
        }
    };
    
    int main()
    {
        graph g;
    
        typedef function_node<bool, bool> process_node;
        typedef std::tuple<bool,bool> bool_pair;
        broadcast_node< bool > start(g);
        process_node a( g, unlimited, body("A"));
        process_node b( g, unlimited, body("B"));
        process_node c( g, unlimited, body("C"));
        join_node<bool_pair> join_c(g);
        function_node<bool_pair, bool> and_c(g, unlimited, [](const bool_pair& in)->bool {
            return std::get<0>(in) && std::get<1>(in);
        });
        process_node d( g, unlimited, body("D"));
        process_node e( g, unlimited, body("E"));
    
        /*
         * start -+-> A -+-> E
         *        |       \
         *        |        \
         *        |         join_c -> and_c -> C -> D
         *        |        /
         *        |       /
         *        +-> B -- 
         */
        make_edge( start, a );
        make_edge( start, b );
        make_edge( a, input_port<0>(join_c) );
        make_edge( b, input_port<1>(join_c) );
        make_edge( join_c, and_c );
        make_edge( and_c, c );
        make_edge( c, d );
        make_edge( a, e );
    
        for (int i = 0; i < 3; ++i )
            try
            {   start.try_put( true );
                g.wait_for_all();
            } catch (...)
            {   printf("Caught exception\n");
            }
        return 0;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多