Subversion Repositories cheapmusic

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
103 - 1
<?php
2
namespace GuzzleHttp\Promise;
3
 
4
/**
 * Represents a promise that iterates over many promises and invokes
 * side-effect functions in the process.
 *
 * The aggregate promise returned by promise() resolves with null once every
 * item of the iterable has been consumed and every derived promise has
 * settled. It is rejected if the iterable throws while being rewound or
 * advanced.
 */
class EachPromise implements PromisorInterface
{
    /**
     * Promises that have been scheduled but have not settled yet, indexed by
     * an internal counter (NOT by the iterable key, which may repeat).
     *
     * @var array|null Null once the aggregate promise has been settled.
     */
    private $pending = [];

    /** @var int Next unused index of the $pending list. */
    private $nextPendingIndex = 0;

    /** @var \Iterator|null Null once the aggregate promise has been settled. */
    private $iterable;

    /** @var callable|int|null */
    private $concurrency;

    /** @var callable|null */
    private $onFulfilled;

    /** @var callable|null */
    private $onRejected;

    /** @var Promise|null Created lazily by promise(). */
    private $aggregate;

    /** @var bool True while the iterator is being advanced. */
    private $mutex;

    /**
     * Configuration hash can include the following key value pairs:
     *
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
     *   is invoked with three arguments: the fulfillment value, the index
     *   position from the iterable list of the promise, and the aggregate
     *   promise that manages all of the promises. The aggregate promise may
     *   be resolved from within the callback to short-circuit the promise.
     * - rejected: (callable) Invoked when a promise is rejected. The
     *   function is invoked with three arguments: the rejection reason, the
     *   index position from the iterable list of the promise, and the
     *   aggregate promise that manages all of the promises. The aggregate
     *   promise may be resolved from within the callback to short-circuit
     *   the promise.
     * - concurrency: (integer) Pass this configuration option to limit the
     *   allowed number of outstanding concurrently executing promises,
     *   creating a capped pool of promises. There is no limit by default.
     *   A callable may be given instead; it receives the current number of
     *   pending promises and returns the limit to apply.
     *
     * @param mixed    $iterable Promises or values to iterate.
     * @param array    $config   Configuration options
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = iter_for($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it and starting the iteration
     * on the first call. Subsequent calls return the same promise.
     *
     * Any error raised while rewinding the iterable or scheduling the first
     * promises rejects the aggregate instead of propagating to the caller.
     *
     * @return Promise
     */
    public function promise()
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
        } catch (\Exception $e) {
            // Kept in addition to \Throwable for PHP 5, where \Throwable
            // does not exist and this branch is the one that matches.
            $this->aggregate->reject($e);
        }

        return $this->aggregate;
    }

    /**
     * Builds the aggregate promise and registers the cleanup callback that
     * drops all references once the aggregate has been settled.
     */
    private function createPromise()
    {
        $this->mutex = false;
        $this->aggregate = new Promise(function () {
            reset($this->pending);
            if (empty($this->pending) && !$this->iterable->valid()) {
                $this->aggregate->resolve(null);
                return;
            }

            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            while ($promise = current($this->pending)) {
                next($this->pending);
                $promise->wait();
                // Waiting may have settled the aggregate (finished, rejected
                // or short-circuited by a callback); stop in that case.
                if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function () {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($clearFn, $clearFn);
    }

    /**
     * Schedules as many promises from the iterable as the concurrency
     * setting currently allows.
     */
    private function refillPending()
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator());
            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? call_user_func($this->concurrency, count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending());
    }

    /**
     * Turns the iterable's current item into a promise and tracks it as
     * pending.
     *
     * @return bool False when there is no current item to schedule.
     */
    private function addPending()
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = promise_for($this->iterable->current());
        $key = $this->iterable->key();

        // Iterable keys are neither guaranteed to be unique (e.g. generators
        // delegating with "yield from") nor to be valid array keys, so the
        // pending list is indexed by an internal counter. Using the key
        // directly would let one promise overwrite another and make step()
        // remove the wrong entry. Callbacks still receive the original key.
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key) {
                if ($this->onFulfilled) {
                    call_user_func(
                        $this->onFulfilled, $value, $key, $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key) {
                if ($this->onRejected) {
                    call_user_func(
                        $this->onRejected, $reason, $key, $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    /**
     * Moves the iterable to its next item.
     *
     * @return bool False when the iterator is already being advanced or when
     *              advancing threw (the aggregate is rejected in that case).
     */
    private function advanceIterator()
    {
        // Place a lock on the iterator so that we ensure to not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $this->mutex = false;
            return true;
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        } catch (\Exception $e) {
            // PHP 5 fallback, see promise().
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        }
    }

    /**
     * Bookkeeping performed after a tracked promise settles: forget it and
     * schedule more work unless everything is done.
     *
     * @param int $idx Internal index of the settled promise in $pending.
     */
    private function step($idx)
    {
        // If the promise was already resolved, then ignore this step.
        if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise to recursively invoke the provided iterator, which
        // cause a fatal error: "Cannot resume an already running generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    /**
     * Resolves the aggregate with null when nothing is pending and the
     * iterable is exhausted.
     *
     * @return bool True if the aggregate was resolved by this call.
     */
    private function checkIfFinished()
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);
            return true;
        }

        return false;
    }
}