【问题标题】:Executing functions in parallel并行执行功能
【发布时间】:2012-03-13 12:53:27
【问题描述】:

我有一个函数需要从一个数组中遍历大约 20K 行,并对每个行应用一个外部脚本。这是一个缓慢的过程,因为 PHP 在继续下一行之前等待脚本执行。

为了让这个过程更快,我考虑在不同的部分同时运行这个函数。因此,例如,第 0 到 2000 行作为一个函数,2001 到 4000 行作为另一个函数,依此类推。我怎样才能以一种简洁的方式做到这一点?我可以制作不同的 cron 作业,每个函数使用不同的参数:myFunction(0, 2000),然后是 myFunction(2001, 4000) 等的另一个 cron 作业,但这似乎不太干净。这样做的好方法是什么?

【问题讨论】:

  • 我曾经遇到过类似的情况。我发现在 php 中分叉是相当痛苦的,所以我在 bash 中做到了。为脚本的每个实例传递 start 和 stop 参数,并在服务器的后台运行它们。我相信 Exec 就是你要找的东西。

标签: php parallel-processing


【解决方案1】:

如果您想在 PHP 中执行并行任务,我会考虑使用Gearman。另一种方法是使用pcntl_fork(),但我更喜欢基于任务的实际工作人员。

【讨论】:

  • 如果我无法访问 php 配置(我在共享服务器上),你会建议我怎么做?
【解决方案2】:

唯一的等待时间是在获取数据和处理数据之间。无论如何,处理数据实际上是完全阻塞的(你只需要等待它)。除了将进程数量增加到您拥有的内核数量之外,您可能不会获得任何好处。基本上我认为这意味着进程的数量很少,因此调度 2-8 个进程的执行听起来并不可怕。如果您担心在检索数据时无法处理数据,理论上您可以以小块的形式从数据库中获取数据,然后在几个进程之间分配处理负载,每个内核一个。

我认为我更符合实际运行处理线程的分叉子进程方法。在 pcntl_fork 文档页面上的 cmets 中有一个精彩的演示,展示了作业守护程序类的实现

http://php.net/manual/en/function.pcntl-fork.php

<?php 
declare(ticks=1); 
//A very basic job daemon that you can extend to your needs. 
class JobDaemon{ 

    public $maxProcesses = 25; 
    protected $jobsStarted = 0; 
    protected $currentJobs = array(); 
    protected $signalQueue=array();   
    protected $parentPID; 

    public function __construct(){ 
        echo "constructed \n"; 
        $this->parentPID = getmypid(); 
        pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); 
    } 

    /** 
    * Run the Daemon 
    */ 
    public function run(){ 
        echo "Running \n"; 
        for($i=0; $i<10000; $i++){ 
            $jobID = rand(0,10000000000000); 

            while(count($this->currentJobs) >= $this->maxProcesses){ 
               echo "Maximum children allowed, waiting...\n"; 
               sleep(1); 
            } 

            $launched = $this->launchJob($jobID); 
        } 

        //Wait for child processes to finish before exiting here 
        while(count($this->currentJobs)){ 
            echo "Waiting for current jobs to finish... \n"; 
            sleep(1); 
        } 
    } 

    /** 
    * Launch a job from the job queue 
    */ 
    protected function launchJob($jobID){ 
        $pid = pcntl_fork(); 
        if($pid == -1){ 
            //Problem launching the job 
            error_log('Could not launch new job, exiting'); 
            return false; 
        } 
        else if ($pid){ 
            // Parent process 
            // Sometimes you can receive a signal to the childSignalHandler function before this code executes if 
            // the child script executes quickly enough! 
            // 
            $this->currentJobs[$pid] = $jobID; 

            // In the event that a signal for this pid was caught before we get here, it will be in our signalQueue array 
            // So let's go ahead and process it now as if we'd just received the signal 
            if(isset($this->signalQueue[$pid])){ 
                echo "found $pid in the signal queue, processing it now \n"; 
                $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); 
                unset($this->signalQueue[$pid]); 
            } 
        } 
        else{ 
            //Forked child, do your deeds.... 
            $exitStatus = 0; //Error code if you need to or whatever 
            echo "Doing something fun in pid ".getmypid()."\n"; 
            exit($exitStatus); 
        } 
        return true; 
    } 

    public function childSignalHandler($signo, $pid=null, $status=null){ 

        //If no pid is provided, that means we're getting the signal from the system.  Let's figure out 
        //which child process ended 
        if(!$pid){ 
            $pid = pcntl_waitpid(-1, $status, WNOHANG); 
        } 

        //Make sure we get all of the exited children 
        while($pid > 0){ 
            if($pid && isset($this->currentJobs[$pid])){ 
                $exitCode = pcntl_wexitstatus($status); 
                if($exitCode != 0){ 
                    echo "$pid exited with status ".$exitCode."\n"; 
                } 
                unset($this->currentJobs[$pid]); 
            } 
            else if($pid){ 
                //Oh no, our job has finished before this parent process could even note that it had been launched! 
                //Let's make note of it and handle it when the parent process is ready for it 
                echo "..... Adding $pid to the signal queue ..... \n"; 
                $this->signalQueue[$pid] = $status; 
            } 
            $pid = pcntl_waitpid(-1, $status, WNOHANG); 
        } 
        return true; 
    } 
}

