Subversion Repositories cheapmusic

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
103 - 1
<?php
2
namespace GuzzleHttp\Promise;
3
 
4
/**
 * Returns the global task queue that drives promise resolution, optionally
 * installing a replacement queue first.
 *
 * The queue MUST be run from an event loop for promises to settle
 * asynchronously. It is run automatically when a promise is waited on
 * synchronously.
 *
 * <code>
 * while ($eventLoop->isRunning()) {
 *     GuzzleHttp\Promise\queue()->run();
 * }
 * </code>
 *
 * @param TaskQueueInterface $assign When provided, becomes the new global queue.
 *
 * @return TaskQueueInterface The queue that is installed after the call.
 */
function queue(TaskQueueInterface $assign = null)
{
    // Function-level static: the same instance is shared by every caller.
    static $current;

    if ($assign) {
        // An explicitly supplied queue always replaces the installed one.
        return $current = $assign;
    }

    if (!$current) {
        // Lazily create the default queue the first time one is requested.
        $current = new TaskQueue();
    }

    return $current;
}
33
 
34
/**
 * Schedules a callable on the global task queue and returns a promise for its
 * outcome.
 *
 * The task runs the next time the queue is `run()`. The returned promise is
 * fulfilled with the task's return value, or rejected with whatever the task
 * throws.
 *
 * @param callable $task Task function to run.
 *
 * @return PromiseInterface
 */
function task(callable $task)
{
    $taskQueue = queue();

    // The wait function runs the queue, so waiting on the promise is enough
    // to get the task executed.
    $deferred = new Promise([$taskQueue, 'run']);

    $runner = function () use ($task, $deferred) {
        try {
            $result = $task();
            $deferred->resolve($result);
        } catch (\Throwable $e) {
            // PHP 7+: covers both Error and Exception.
            $deferred->reject($e);
        } catch (\Exception $e) {
            // PHP 5: \Throwable does not exist, so exceptions land here.
            $deferred->reject($e);
        }
    };

    $taskQueue->add($runner);

    return $deferred;
}
58
 
59
/**
 * Creates a promise for a value if the value is not a promise.
 *
 * - A PromiseInterface instance is returned unchanged.
 * - Any other object exposing a `then` method (a foreign "thenable") is
 *   wrapped in a Guzzle promise that settles when the thenable settles. Its
 *   `wait` and `cancel` methods, when present, are used as the wait and
 *   cancel functions of the wrapper.
 * - Every other value (scalars, arrays, null, plain objects) is wrapped in an
 *   already-fulfilled promise.
 *
 * @param mixed $value Promise or value.
 *
 * @return PromiseInterface
 */
function promise_for($value)
{
    if ($value instanceof PromiseInterface) {
        return $value;
    }

    // Return a Guzzle promise that shadows the given promise.
    //
    // The is_object() guard is required: method_exists() also accepts class
    // name strings (so a string naming a class with a "then" method would be
    // treated as a thenable and crash on $value->then()), and on PHP 8 it
    // throws a TypeError for arrays and other non-object, non-string values.
    if (is_object($value) && method_exists($value, 'then')) {
        $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
        $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
        $promise = new Promise($wfn, $cfn);
        $value->then([$promise, 'resolve'], [$promise, 'reject']);

        return $promise;
    }

    return new FulfilledPromise($value);
}
83
 
84
/**
 * Creates a rejected promise for the given reason.
 *
 * When the reason is itself a promise it is handed back untouched; otherwise
 * it is wrapped in a RejectedPromise.
 *
 * @param mixed $reason Promise or reason.
 *
 * @return PromiseInterface
 */
function rejection_for($reason)
{
    return $reason instanceof PromiseInterface
        ? $reason
        : new RejectedPromise($reason);
}
100
 
101
/**
 * Converts a rejection reason into something that can be thrown.
 *
 * Reasons that are already an \Exception or \Throwable are returned as-is;
 * anything else is wrapped in a RejectionException.
 *
 * @param mixed $reason
 *
 * @return \Exception|\Throwable
 */
function exception_for($reason)
{
    // \Exception is checked separately so the test also works on PHP 5,
    // where \Throwable does not exist.
    if ($reason instanceof \Exception || $reason instanceof \Throwable) {
        return $reason;
    }

    return new RejectionException($reason);
}
114
 
115
/**
 * Returns an iterator for the given value.
 *
 * An \Iterator is returned unchanged, an array is wrapped in an
 * \ArrayIterator, and any other value becomes a one-element \ArrayIterator
 * containing that value.
 *
 * @param mixed $value
 *
 * @return \Iterator
 */
function iter_for($value)
{
    if ($value instanceof \Iterator) {
        return $value;
    }

    // Non-array values are treated as a list holding just that value.
    $items = is_array($value) ? $value : [$value];

    return new \ArrayIterator($items);
}
132
 
