Subversion Repositories cheapmusic

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
103 - 1
<?php
2
namespace GuzzleHttp;
3
 
4
use GuzzleHttp\Promise\PromisorInterface;
5
use Psr\Http\Message\RequestInterface;
6
use GuzzleHttp\Promise\EachPromise;
7
 
8
/**
 * Sends an iterator of requests concurrently using a capped pool size.
 *
 * The pool will read from an iterator until it is cancelled or until the
 * iterator is consumed. When a request is yielded, the request is sent after
 * applying the "request_options" request options (if provided in the ctor).
 *
 * When a function is yielded by the iterator, the function is provided the
 * "request_options" array that should be merged on top of any existing
 * options, and the function MUST then return a wait-able promise.
 */
class Pool implements PromisorInterface
{
    /** @var EachPromise Aggregate that consumes the generator of request promises. */
    private $each;

    /**
     * @param ClientInterface $client   Client used to send the requests.
     * @param array|\Iterator $requests Requests or functions that return
     *                                  requests to send concurrently.
     * @param array           $config   Associative array of options
     *     - concurrency: (int) Maximum number of requests to send concurrently.
     *       Defaults to 25 when neither "concurrency" nor "pool_size" is set.
     *     - pool_size: Legacy alias of "concurrency"; when set it overrides
     *       "concurrency".
     *     - options: Array of request options to apply to each request.
     *     - fulfilled: (callable) Function to invoke when a request completes.
     *     - rejected: (callable) Function to invoke when a request is rejected.
     */
    public function __construct(
        ClientInterface $client,
        $requests,
        array $config = []
    ) {
        // Backwards compatibility: "pool_size" is the old name of
        // "concurrency" and wins when both keys are present.
        if (isset($config['pool_size'])) {
            $config['concurrency'] = $config['pool_size'];
        } elseif (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        // "options" is consumed here (handed to every request) and removed
        // from the configuration forwarded to EachPromise.
        $opts = [];
        if (isset($config['options'])) {
            $opts = $config['options'];
            unset($config['options']);
        }

        $iterable = \GuzzleHttp\Promise\iter_for($requests);

        // Generator that turns each yielded item into the value handed to
        // EachPromise, preserving the caller's keys.
        $requests = function () use ($iterable, $client, $opts) {
            foreach ($iterable as $key => $rfn) {
                if ($rfn instanceof RequestInterface) {
                    yield $key => $client->sendAsync($rfn, $opts);
                } elseif (is_callable($rfn)) {
                    yield $key => $rfn($opts);
                } else {
                    throw new \InvalidArgumentException('Each value yielded by '
                        . 'the iterator must be a Psr\Http\Message\RequestInterface '
                        . 'or a callable that returns a promise that fulfills '
                        . 'with a Psr\Http\Message\ResponseInterface object.');
                }
            }
        };

        $this->each = new EachPromise($requests(), $config);
    }

    /**
     * Returns the promise exposed by the underlying EachPromise.
     *
     * @return mixed Whatever EachPromise::promise() returns; batch() calls
     *               wait() on it to block until the pool has finished.
     */
    public function promise()
    {
        return $this->each->promise();
    }

    /**
     * Sends multiple requests concurrently and returns an array of responses
     * and exceptions that uses the same ordering as the provided requests.
     *
     * IMPORTANT: This method keeps every request and response in memory, and
     * as such, is NOT recommended when sending a large number or an
     * indeterminate number of requests concurrently.
     *
     * Any "fulfilled" / "rejected" callbacks supplied in $options are still
     * invoked (with the value and its key) before the value is recorded.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send concurrently.
     * @param array           $options  Passes through the options available in
     *                                  {@see \GuzzleHttp\Pool::__construct}
     *
     * @return array Returns an array containing the response or an exception,
     *               sorted by the key of the request that produced it.
     * @throws \InvalidArgumentException if a value yielded by $requests is
     *                                   neither a request nor a callable.
     */
    public static function batch(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $res = [];
        self::cmpCallback($options, 'fulfilled', $res);
        self::cmpCallback($options, 'rejected', $res);
        $pool = new static($client, $requests, $options);
        $pool->promise()->wait();
        // Callbacks record results as they arrive; restore key order.
        ksort($res);

        return $res;
    }

    /**
     * Installs a callback under $options[$name] that stores each value in
     * $results under its key, chaining any callback that was already there.
     *
     * @param array  $options Options array, modified in place.
     * @param string $name    Option key to wrap ("fulfilled" or "rejected").
     * @param array  $results Collector, written to by reference by the callback.
     */
    private static function cmpCallback(array &$options, $name, array &$results)
    {
        $userFn = isset($options[$name]) ? $options[$name] : null;

        $options[$name] = function ($v, $k) use (&$results, $userFn) {
            // Run the caller's own callback first, then record the value.
            if ($userFn !== null) {
                $userFn($v, $k);
            }
            $results[$k] = $v;
        };
    }
}