Short 回调run_on_finish 通常不会为每个孩子的退出触发,因此$number_running 不会被减少,因此它无法控制循环。解决方法:
至于标题,子进程不能在父进程中设置任何内容(父进程也不能在子进程中设置)。他们必须按照上面针对本模块中的此计划概述的方式进行沟通以执行操作。
回调run_on_start 随每个新进程运行,并且计数器递增。但是回调run_on_finish 永远不会被触发,因此计数器永远不会递减。因此,一旦到达5,代码就位于while 循环中。请注意,父进程和子进程是独立的进程,因此不知道彼此的变量,也无法更改它们。
回调run_on_finish 通常是在所有进程被派生后通过wait_all_children 触发的。它的工作也完成了
当最大数量的进程运行并且一个退出时。这是通过调用wait_one_child(调用on_finish,见下文)在start 中完成的。
或者,这可以通过调用reap_finished_children方法随意完成
这是一个非阻塞调用,用于获取子节点并执行独立于对start 或wait_all_children 的调用的回调。在不经常调用 start 但您希望快速执行回调的情况下使用它。
这解决了如何在个别儿童退出时进行交流(如 cmets 中所述)的主要问题,而不是 wait_all_children。
这是一个如何使用它的示例,以便回调在子退出时正确运行。大量代码仅用于诊断(打印)。
use warnings;
use strict;
use feature 'say';
use Parallel::ForkManager;
$| = 1;
my $total_to_process = 3; # only a few for this test
my $number_running = 0;
my @ds;
my $pm = Parallel::ForkManager->new(30);
$pm->run_on_start( sub {
++$number_running;
say "Started $_[0], total: $number_running";
});
$pm->run_on_finish( sub {
--$number_running;
my ($pid, $code, $iden, $sig, $dump, $rdata) = @_;
push @ds, "gone-$pid";
say "Cleared $pid, ", ($rdata->[0] // ''), ($code ? " exit $code" : '');
});
foreach my $i (1 .. $total_to_process)
{
$pm->start and next;
run_job($i);
$pm->finish(10*$i, [ "kid #$i" ]);
}
say "Running: ", map { "$_ " } $pm->running_procs; # pid's of children
# Reap right as each process exits, retrieve and print info
my $curr = $pm->running_procs;
while ($pm->running_procs)
{
$pm->reap_finished_children; # may be fewer now
if ($pm->running_procs < $curr) {
$curr = $pm->running_procs;
say "Remains: $number_running. Data: @ds";
}
sleep 1; # or use Time::HiRes::sleep 0.1;
}
sub run_job {
my ($num) = @_;
my $sleep_time = ($num == 1) ? 1 : ($num == 2 ? 10 : 20);
sleep $sleep_time;
say "\tKid #$num slept for $sleep_time, exiting";
}
使用这个方法相当于在fork之后循环调用waitpid -1, POSIX::WNOHANG。这比最大 (30) 进程更少,以便更轻松地查看输出并证明回调在子退出时正确运行。更改这些数字以查看其完整操作。
子进程以10*$i 退出,以便能够在输出中跟踪子进程。在匿名数组[...] 中返回的数据是一个标识子进程的字符串。只要reap_finished_children 调用完成,$number_running 就会在回调中减少。这就是使用 $curr 变量的原因,再次用于诊断。
打印出来
开始:开始 4656,运行:1
开始:开始 4657,运行:2
开始:开始4658,运行:3
跑步:4656 4658 4657
孩子 #1 睡了 1 分钟,退出
清除 4656,孩子 #1 出口 10
遗骸:2。数据:gone-4656
孩子 #2 睡了 10 分钟,离开了
清除 4657,孩子 #2 出口 20
遗骸:1。数据:gone-4656gone-4657
孩子 #3 睡了 20 分钟,离开了
清除 4658,孩子 #3 出口 30
遗骸:0。数据:gone-4656gone-4657gone-4658
直接的问题是如何在开始一个新批次之前等待整个批次完成。这可以直接由wait_for_available_procs($n)完成
等到$n 可用的进程槽可用。如果没有给出$n,则默认为1。
如果$MAX 用于$n,那么只有在整个批次完成后才会有这么多插槽可用。 $n 使用什么也可以在运行时决定。
模块操作的一些细节
当一个孩子退出时,SIGCHLD 信号被发送给父母,它必须抓住这个信号才能知道孩子已经离开(首先是为了避免僵尸)。这是通过在代码或SIGCHLD 处理程序中使用wait 或waitpid 来完成的(但仅在一个地方)。请参阅fork、Signals in perlipc、waitpid 和 wait。
我们从P::FM's source 看到这是在wait_one_child 中完成的(通过_waitpid sub)
sub wait_one_child { my ($s,$par)=@_;
my $kid;
while (1) {
$kid = $s->_waitpid(-1,$par||=0);
last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
redo if !exists $s->{processes}->{$kid};
my $id = delete $s->{processes}->{$kid};
$s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
last;
}
$kid;
};
用于wait_all_children
sub wait_all_children { my ($s)=@_;
while (keys %{ $s->{processes} }) {
$s->on_wait;
$s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
};
}
上面使用的方法reap_finished_children是这个方法的同义词。
start 使用获取信号的方法wait_one_child 在最大进程数已满且一个退出时获取子进程。这就是模块知道何时可以启动另一个进程并尊重其最大值的方式。 (它也被其他一些等待进程的例程使用。
)。这是run_on_finish被$s->on_finish( $kid, ... )触发的时候
sub on_finish {
my ($s,$pid,@par)=@_;
my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
$code->($pid,@par);
};
回调在 coderef $code 中,从对象的 on_finish 键中检索,该键本身在子 run_on_finish 中设置。一旦 sub 运行,这就是回调的设置方式。
为此用户可用的方法是wait_all_children 和reap_finished_children。
由于发布的代码中没有使用这些,$number_running 没有得到更新,所以while 是一个无限循环。回想一下父进程中的变量$number_running不能被子进程直接更改。