133
/**
 * Synchronously waits on a promise and reports how it settled as an
 * inspection state array.
 *
 * The returned associative array always has a "state" key. When the promise
 * fulfilled it also carries a "value" key with the fulfillment value; when it
 * was rejected it carries a "reason" key with the rejection reason. A
 * RejectionException is unwrapped to its underlying reason; any other thrown
 * exception or error is used as the reason directly.
 *
 * @param PromiseInterface $promise Promise to wait on.
 *
 * @return array
 */
function inspect(PromiseInterface $promise)
{
    try {
        $value = $promise->wait();

        return ['state' => PromiseInterface::FULFILLED, 'value' => $value];
    } catch (RejectionException $e) {
        // Surface the original reason rather than the wrapper exception.
        $reason = $e->getReason();
    } catch (\Throwable $e) {
        $reason = $e;
    } catch (\Exception $e) {
        // PHP 5 fallback, where \Throwable is not available.
        $reason = $e;
    }

    return ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
}
162
 
163
/**
 * Waits on every provided promise without throwing for rejected ones.
 *
 * Each promise is passed through inspect() and the resulting inspection
 * state array is stored under the key the promise had in $promises.
 *
 * @param PromiseInterface[] $promises Traversable of promises to wait upon.
 *
 * @return array Inspection state arrays, keyed like $promises.
 * @see GuzzleHttp\Promise\inspect for the inspection state array format.
 */
function inspect_all($promises)
{
    $states = [];

    foreach ($promises as $idx => $item) {
        $states[$idx] = inspect($item);
    }

    return $states;
}
183
 
184
/**
 * Waits on all of the provided promises and collects their fulfilled values.
 *
 * The promises are waited on one after another, in iteration order, and each
 * value is stored under the key its promise had in $promises. Anything
 * thrown by a promise's wait() call is not caught here and propagates to the
 * caller.
 *
 * @param mixed $promises Iterable of PromiseInterface objects to wait on.
 *
 * @return array
 * @throws \Exception on error
 * @throws \Throwable on error in PHP >=7
 */
function unwrap($promises)
{
    $values = [];

    foreach ($promises as $idx => $item) {
        $values[$idx] = $item->wait();
    }

    return $values;
}
206
 
207
/**
 * Given a collection of promises, returns a promise that is fulfilled once
 * every item has been fulfilled.
 *
 * The fulfillment value is an array holding each item's value under the key
 * it had in the input, sorted by key. As soon as any item rejects, the
 * returned promise is rejected with that rejection reason.
 *
 * @param mixed $promises Promises or values.
 *
 * @return PromiseInterface
 */
function all($promises)
{
    $values = [];

    // Record each fulfilled value under its original index.
    $collect = function ($value, $idx) use (&$values) {
        $values[$idx] = $value;
    };

    // The first rejection fails the aggregate promise.
    $failFast = function ($reason, $idx, Promise $aggregate) {
        $aggregate->reject($reason);
    };

    // Items may settle out of order, so restore key order before returning.
    $finish = function () use (&$values) {
        ksort($values);
        return $values;
    };

    return each($promises, $collect, $failFast)->then($finish);
}
235
 
236
/**
 * Initiates a competitive race between multiple promises or values (values
 * become immediately fulfilled promises).
 *
 * Once $count promises have been fulfilled, the returned promise is fulfilled
 * with a list of the winners' fulfillment values. The list is ordered by the
 * winners' keys in $promises and re-indexed from zero.
 *
 * This promise is rejected with a {@see GuzzleHttp\Promise\AggregateException}
 * carrying the collected rejection reasons if the number of fulfilled
 * promises does not reach the desired $count.
 *
 * @param int   $count    Number of fulfilled promises required.
 * @param mixed $promises Promises or values.
 *
 * @return PromiseInterface
 */
function some($count, $promises)
{
    $winners = [];
    $failures = [];

    $onWin = function ($value, $idx, PromiseInterface $p) use (&$winners, $count) {
        // Ignore late fulfilments once the aggregate has already settled.
        if ($p->getState() !== PromiseInterface::PENDING) {
            return;
        }
        $winners[$idx] = $value;
        if (count($winners) >= $count) {
            // Enough winners: settle the aggregate to stop the race.
            $p->resolve(null);
        }
    };

    $onLoss = function ($reason) use (&$failures) {
        $failures[] = $reason;
    };

    $finish = function () use (&$winners, &$failures, $count) {
        if (count($winners) !== $count) {
            throw new AggregateException(
                'Not enough promises to fulfill count',
                $failures
            );
        }
        ksort($winners);
        return array_values($winners);
    };

    return each($promises, $onWin, $onLoss)->then($finish);
}
284
 
285
/**
 * Like some() with a count of 1, except that on fulfillment the value is
 * delivered directly instead of as a one-element array.
 *
 * @param mixed $promises Promises or values.
 *
 * @return PromiseInterface
 */
