【问题标题】:Guzzle Pool : Wait for RequestsGuzzle Pool : 等待请求
【发布时间】:2017-08-02 22:09:52
【问题描述】:

是否可以让 Guzzle 池等待请求?

现在我可以动态地将请求添加到池中,但是一旦池为空,guzzle 就会停止(显然)。

当我同时处理 10 个左右的页面时,这是一个问题,因为我的 requests 数组将是空的,直到结果 HTML 页面被处理并添加了新链接。

这是我的发电机:

$generator = function () {
  while ($request = array_shift($this->requests)) {
    if (isset($request['page'])) {
      $key = 'page_' . $request['page'];
    } else {
      $key = 'listing_' . $request['listing'];
    }

    yield $key => new Request('GET', $request['url']);                                          
  }
  echo "Exiting...\n";
  flush();
};

还有我的游泳池:

$pool = new Pool($this->client, $generator(), [
  'concurrency' => function() {
    return max(1, min(count($this->requests), 2));
  },
  'fulfilled' => function ($response, $index) {
      // new requests may be added to the $this->requests array here
  }
  //...
]);

$promise = $pool->promise();
$promise->wait();

@Alexey Shockov 回答后编辑的代码:

$generator = function() use ($headers) {
  while ($request = array_shift($this->requests)) {
    echo 'Requesting ' . $request['id'] . ': ' . $request['url'] . "\r\n";

    $r = new Request('GET', $request['url'], $headers);

    yield 'id_' . $request['id'] => $this->client->sendAsync($r)->then(function($response, $index) {
      echo 'In promise fulfillment ' . $index . "\r\n";
    }, function($reason, $index) {
      echo 'in rejected: ' . $index . "\r\n";
    });
  }
};

$promise = \GuzzleHttp\Promise\each_limit($generator(), 10, function() {
  echo 'fullfilled' . "\r\n";
  flush();
}, function($err) {
  echo 'rejected' . "\r\n";
  echo $err->getMessage();
  flush();
});
$promise->wait();

