【问题标题】:How to wait for child process to set variable in parent process?如何等待子进程在父进程中设置变量?
【发布时间】:2016-12-28 12:25:41
【问题描述】:
 use Parallel::ForkManager;    
 my $number_running = 0;
 my $pm = new Parallel::ForkManager(30); 
 $pm->run_on_start( sub { ++$number_running; } );
 $pm->run_on_finish( sub { --$number_running; } );
 for (my $i=0; $i<=100; $i++)
 {
     if ($number_running == 5) { while ($number_running > 0) {} }  # waits forever
     $pm->start and next;
     print $i;
     $pm->finish;
 }

以上代码使用Parallel::ForkManager 使用并行进程在for 循环中执行代码。它正在计算有多少子进程正在运行并相应地设置$number_running 变量。一旦 5 个子进程正在运行,我希望它等到 0 个子进程正在运行后再继续。

for 循环中的第一行旨在实现这一点,但它会永远等待该行。就像子进程对变量所做的更改不适用于该行代码一样。我究竟做错了什么?注意:我知道wait_all_children,但我不想使用它。

【问题讨论】:

  • 你不应该在这样的空闲循环中旋转。你的父进程将用尽它所能获得的所有 CPU 时间,无休止地测试$number_running 的值。
  • 我怎样才能等到$number_running 被子进程减为零?
  • @CJ7 子进程不能影响父进程中使用的$number_running -- 子进程和父进程不能写入彼此的变量。父母必须为自己递减这个变量。请参阅添加到我的答案中的解释。

标签: perl parallel-processing fork ipc child-process


【解决方案1】:

Short 回调run_on_finish 通常不会为每个孩子的退出触发,因此$number_running 不会被减少,因此它无法控制循环。解决方法:

  • 使用reap_finished_children 以便在个别孩子退出时进行通信,这样run_on_finish 确实可以在每个孩子退出时运行

  • 使用wait_for_available_procs 等待整个批次完成后再开始一个新的

至于标题,子进程不能在父进程中设置任何内容(父进程也不能在子进程中设置)。他们必须按照上面针对本模块中的此计划概述的方式进行沟通以执行操作。


回调run_on_start 随每个新进程运行,并且计数器递增。但是回调run_on_finish 永远不会被触发,因此计数器永远不会递减。因此,一旦到达5,代码就位于while 循环中。请注意,父进程和子进程是独立的进程,因此不知道彼此的变量,也无法更改它们。

回调run_on_finish 通常是在所有进程被派生后通过wait_all_children 触发的。它的工作也完成了 当最大数量的进程运行并且一个退出时。这是通过调用wait_one_child(调用on_finish,见下文)在start 中完成的。

或者,这可以通过调用reap_finished_children方法随意完成

这是一个非阻塞调用,用于获取子节点并执行独立于对startwait_all_children 的调用的回调。在不经常调用 start 但您希望快速执行回调的情况下使用它。

这解决了如何在个别儿童退出时进行交流(如 cmets 中所述)的主要问题,而不是 wait_all_children

这是一个如何使用它的示例,以便回调在子退出时正确运行。大量代码仅用于诊断(打印)。

use warnings;
use strict;
use feature 'say';
use Parallel::ForkManager;    
$| = 1;

my $total_to_process = 3;  # only a few for this test
my $number_running   = 0;    
my @ds;

my $pm = Parallel::ForkManager->new(30);