function any($promises)
{
    // some() yields a list; unwrap its only entry.
    $first = function ($values) {
        return $values[0];
    };

    return some(1, $promises)->then($first);
}
297
 
298
/**
 * Returns a promise that is fulfilled once all of the provided promises have
 * been either fulfilled or rejected.
 *
 * The returned promise is fulfilled with an array of inspection state arrays,
 * keyed like the input and sorted by key.
 *
 * @param mixed $promises Promises or values.
 *
 * @return PromiseInterface
 * @see GuzzleHttp\Promise\inspect for the inspection state array format.
 */
function settle($promises)
{
    $states = [];

    $onFulfilled = function ($value, $idx) use (&$states) {
        $states[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value];
    };

    $onRejected = function ($reason, $idx) use (&$states) {
        $states[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
    };

    // Items may settle out of order, so restore key order before returning.
    $finish = function () use (&$states) {
        ksort($states);
        return $states;
    };

    return each($promises, $onFulfilled, $onRejected)->then($finish);
}
326
 
327
/**
 * Given an iterator that yields promises or values, returns a promise that is
 * fulfilled with a null value when the iterator has been consumed or the
 * aggregate promise has been fulfilled or rejected.
 *
 * $onFulfilled receives the fulfilled value, the iterator index, and the
 * aggregate promise. It may perform side effects and may resolve or reject
 * the aggregate promise itself.
 *
 * $onRejected receives the rejection reason, the iterator index, and the
 * aggregate promise. It may perform side effects and may resolve or reject
 * the aggregate promise itself.
 *
 * @param mixed    $iterable    Iterator or array to iterate over.
 * @param callable $onFulfilled
 * @param callable $onRejected
 *
 * @return PromiseInterface
 */
function each(
    $iterable,
    callable $onFulfilled = null,
    callable $onRejected = null
) {
    $config = [
        'fulfilled' => $onFulfilled,
        'rejected'  => $onRejected
    ];
    $eachPromise = new EachPromise($iterable, $config);

    return $eachPromise->promise();
}
356
 
357
/**
 * Like each(), but only allows a certain number of outstanding promises at
 * any given time.
 *
 * $concurrency may be an integer, or a function that accepts the number of
 * pending promises and returns a numeric concurrency limit, which allows for
 * a dynamic concurrency size.
 *
 * @param mixed        $iterable
 * @param int|callable $concurrency
 * @param callable     $onFulfilled
 * @param callable     $onRejected
 *
 * @return PromiseInterface
 */
function each_limit(
    $iterable,
    $concurrency,
    callable $onFulfilled = null,
    callable $onRejected = null
) {
    $config = [
        'fulfilled'   => $onFulfilled,
        'rejected'    => $onRejected,
        'concurrency' => $concurrency
    ];
    $eachPromise = new EachPromise($iterable, $config);

    return $eachPromise->promise();
}
384
 
385
/**
 * Like each_limit(), but ensures that no promise in the given $iterable
 * argument is rejected. If any promise is rejected, the aggregate promise is
 * rejected with the encountered rejection.
 *
 * @param mixed        $iterable
 * @param int|callable $concurrency
 * @param callable     $onFulfilled
 *
 * @return PromiseInterface
 */
function each_limit_all(
    $iterable,
    $concurrency,
    callable $onFulfilled = null
) {
    // Any single rejection is forwarded to the aggregate promise.
    $rejectAggregate = function ($reason, $idx, PromiseInterface $aggregate) {
        $aggregate->reject($reason);
    };

    return each_limit($iterable, $concurrency, $onFulfilled, $rejectAggregate);
}
410
 
411
/**
 * Tells whether a promise is in the fulfilled state.
 *
 * @param PromiseInterface $promise
 *
 * @return bool True when the promise's state is FULFILLED.
 */
function is_fulfilled(PromiseInterface $promise)
{
    $state = $promise->getState();

    return $state === PromiseInterface::FULFILLED;
}
422
 
423
/**
 * Tells whether a promise is in the rejected state.
 *
 * @param PromiseInterface $promise
 *
 * @return bool True when the promise's state is REJECTED.
 */
function is_rejected(PromiseInterface $promise)
{
    $state = $promise->getState();

    return $state === PromiseInterface::REJECTED;
}
434
 
435
/**
 * Tells whether a promise has settled, i.e. is no longer pending.
 *
 * @param PromiseInterface $promise
 *
 * @return bool True when the promise's state is anything other than PENDING.
 */
function is_settled(PromiseInterface $promise)
{
    $state = $promise->getState();

    return !($state === PromiseInterface::PENDING);
}
446
 
447
/**
 * Wraps the given generator function in a new Coroutine instance.
 *
 * @see Coroutine
 *
 * @param callable $generatorFn Passed unchanged to the Coroutine constructor.
 *
 * @return PromiseInterface
 */
function coroutine(callable $generatorFn)
{
    $coroutine = new Coroutine($generatorFn);

    return $coroutine;
}