【问题讨论】:

    标签: php curl guzzle


    【解决方案1】:

    很遗憾,您不能使用生成器来做到这一点,只能使用自定义迭代器。

    我准备了a gist with the full example,但主要的想法只是创建一个迭代器,它会以两种方式改变其状态(它可以在结束后再次变为有效)。

    psysh 中的 ArrayIterator 示例:

    >>> $a = new ArrayIterator([1, 2])
    => ArrayIterator {#186
         +0: 1,
         +1: 2,
       }
    >>> $a->current()
    => 1
    >>> $a->next()
    => null
    >>> $a->current()
    => 2
    >>> $a->next()
    => null
    >>> $a->valid()
    => false
    >>> $a[] = 2
    => 2
    >>> $a->valid()
    => true
    >>> $a->current()
    => 2
    

    考虑到这个想法,我们可以将这样的动态迭代器传递给 Guzzle 并让它完成工作:

    // MapIterator mainly needed for readability.
    $generator = new MapIterator(
        // Initial data. This object will be always passed as the second parameter to the callback below
        new \ArrayIterator(['http://google.com']),
        function ($request, $array) use ($httpClient, $next) {
            return $httpClient->requestAsync('GET', $request)
                ->then(function (Response $response) use ($request, $array, $next) {
                    // The status code for example.
                    echo $request . ': ' . $response->getStatusCode() . PHP_EOL;
                    // New requests.
                    $array->append($next->shift());
                    $array->append($next->shift());
                });
        }
    );
    // The "magic".
    $generator = new ExpectingIterator($generator);
    // And the concurrent runner.
    $promise = \GuzzleHttp\Promise\each_limit($generator, 5);
    $promise->wait();
    

    正如我之前所说,完整示例在 the gist 中,MapIteratorExpectingIterator

    【讨论】:

    • 好的,谢谢,还有到 psysh 的链接(不知道)。请务必不要删除您的要点,因为它可能在将来对其他人有价值!
    • 这就是目标 ;) 我会尝试添加更多的 cmets,因为从第一个视图中并不清楚为什么需要 ExpectingIterator
    • @ncla,它没有。 Pool 类在内部使用与 each_limit() 相同的机制。我创建了一个单独的包(基于上面的代码)以尽可能简化事情,所以请看一下:github.com/alexeyshockov/guzzle-dynamic-pool/blob/master/…
    【解决方案2】:

    从问题看来,您可以将聚合回调直接移动到查询中。在这种情况下,池将始终等待您的处理代码,因此您可以随时添加新请求。

    生成器可以返回请求或承诺,并且承诺可以以不同的方式组合在一起。

    $generator = function () {
        while ($request = array_shift($this->requests)) {
            if (isset($request['page'])) {
                $key = 'page_' . $request['page'];
            } else {
                $key = 'listing_' . $request['listing'];
            }
    
            yield $this->client->sendAsync('GET', $request['url'])
                ->then(function (Response $response) use ($key) {
                /*
                 * The fullfillment callback is now connected to the query, so the 
                 * pool will wait for it.
                 * 
                 * $key is also available, because it's just a closure, so 
                 * no $index needed as an argument.
                 */
            });
        }
        echo "Exiting...\n";
        flush();
    };
    
    $promise = \GuzzleHttp\Promise\each_limit($generator(), [
        'concurrency' => function () {
            return max(1, min(count($this->requests), 2));
        },
        //...
    ]);
    
    $promise->wait();
    

    【讨论】:

    • 嗨,这似乎不起作用。我将其更改为 $this->client->requestAsync 但收到此错误: 致命错误:未捕获的 InvalidArgumentException:迭代器产生的每个值都必须是 Psr7\Http\Message\RequestInterface 或返回承诺的可调用Psr7\Message\Http\ResponseInterface 对象
    • 还尝试发出新请求,然后使用您提出的 sendAsync 方法(因为该方法需要请求,而不是您的示例),但不幸的是它也不起作用。
    • 是的,忘记了each_limit()Pool 之间的区别。更新了答案。基本上使用each_limit()函数而不是池更方便,因为它在里面包装了promise。
    • 请看我编辑的代码,必须稍微编辑一下 each_limit 函数的参数。但现在我得到了错误:函数 Scraper::{closure}() 的参数太少,第 203 行的 /vendor/guzzlehttp/promises/src/Promise.php 中传递了 1 个,并且正好是 2 个 expectedRequesting - 有什么想法吗?
    • 好吧,我错了,所以如果我只在我的已完成和已拒绝回调中添加 1 个参数,它就可以工作。但是,我知道请求 $index 非常重要。是否无法以某种方式获取该信息? Pool 对象可以做到这一点。没有它,each_limit 对我来说毫无用处。
    【解决方案3】:

    正如我之前所说,完整的示例在要点中,包括 MapIterator 和 ExpectingIterator

    Iterators dont 在 php

    另一方面,只要您在迭代器上使用 ->append 方法而不是 [] push,它就可以在早期版本的 php 上运行。

    【讨论】:

    • 谢谢,->append() 是正确的。更改了原来的答案,现在它与 PHP 5.x 兼容。
    【解决方案4】:

    答案是可以的。你只需要更多的发电机。并将您的请求解析和排队逻辑分离到异步设计中。而不是为请求使用数组,您的池将发出并等待它本身需要成为一个生成器,该生成器从您的初始列表中产生新请求并从解析的响应中添加请求,直到所有请求都被发送解析并发送结果请求并且已解析(重复)或遇到停止条件。

    【讨论】:

    • 我假设这正是我所拥有的。我添加到 $this->requests 数组中的任何内容都会被 $generator 自动使用。
    • 当你的数组为空时会发生什么?当数组中只有 1 个请求时,作为思考练习跟踪您的代码。当请求从数组中删除时,您的生成器是否有任何理由等待将任何内容添加回其中?
    • 但是 guzzle Pool 类只接受 1 个 $generator,所以当你说池本身需要是一个生成器时,不知道你在说什么?
    【解决方案5】:

    如果你可以使用postAsync/getAsync左右,你可以使用下面的骨架,

    function postInBulk($inputs)
    {
        $client = new Client([
            'base_uri' => 'https://a.b.com'
        ]);
        $headers = [
            'Authorization' => 'Bearer token_from_directus_user'
        ];
    
        $requests = function ($a) use ($client, $headers) {
            for ($i = 0; $i < count($a); $i++) {
                yield function() use ($client, $headers) {
                    return $client->postAsync('https://a.com/project/items/collection', [
                        'headers' => $headers,
                        'json' => [
                            "snippet" => "snippet",
                            "rank" => "1",
                            "status" => "published"
                        ]        
                    ]);
                };
            }
            
        };
    
        $pool = new Pool($client, $requests($inputs),[
            'concurrency' => 5,
            'fulfilled' => function (Response $response, $index) {
                // this is delivered each successful response
            },
            'rejected' => function (RequestException $reason, $index) {
                // this is delivered each failed request
            },
        ]);
    
        $pool->promise()->wait();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-04-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-05
      • 2017-07-30
      相关资源
      最近更新 更多