【问题标题】:C++ Filter PipelineC++ 过滤器管道
【发布时间】:2020-10-24 07:38:47
【问题描述】:

我想为我的应用程序开发一个过滤器管道。 管道应该由任意数量的过滤器组成。

对于过滤器,我声明一个抽象基类,如下所示:

struct AbstractFilter {
    virtual void execute(const std::string& message) = 0;
    virtual ~AbstractFilter() = default;
}

每个过滤器都应该从这个基类继承并实现execute 方法。 像这样:

struct PrintMessage : public AbstractFilter {
    void execute(const std::string& message) override {
        std::cout << "Filter A " << message << '\n';
        //hand over message to next Filter

    }
}

struct Upper : public AbstractFilter {
    void execute(const std::string& message) override {
        std::string new_line;
        for (char c : line)
           new_line.push_back(std::toupper(c));
        //hand over message to next Filter
    }
}

struct WriteToFile : public AbstractFilter {
    void execute(const std::string& message) override {
        std::ofstream of{"test.txt"};
        of << message;
        of.close();
    }
}

编辑 1:

消息应该从管道中的一个过滤器发送到下一个过滤器。 如果管道例如是这样的:

Upper -- PrintMessage -- WriteToFile

消息应该通过所有 3 个过滤器。 (例如,如果Upper 完成了他的工作,则应该将消息发送到PrintMessage 等等)

在上面的例子中,如果消息Hello World 被发送到管道,输出应该是:

Console:
HELLO WORLD
test.txt:
HELLO WORLD

编辑 2:

过滤器仅更改给定消息的内容。类型没有改变。每个过滤器都应该使用例如字符串或给定的类。 该消息仅转发给一个收件人。

我现在的问题是如何连接这些过滤器?

我的第一个猜测是使用Queues。所以每个过滤器都有一个InputOutput 队列。为此,我认为每个过滤器都应该在它自己的Thread 内运行,并在数据添加到他的Input 队列时收到通知。 (例如FilterA的OutputQueue也是FilterB的InputQueue)

我的第二个猜测是使用责任链模式和boost::signals2 例如,FilterB 连接到 FilterA 的信号。 FilterA 在完成工作后调用这些 Filter。

这两种解决方案中哪一种更灵活?或者有没有更好的方法来连接过滤器?

另一个问题是否也可以在一个线程内运行整个管道,以便我可以启动多个管道? (在示例中,有 3 个 FilterA-FilterB-FilterD 管道启动并运行?)

【问题讨论】:

  • 如果你有一个支持协程的编译器,这可能是协程的一个很好的候选者。我认为协程是 C++20
  • 是的,我也考虑过使用它们,但不幸的是我无法使用带有 c++20 的编译器。
  • 您的过滤器根本不会修改它们给出的内容,所以我不会称它们为过滤器。另外,当只有一个函数时,为什么要强迫人们实现另一个基类呢? std::function 提供了足够的抽象。也就是说,我不明白你的要求,即你需要什么行为?您需要订购吗?任何阶段都可以终止管道吗?
  • @UlrichEckhardt 抱歉,示例可能不太好。某些过滤器还应该能够在传递消息之前对其进行修改。我已经更新了示例。

标签: c++ multithreading boost pipeline


【解决方案1】:

我会这样处理: 创建一个包含所有已实现的抽象过滤器版本的列表。因此,按照您的示例,在阅读输入文件后,我将获得一个列表:

[0]:Upper 
[1]:PrintMessage
[2]:WriteToFile

然后一个线程(如果您需要一次处理多个字符串,则为线程轮询)在输入队列中等待一个字符串。当池中出现新字符串时,线程在过滤器列表上循环,最后将结果发布到输出队列中。

如果你想并行运行它,你需要找到一种方法来保持输入字符串 anche nelle stringhe di 输出的顺序。

【讨论】:

  • 我们的意大利语恐怕达不到标准。
【解决方案2】:

我认为 AbstractFilter 不是必需的,我建议使用 std::tuple 来定义管道:

std::tuple<FilterA, FilterB> pipeline1;
std::tuple<FilterA, FilterB, FilterC ... > pipeline2;

要通过管道运行消息(使用 c++17):

template<typename Pipeline>
void run_in_pipeline(const std::string& message, Pipeline& pipeline){
  std::apply([&message](auto&& ... filter) {
    (filter.execute(message), ...);
  }, pipeline);
}

如果您关心性能并且过滤器必须按顺序执行,我不建议在单个管道上使用多线程或信号槽模式。如果您正在处理多线程应用程序,请考虑在不同的线程上运行不同的管道

【讨论】:

  • 感谢您的回复,抽象过滤器已定义,因为我希望管道由可以在配置文件中定义的不同过滤器组成,因此应根据配置动态创建管道
  • @Kevin 那么,如果它具有运行时可配置,那么管道将是指向抽象过滤器的指针向量?实际的 run_in_pipeline 将是一个 for 循环。如果您出于性能原因想避免指向对象的指针,那么将需要其他东西,因为即使 std::function 在内部动态分配内存..
【解决方案3】:

我相信责任链模式更简单,允许更简洁的代码和更大的灵活性。

您不需要第三方库来实现它。
您所说的过滤器实际上是处理程序。所有处理程序都实现一个通用接口,定义一个可以命名为handle() 的方法,甚至可以将对象作为参数来共享状态。每个处理程序都存储一个指向下一个处理程序的指针。它可能会或可能不会在其上调用该方法;在后一种情况下,处理会停止,它充当过滤器

如果其中一些阶段需要其他阶段的输出作为输入,则并行运行流水线阶段会更复杂。对于并行运行的不同管道,每个管道将在自己的线程上运行,您可以使用队列将输入传递给它。

【讨论】:

    猜你喜欢
    • 2013-03-14
    • 2017-11-29
    • 2011-04-06
    • 2014-02-17
    • 1970-01-01
    • 1970-01-01
    • 2017-06-28
    • 1970-01-01
    相关资源
    最近更新 更多