【讨论】:

    【解决方案3】:

    你可以使用“PTHREADS”

    非常容易安装并且在 Windows 上运行良好

    从这里下载 -> http://windows.php.net/downloads/pecl/releases/pthreads/2.0.4/

    解压 zip 文件,然后

    • 将文件 'php_pthreads.dll' 移动到 php\ext\ 目录。

    • 将文件“pthreadVC2.dll”移动到 php\ 目录。

    然后在你的 'php.ini' 文件中添加这一行:

    extension=php_pthreads.dll
    

    保存文件。

    你刚刚完成 :-)

    现在让我们看看如何使用它的示例:

    class ChildThread extends Thread {
        public $data;
    
        public function run() {
            /* Do some expensive work */
    
            $this->data = 'result of expensive work';
        }
    }
    
    $thread = new ChildThread();
    
    if ($thread->start()) {     
        /*
         * Do some expensive work, while already doing other
         * work in the child thread.
         */
    
        // wait until thread is finished
        $thread->join();
    
        // we can now even access $thread->data
    }
    

    有关 PTHREADS 的更多信息,请在此处阅读 php 文档:

    PHP DOCS PTHREADS

    • 如果您像我一样使用 WAMP,那么您应该将“pthreadVC2.dll”添加到 \wamp\bin\apache\ApacheX.X.X\bin 并编辑'php.ini'文件(相同的路径)并添加与之前相同的行

      extension=php_pthreads.dll

    祝你好运!

    【讨论】:

      【解决方案4】:

      您正在寻找的是parallel,这是一个用于PHP 7.2+

      的简洁并发API
      $runtime = new \parallel\Runtime();
      
      $future = $runtime->run(function() {
          for ($i = 0; $i < 500; $i++) {
              echo "*";
          }
      
          return "easy";
      });
      
      for ($i = 0; $i < 500; $i++) {
          echo ".";
      }
      
      printf("\nUsing \\parallel\\Runtime is %s\n", $future->value());
      

      输出

      .*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*..*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*
      Using \parallel\Runtime is easy
      

      【讨论】:

        【解决方案5】:

        看看pcntl_fork。这允许您生成子进程,然后这些子进程可以完成您需要的单独工作。

        【讨论】:

          【解决方案6】:

          不确定是否适合您的情况,但您可以将系统调用的输出重定向到文件,因此 PHP 不会等到程序完成。虽然这可能会导致服务器过载。

          http://www.php.net/manual/en/function.exec.php - 如果一个程序使用这个函数启动,为了让它在后台继续运行,程序的输出必须重定向到一个文件或另一个输出流。否则会导致 PHP 挂起,直到程序执行结束。

          【讨论】:

          • 链接失效了。
          【解决方案7】:

          Guzzle 的 concurrent requests

          use GuzzleHttp\Client;
          use GuzzleHttp\Promise;
          
          $client = new Client(['base_uri' => 'http://httpbin.org/']);
          
          $promises = [
              'image' => $client->getAsync('/image'),
              'png'   => $client->getAsync('/image/png'),
              'jpeg'  => $client->getAsync('/image/jpeg'),
              'webp'  => $client->getAsync('/image/webp')
          ];
          
          $responses = Promise\Utils::unwrap($promises);
          

          promise 的开销;但更重要的是,Guzzle 仅适用于 HTTP 请求,并且适用于 7+ 版本和 Laravel 等框架。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2021-03-06
            • 2019-06-25
            • 1970-01-01
            • 2022-11-26
            • 1970-01-01
            相关资源
            最近更新 更多