【问题标题】:non-blocking read of a pipe while doing other things在做其他事情时对管道进行非阻塞读取
【发布时间】:2018-02-01 00:24:21
【问题描述】:

作为process hangs when writing large data to pipe 的后续行动,我需要实现一种方法,让父进程从其子进程写入的管道中读取数据,同时执行其他操作直到子进程完成。

更具体地说,父级通过 HTTP 向客户端返回响应。响应由字符串<PING/> 组成,然后是完成ping 时的字符串<DONE/>,然后是实际内容。这样做是为了在实际响应准备好之前保持连接处于活动状态。

我的问题:

1) 我主要是在寻找一般反馈。您是否看到此代码有任何问题?

2) 我能否实现非阻塞读取的目标?特别是一旦读取了所有当前可用的数据(但作者仍在编写更多数据),我的代码是否能够从while ( my $line = <$pipe_reader>){ 继续前进?并且在管道关闭后子节点终止之前它会正常工作吗?

3) IO::Select 的文档说 add() 需要一个 IO::Handle 对象。我一直在到处看到IO::Handle,但我不知道如何确定以这种方式创建的管道是否算作IO::Handle 对象。 perl -e "pipe(my $r, my $w); print(ref($r))" 只是给了我 GLOB...

4) select 的 Perl 文档(我假设 IO::Select 是基于该文档)警告

警告:除非 POSIX 允许,否则不应尝试将缓冲 I/O(如 readreadline)与 select 混合使用,即使这样也只能在 POSIX 系统上使用。您必须改用sysread

这是否意味着$writer->write('<PING/>'); 在同一个循环中存在问题?

Perl 代码

pipe(my $pipe_reader, my $pipe_writer);
$pipe_writer->autoflush(1);

my $pid = fork;

if ( $pid ) {

    # parent
    close $pipe_writer;

    $s = IO::Select->new();
    $s->add($pipe_reader);

    my $response  = "";
    my $startTime = time;
    my $interval  = 25;
    my $pings     = 0;

    while ( waitpid(-1, WNOHANG) <= 0 ) {

        if ( time > $startTime + ($interval * $pings) ) {
            $pings++;
            $writer->write('<PING/>');
        }

        if ( $s->can_read(0) ) {

            while ( my $line = <$pipe_reader> ) {
                $response .= $line;
            }
        }
    };

    $writer->write('<DONE/>');
    $writer->write($response);
    close $pipe_reader;
    $writer->close();

else {

    #child
    die "cannot fork: $!" unless defined $pid;
    close $pipe_reader;

    #...do writes here...

    close $pipe_writer;
}

关于$writer,可能与本题无关,但整体解决方案遵循 second code sample here

由于我们还没有准备好整个 HTTP 正文,我们向 PSGI 返回一个回调,它为我们提供了一个 $responder 对象。我们只给它 HTTP 状态和内容类型,然后它给我们一个 $writer 以便稍后编写正文。

我们在上面的代码中使用$writer 来编写我们的 ping 值和最终的正文。以上所有代码都在返回到 PSGI 的回调中,但为简洁起见,我省略了。

【问题讨论】:

  • 你结婚了吗?文件的容量是无限的(不超过你的磁盘空间),而套接字的容量通常比管道大。

标签: perl pipe


【解决方案1】:

这里的第一个问题是非阻塞操作。其他问题如下。

正如您所引用的,select(或IO::Select)不应使用缓冲 I/O。特别是在您想要非阻塞和非缓冲操作的地方。下面的代码与&lt;&gt; 非常混淆。

请注意,“缓冲”是一项多层次的业务。其中一些可以通过简单的程序指令打开/关闭,有些更难处理,还有一些是实现的问题。它存在于语言、库、操作系统、硬件中。我们至少可以使用推荐的工具。

因此,使用sysreadselect 操作句柄读取,而不是readline<> 使用的)。它在EOF 上返回0,因此可以测试写入端何时关闭(发送EOF 时)。

use warnings;
use strict;
use feature 'say';

use Time::HiRes qw(sleep);
use IO::Select; 

my $sel = IO::Select->new;

pipe my $rd, my $wr;
$sel->add($rd); 

my $pid = fork // die "Can't fork: $!";  #/

if ($pid == 0) {
    close $rd; 
    $wr->autoflush;
    for (1..4) {
        sleep 1;
        say "\tsending data";
        say $wr 'a' x (120*1024);
    }
    say "\tClosing writer and exiting";
    close $wr;
    exit; 
}   
close $wr;    
say "Forked and will read from $pid";

my @recd;
READ: while (1) {
    if (my @ready = $sel->can_read(0)) {  # beware of signal handlers
        foreach my $handle (@ready) {
            my $buff;
            my $rv = sysread $handle, $buff, 64*1024;
            if (not $rv) {  # error (undef) or closed writer (==0)
                if (not defined $rv) {
                    warn "Error reading: $!";
                }
                last READ;  # single pipe (see text)
            }
            say "Got ", length $buff, " characters";
            push @recd, length $buff; 
        }
    }
    else {
        say "Doing else ... ";
        sleep 0.5; 
    }
}   
close $rd;
my $gone = waitpid $pid, 0;
say "Reaped pid $gone";
say "Have data: @recd"

这假定父级不会在else 中进行大量处理,否则会使管道检查等待。在这种情况下,您需要为那些长时间的工作派生另一个进程。