$pm->run_on_start( sub {
    ++$number_running;
    say "Started $_[0], total: $number_running";
});
$pm->run_on_finish( sub {
    --$number_running;
    my ($pid, $code, $iden, $sig, $dump, $rdata) = @_;
    push @ds, "gone-$pid";
    say "Cleared $pid, ", ($rdata->[0] // ''), ($code ? " exit $code" : '');
});

foreach my $i (1 .. $total_to_process)
{
    $pm->start and next;
    run_job($i);
    $pm->finish(10*$i, [ "kid #$i" ]);
}
say "Running: ", map { "$_ " } $pm->running_procs;  # pid's of children

# Reap right as each process exits, retrieve and print info
my $curr = $pm->running_procs;
while ($pm->running_procs) 
{
    $pm->reap_finished_children;    # may be fewer now
    if ($pm->running_procs < $curr) {
        $curr = $pm->running_procs;
        say "Remains: $number_running. Data: @ds";
    }
    sleep 1;  # or use Time::HiRes::sleep 0.1;
}

sub run_job {
    my ($num) = @_;
    my $sleep_time = ($num == 1) ? 1 : ($num == 2 ? 10 : 20);
    sleep $sleep_time;
    say "\tKid #$num slept for $sleep_time, exiting";
}

使用这个方法相当于在fork之后循环调用waitpid -1, POSIX::WNOHANG。这比最大 (30) 进程更少,以便更轻松地查看输出并证明回调在子退出时正确运行。更改这些数字以查看其完整操作。

子进程以10*$i 退出,以便能够在输出中跟踪子进程。在匿名数组[...] 中返回的数据是一个标识子进程的字符串。只要reap_finished_children 调用完成,$number_running 就会在回调中减少。这就是使用 $curr 变量的原因,再次用于诊断。

打印出来

开始:开始 4656,运行:1 开始:开始 4657,运行:2 开始:开始4658,运行:3 跑步:4656 4658 4657 孩子 #1 睡了 1 分钟,退出 清除 4656,孩子 #1 出口 10 遗骸:2。数据:gone-4656 孩子 #2 睡了 10 分钟,离开了 清除 4657,孩子 #2 出口 20 遗骸:1。数据:gone-4656gone-4657 孩子 #3 睡了 20 分钟,离开了 清除 4658,孩子 #3 出口 30 遗骸:0。数据:gone-4656gone-4657gone-4658

直接的问题是如何在开始一个新批次之前等待整个批次完成。这可以直接由wait_for_available_procs($n)完成

等到$n 可用的进程槽可用。如果没有给出$n,则默认为1

如果$MAX 用于$n,那么只有在整个批次完成后才会有这么多插槽可用。 $n 使用什么也可以在运行时决定。


模块操作的一些细节

当一个孩子退出时,SIGCHLD 信号被发送给父母,它必须抓住这个信号才能知道孩子已经离开(首先是为了避免僵尸)。这是通过在代码或SIGCHLD 处理程序中使用waitwaitpid 来完成的(但仅在一个地方)。请参阅forkSignals in perlipcwaitpidwait

我们从P::FM's source 看到这是在wait_one_child 中完成的(通过_waitpid sub)

sub wait_one_child { my ($s,$par)=@_;  
  my $kid;
  while (1) {
    $kid = $s->_waitpid(-1,$par||=0);
    last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
    redo if !exists $s->{processes}->{$kid};
    my $id = delete $s->{processes}->{$kid};
    $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
    last;
  }
  $kid;
};  

用于wait_all_children

sub wait_all_children { my ($s)=@_;
  while (keys %{ $s->{processes} }) {
    $s->on_wait;
    $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
  };
}

上面使用的方法reap_finished_children是这个方法的同义词。

start 使用获取信号的方法wait_one_child 在最大进程数已满且一个退出时获取子进程。这就是模块知道何时可以启动另一个进程并尊重其最大值的方式。 (它也被其他一些等待进程的例程使用。 )。这是run_on_finish$s-&gt;on_finish( $kid, ... )触发的时候

sub on_finish {
  my ($s,$pid,@par)=@_;
  my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
  $code->($pid,@par);
};

回调在 coderef $code 中,从对象的 on_finish 键中检索,该键本身在子 run_on_finish 中设置。一旦 sub 运行,这就是回调的设置方式。

为此用户可用的方法是wait_all_childrenreap_finished_children

由于发布的代码中没有使用这些,$number_running 没有得到更新,所以while 是一个无限循环。回想一下父进程中的变量$number_running不能被子进程直接更改

【讨论】:

  • run_on_finish 在每次子进程完成时执行。 $number_running由子进程更新。我自己对此进行了测试。我不想使用wait_for_children。我想知道如何等待$number_running 被子进程减为零。
  • @CJ7 1. 孩子与柜台无关。子代码是我们在start 之后键入的内容。 2. 你想要 wait(或waitpid)为孩子,这是分叉的第一条规则。 3. 程序通过信号获知孩子的退出,它必须捕捉(并处理)这些信号才能真正知道孩子退出了。另一种方式是让孩子返回pipe(或发送SIGUSR 信号)说它正在退出,但我们输入孩子代码,所以情况并非如此。 4.终于看源码了,这个wait-ing确实是在wait_all_children做的。我会更新我的帖子。
  • @CJ7 while 显然是无限的,这只能是因为$number_running 没有得到更新,即使子进程在创建后很快就退出了。这只能是因为回调没有运行。当我注释掉wait_all_children 时,我看到了它的确认——Cleared PID 行只有在所有进程都被分叉后才开始打印。
  • 好的,我可以在源代码中看到finish 没有调用on_finish,但我不明白为什么这不能发生。为什么不能这样做?
  • @CJ7 我刚刚尝试过——将限制设置为 10,但只运行了 5 个进程(并删除了wait_all_children)。它从未打印过Cleared ...,回调从未执行。请注意,这可能会导致僵尸。一些系统会处理这个问题,但通常不能指望它。
猜你喜欢
  • 2013-12-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多