【问题标题】:Parallel execution of command using Parallel::ForkManager使用 Parallel::ForkManager 并行执行命令
【发布时间】:2020-06-25 18:06:29
【问题描述】:

我想知道我对以下脚本/逻辑的理解是否正确。

我有节点列表,我需要通过对服务器执行 SSH 来利用我拥有的服务器数量在每个节点上运行某个命令,这意味着该过程应该并行发生。

我有 node_list.txt 文件,其中包含节点列表:

node1
node2
.
.
node49
node50

我已经在数组@hosts 中定义了服务器数量,我应该在其中执行SSH 并通过将node_file.txt 拆分为可用服务器中相等数量的部分(称为$node_list_X.txt)来对每个节点执行命令。

一旦我有了这些文件 (node_list_1.txt,node_list_2.txt,node_list_3.txt,node_list_4.txt),我将登录到每个已经定义的服务器,并通过传递 node_list_X.txt 在每个主机上执行某些命令并行文件。

为了并行执行,我使用了Parallel::ForkManager Perl 模块。

所以,让我们在每个主机中说 -

192.168.0.1 -> node_list_1.txt (13 nodes)
192.168.0.2 -> node_list_2.txt (13 nodes)
192.168.0.3 -> node_list_3.txt (12 nodes)
192.168.0.4 -> node_list_4.txt (12 nodes)

将并行运行。

脚本如下:

...
my @hosts = ("192.168.0.1", "192.168.0.2", "192.168.0.3","192.168.0.4");

open(my $node_fh, '<', $node_file)
        or die "can't open $node_file: $!";

my @lines =  <$node_fh>;

my %Files;

my $num_buckets = scalar @hosts;

my $per_bucket = int( @lines / $num_buckets );
my $num_extras =      @lines % $num_buckets;
my $path = "/home/user/vinod/test/";

for my $bucket_num (0..$num_buckets-1) {
   my $num_lines = $per_bucket;
   if ($num_extras) {
      ++$num_lines;
      --$num_extras;
   }

   last if($num_lines == 0);
   my $qfn = $path."node_list_${bucket_num}.txt";
   open(my $fh, '>', $qfn)
      or die("Can't create \"$qfn\": $!\n");

   $fh->print(splice(@lines, 0, $num_lines));
   $Files{$bucket_num} = $qfn;
}
print Dumper(\%Files);

my $command = #"defining my command here";

my $pm = Parallel::ForkManager->new(5);
my $ssh;

DATA_LOOP:
foreach my $n (0..$num_buckets-1) {
    if( exists $Files{$n} ) {
        my $pid = $pm->start and next DATA_LOOP;

        $command_to_execute = $command." ".$Files{$n};
        $ssh = SSH_Connection( $hosts[$n-1], "user", "password" );
        $result = $ssh->capture($command_to_execute);
      
        $pm->finish;       
    }
}
$pm->wait_all_children;
undef $ssh;

#SSH Connect
sub SSH_Connection {
    my ( $host, $user, $passwd ) = @_;
    my $ssh = Net::OpenSSH->new($host,
                                user => $user,
                                password => $passwd,
                                master_opts => [-o => "StrictHostKeyChecking=no"]
    );
    $ssh->error and die "Couldn't establish SSH connection: ". $ssh->error;
    return $ssh;
}

这里一切正常。

当我定义 $pm 对象时,并行进程设置为 5。

my $pm = Parallel::ForkManager->new(5);

这是否意味着在特定服务器(例如:192.168.0.1)中一次应该运行 5 个并行进程。意味着它应该从node_list_1.txt(共 13 个)文件中获取 5 个节点并执行命令?

我的理解正确吗?如果不是,那么在每个服务器中使用多线程并行运行命令的可能解决方案是什么?

【问题讨论】:

    标签: perl parallel-processing multitasking


    【解决方案1】:

    这是否意味着在特定服务器(例如:192.168.0.1)中一次应该运行 5 个并行进程。

    没有。 P::FM 对服务器一无所知。它管理进程,-&gt;new(5) 表示-&gt;start 将等待它创建的进程之一完成,然后再创建一个新进程,如果其中有 5 个仍在执行。

    在每个服务器中使用多线程并行运行命令的可能解决方案是什么?

    假设您通常指的是多任务而不是专门的多线程(因为您没有使用线程),可以按如下方式为每个主机创建一个进程:

    my %children;
    my $error = 0;
    for my $host (@hosts) {
        my $pid = fork();
        if (!defined($pid)) {
           warn("Can't execute on $host: Can't fork: $!\n");
           next;
        }
    
        if ($pid) {
           ++$children{$pid};
           next;
        }
    
        if (!eval {
           do_it($host);
           return 1;  # No exception
        }) {
           warn("Error executing commands on $host: $@");
        }
    }
    
    while (%children) {
       ( my $pid = wait() ) >= 0
          or die("Can't wait: $!\n");
    
       delete($children{$pid});   
    }
    

    【讨论】:

    • 感谢@ikegami 的回复。这如何在我的脚本中实现?因为我也需要 ssh 到主机。
    • do_it做任何你想做的事。
    • 如果我在do_it 中添加DATA_LOOP 部分,每个节点文件将为每个服务器运行,对吗?我的意思是4 * 4。我很困惑。
    • @zdim 如果我遍历存储桶,那么您的意思是说不需要使用Parallel::ForkManager。它将遍历每个主机并通过使用Net::OpenSSH 连接到服务器来运行命令。
    • @vkk05(我看错了,所以删除了这两个 cmets)--- 因为这段代码 forks(每个服务器的子进程)你可以使用你的 @ do_it() 内的 987654330@ 原样(带有P::FM 子进程)。
    【解决方案2】:

    如果您想在一堆不同的服务器上运行作业,请考虑使用适当的作业队列。 Perl 的Minion 非常好。各种服务器可以连接到它,以各种方式请求工作,并将结果发回。

    【讨论】:

    • 看起来很有趣。我可以知道在哪里可以在Minion 上获得更多信息/示例。
    【解决方案3】:

    您是否考虑过使用Net::OpenSSH::Parallel

    在我看来,它直接支持你想做的事情,并且能够并行处理大量连接,然后安排,处理错误和重试失败的命令等。

    更新但它是否允许我在每个主机内并行运行作业?

    但你真正想做的是什么?将工作分配给一组工人?在这种情况下,brian d foy 解决方案可能是更好的选择。

    无论如何,Net::OpenSSH::Parallel 从来没有打算这样做,但它可以做到:

    my @hosts = ...;
    my @tasks = ...;
    my $n_workers = 5;
    
    my $ossh = Net::OpenSSH::Parallel->new;
    
    for my $host (@hosts) {
      for my $ix (0..$n_workers) {
        $ossh->add_host("$host-$ix", host => $host);
      }
    }
    
    my $fetch_task = sub {
      my ($pssh, $label) = @_;
      if (defined (my $task = shift @tasks)) {
        $ossh->push($label, cmd => $task);
        $ossh->push($label, sub => $fetch_task);
      }
    }
    
    $ossh->push('*', sub => $fetch_task)
    
    $ossh->run
    

    【讨论】:

    • 当然。但它是否允许我在每个主机内并行运行作业?