一些cmets

  • 我向sysread 索要大量数据,因为这是使用它的最有效方式,而且您期望孩子会写大量数据。您可以从打印件(下面是示例)中看到效果如何。

  • sysread 的未定义返回表示错误。管道可能仍然是可读的,如果我们通过while 返回sysread,我们可能会陷入无限循环的错误,所以我们退出循环。下一次读取错误可能不会发生,但指望这会冒无限循环的风险。

  • 在异常返回(写入器关闭或错误)时,代码将退出循环,因为此处无需再进行操作。但是对于更复杂的 IPC(更多管道,所有这些都在另一个循环中获取新连接,信号处理程序等),我们需要从要监视的列表中删除句柄,并且读取错误的处理将不同于封闭的作家。

  • 在这个简单的示例中,错误处理很简单(实际上只是last READ if not $rv;)。但一般而言,读取错误与有序关闭的写入器不同,它们是分开处理的。 (例如,在读取错误时,我们希望重试固定次数。)

  • 所有数据都可以通过使用OFFSETsysread 的第四个参数length $buff)收集到$buff。然后每次写入都从$buff 的末尾开始,它会被扩展。

    my $rv = sysread $handle, $buff, 64*1024, length $buff;
    

    在这种情况下,不需要@recd。这是收集数据的常用方法。

  • 信号是任何 IPC 的组成部分。接下来是有限的讨论

"Safe signals" 通常保护 I/O 不被信号中断。但是select可能会受到影响

请注意,select 是否在信号(例如 SIGALRM)之后重新启动取决于实现。

因此使用它的句柄也可能不安全。根据我的经验,can_read 可以在程序处理 SIGCHLD 时返回(假)。这个简单的例子是安全的,有几个原因:

  • 如果can_read 在处理信号时返回空,while 会将其直接返回到仍可读的句柄。

  • 如果程序在select 被阻塞时,信号可能会影响select。但是您有非阻塞操作,并且在select 检查句柄时信号正确进入的机会微乎其微

  • 最后,我不知道写入管道的进程的SIGCHLD 是否会影响该管道另一端的select,但即使它可以,可能性也很小。

对于更复杂的代码(如果can_read 没有像上面那样直接在循环中),请考虑其错误返回(由于信号)是否会影响程序流程。如果这是一个问题,请添加代码以检查来自can_read 的错误返回;如果由信号引起,$!EINTR。这可以通过使用%! 来检查,使用时会加载Errno。所以你可以检查can_read是否因为if $!{EINTR}的中断而返回。比如

if (my @ready = $sel->can_read(0)) {
   ...
}
elsif ($!{EINTR}) { 
   # interrupted by signal, transfer control as suitable
   next READ;
}

同样,上面的程序无论如何都会立即返回到while(假设else 块不适用于长时间运行的作业,应该有另一个进程)。

另一个问题是SIGPIPE 信号,默认情况下会杀死程序。由于您正在处理管道,因此处理它是谨慎的,通过 安装signal handler

    $SIG{PIPE} = \&handle_sigpipe;

where sub handle_sigpipe 可以做程序需要的事情。例如,设置用于检查管道有效性的全局变量,因此一旦它引发错误,我们就不会再次尝试读取/写入它。我们分配给$SIG{PIPE} 的事实可以防止该信号。但是,除非它是'IGNORE',否则can_read 需要如上所述重新启动。请参阅follow-up post

对问题的评论

  • 您的代码片段将无法按预期“继续”,因为它使用&lt;&gt; 进行读取。 (此外,您在&lt;&gt; 上获得了while,这确实会阻塞。因此,一旦它读取了可用的内容,它就会坐下来等到更多的内容出现。您想要单次读取,但又不是&lt;&gt; .)

  • 每个 filenahdle 都是一个 IO::Handle(或 IO::File)对象,或者至少可以按需加入这些类。请参阅this post 的(第二部分)。

  • 关于不将缓冲 I/O 与 select 混合的警告与使用它的文件句柄有关。虽然它对管道至关重要,但写入其他服务是无关的。

  • 代码注释:没有必要以孩子的退出为条件。您需要注意孩子何时关闭管道。稍后再收获进程(收集信号)。

处理类似需求的另一种方法是在自己的fork 中完成工作的每个部分。因此,在一个单独的过程中使用您的HTTP 进行“保持活动”。然后通过使用socketpair 进行通信,所有子进程都可以由父进程更简单地管理。

请参阅 this post 比较 readsysread,其中包含许多相关点。


上面的代码打印出来

分叉,将从 4171 读取 做别的... 做别的... 做别的... 发送数据 有 65536 个字符 有 57345 个字符 做别的... 做别的... 发送数据 有 65536 个字符 有 57345 个字符 做别的... 做别的... 发送数据 做别的... 有 65536 个字符 有 40960 个字符 有 16385 个字符 做别的... 做别的... 发送数据 有 65536 个字符 有 24576 个字符 关闭 writer 并退出 得到 32769 个字符 做别的... 收割 pid 4171 有数据:65536 57345 65536 57345 65536 40960 16385 65536 24576 32769

【讨论】:

  • 触发follow up question的cmets的(清理)线程反映在同时已大大扩展和编辑的答案中。
猜你喜欢
  • 2015-01-25
  • 2016-08-09
  • 2011-04-15
  • 2016-04-02
  • 2010-09-27
相关资源
最近更新 更多