【问题标题】:Implementing a pipe within C/C++ program在 C/C++ 程序中实现管道
【发布时间】:2025-11-28 15:10:01
【问题描述】:

我有以下问题:

我编写了一个简单的解压缩器,它解压缩 每行 (/n) 由另一个进程处理的 .gz 文件。所以,在 shell 中,我可以输入:

解压文件 |程序

这两个程序都是 C/C++ 编码的。

有谁知道我是否以及如何在一个 C/C++ 程序中实现这个“管道”(|),这样我就可以制作像这样的多线程的东西...

在我的特殊情况下,保持新行结构完整很重要,这就是我使用管道的原因。 gz 文件太大,无法作为一个整体保存在内存中。

【问题讨论】:

  • 您不需要管道来逐行处理输入。逐行阅读。也就是说,您可以通过使用线程或协程来简化控制结构(例如,在 1960 年代为 Algol 编译器完成的 IIRC)。
  • 答案取决于您的操作系统。我建议根据您的意思添加“linux”或“osx”或“windows”标签(或“solaris”或“posix”等)。
  • 那么基本上你想把这2个程序合并成1个?您使用pipe 系统调用来创建管道。一个线程解压缩文件并写入管道;另一个线程读取管道并执行它所做的任何事情。你是否从中获得了什么是一个悬而未决的问题。
  • 问题是解压缩只能按设定的字节数进行,而不是按行。解压后的行的长度也不一样,所以我需要解压后的换行符……这是在 Linux 中完成的,但我认为 C/C++ 代码也应该在其他操作系统中工作……
  • 此外,这些行要以 4 组为一组进行处理...

标签: c++ pipe


【解决方案1】:

在一般的编程中,有一种叫做generators的东西;在 C++ 中,我们倾向于将它们视为输入迭代器,但关注点仍然相同:很像管道,它是关于拉动驱动生产。

因此,您可以围绕 Producer(最好使用输入迭代器的接口)和 Consumer 的思想来重构您的程序,并且 Consumer 会在此时要求输入一行,而 Producer 会懒洋洋地来跟上。

关于必要界面的良好指南,我推荐古老的 SGI STL 网站:这里是 InputIterator 概念。

举个更简单的例子,假设我们不必处理解压缩,只需逐行读取文件:

class LineIterator: public std::iterator<std::input_iterator_tag,
                                         std::string const>
{
public:
    // Default Constructible
    LineIterator(): stream(nullptr) {}

    explicit LineIterator(std::istream& is): stream(&is) { this->advance(); }

    // Equality Comparable
    friend bool operator==(LineIterator const& left, LineIterator const& right) {
        return left.stream == right.stream
           and left.buffer == right.buffer
           and left.currentLine == right.currentLine;
    }

    friend bool operator!=(LineIterator const& left, LineIterator const& right) {
        return not (left == right);
    }

    // Trivial Iterator (non mutable)
    pointer operator->() const { return &currentLine; }

    reference operator*() const { return currentLine; }

    // Input Iterator
    LineIterator& operator++() {
        this->advance();
        return *this;
    } // operator++

    LineIterator operator++(int) {
        LineIterator tmp(*this);
        ++*this;
        return tmp;
    } // operator++

private:
    void advance() {
        // Advance a valid iterator to fetch the next line from the source stream.
        static LineIterator const SingularValue;

        assert(*this != SingularValue and "Cannot advance singular iterator");
        // Note: in real life, I would use std::getline...
        // ... but it would not showcase the double-buffering model
        // required to solve the OP problem (because of decoding)

        // We use double-buffering, so clear current and swap buffers
        currentLine.clear();
        swap(buffer, currentLine);

        // Check if we found some new line or not
        size_t const nl = currentLine.find('\n');

        // If we found one already, preserve what's after in the buffer
        // as we only want to expose one line worth of material.
        if (nl != std::string::npos) {
            if (nl == currentLine.size()) { return; } // nothing to preserve

            buffer.assign(currentLine.begin() + nl + 1, currentLine.end());
            currentLine.erase(currentLine.begin() + nl + 1, currentLine.end());
            return;
        }

        // If we did not, then we need to pump more data into the buffer.
        if (not stream) { return; } // Nothing to pump...

        static size_t const ReadBufferSize = 256;
        char input[ReadBufferSize];

        while (stream->read(input, ReadBufferSize)) {
            if (this->splitBuffer(input, ReadBufferSize)) { break; }
        }

        // We end up here either if we found a new line or if some read failed.
        // If the stream is still good, we successfully found a new line!
        if (*stream) { return; }

        // Otherwise, the stream is no good any longer (it dried up!)
        // but we may still have read some little things from it.
        this->splitBuffer(input, stream->gcount());

        stream = SingularValue.stream; // stream dried up,
                                       // so reset it to match singular value.
    } // advance

    bool splitBuffer(char const* input, size_t const size) {
        // Split input at the newline character, the first chunk ends
        // up in currentLine, the second chunk in buffer.
        // Returns true if a newline character was found, false otherwise.

        // Check if we finally found a new line
        char const* const newLine = std::find(input, input + size, '\n');

        // If we did not, copy everything into currentLine and signal it.
        if (newLine == input + size) {
            currentLine.append(input, size);
            return false;
        }

        // If we did, copy everything up to it (including it) into currentLine
        // and then bufferize the rest for the next iteration.
        currentLine.append(input, newLine + 1);
        buffer.assign(newLine + 1, input + size);
        return true;
    } // splitBuffer

    std::istream* stream;
    std::string buffer;

    std::string currentLine;
}; // class LineIterator

它有点拗口(而且可能有问题...),但它具有我们需要用 STL 算法组合它的接口,例如:

std::ifstream file("someFile.txt");
std::copy(LineIterator(file), LineIterator(), std::ostream_iterator(std::cout));

这将在终端上一次回显文件一行 (demo here)。

现在,您只需将提取部分 (stream.read) 替换为逐块读取和解压缩 :)

【讨论】:

  • 好的,非常感谢您的回复。我确实有几个问题,因为我不是高级 C 程序员。谁能解释以下结构:
  • 类 LineIterator: public std::iterator<:input_iterator_tag std::string const>显式 LineIterator(std::istream& is): stream(&is) { this->advance(); }
  • 我不明白 : 结构,那里发生了什么。那么,什么是朋友?
  • 指针操作符->()和引用操作符*()有什么作用?
  • @Niels:嗯,我不太确定我是否理解所有问题......让我试试。 1. std::iterator 是一个没有方法/状态的简单类,它只声明了一组迭代器所期望的内部类型(例如reference); 2、Foo(int i): attribute(i) {}:{之间的部分是initializer list,用于在构造函数体开始之前初始化类属性; 3.friend是一种声明类或函数被允许访问你当前类的protectedprivate部分的方式,一般情况下是不会这样的……