EachPromise.php 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. <?php
  2. namespace GuzzleHttp\Promise;
  /**
   * Represents a promise that iterates over many promises and invokes
   * side-effect functions in the process.
   *
   * The work is represented by a single "aggregate" promise, created lazily by
   * promise(). Each value taken from the iterable is turned into a promise,
   * tracked in an internal pending list, and removed from that list when it
   * settles. The aggregate is resolved with null once the pending list is
   * empty and the iterator is exhausted.
   */
  class EachPromise implements PromisorInterface
  {
      /**
       * Promises added by addPending() that step() has not removed yet, keyed
       * by a value taken from $nextPendingIndex. Reset to null by the cleanup
       * callback once the aggregate promise settles.
       *
       * @var array|null
       */
      private $pending = [];

      /** @var int Key under which the next pending promise will be stored. */
      private $nextPendingIndex = 0;

      /** @var \Iterator|null */
      private $iterable;

      /** @var callable|int|null */
      private $concurrency;

      /** @var callable|null */
      private $onFulfilled;

      /** @var callable|null */
      private $onRejected;

      /** @var Promise|null */
      private $aggregate;

      /**
       * True while the iterator is being advanced; see advanceIterator().
       *
       * @var bool|null
       */
      private $mutex;

      /**
       * Configuration hash can include the following key value pairs:
       *
       * - fulfilled: (callable) Invoked when a promise fulfills. The function
       *   is invoked with three arguments: the fulfillment value, the index
       *   position from the iterable list of the promise, and the aggregate
       *   promise that manages all of the promises. The aggregate promise may
       *   be resolved from within the callback to short-circuit the promise.
       * - rejected: (callable) Invoked when a promise is rejected. The
       *   function is invoked with three arguments: the rejection reason, the
       *   index position from the iterable list of the promise, and the
       *   aggregate promise that manages all of the promises. The aggregate
       *   promise may be resolved from within the callback to short-circuit
       *   the promise.
       * - concurrency: (integer) Pass this configuration option to limit the
       *   allowed number of outstanding concurrently executing promises,
       *   creating a capped pool of promises. There is no limit by default.
       *   A callable is also accepted: it receives the current number of
       *   pending promises and returns the limit to apply.
       *
       * @param mixed $iterable Promises or values to iterate.
       * @param array $config   Configuration options
       */
      public function __construct($iterable, array $config = [])
      {
          $this->iterable = Create::iterFor($iterable);

          if (isset($config['concurrency'])) {
              $this->concurrency = $config['concurrency'];
          }

          if (isset($config['fulfilled'])) {
              $this->onFulfilled = $config['fulfilled'];
          }

          if (isset($config['rejected'])) {
              $this->onRejected = $config['rejected'];
          }
      }

      /**
       * Returns the aggregate promise.
       *
       * The first call creates the aggregate, rewinds the iterator and queues
       * the initial batch of pending promises. Anything thrown while doing so
       * rejects the aggregate instead of propagating to the caller. Later
       * calls return the same promise.
       *
       * @return Promise The aggregate promise.
       *
       * @psalm-suppress InvalidNullableReturnType
       */
      public function promise()
      {
          if ($this->aggregate) {
              return $this->aggregate;
          }

          try {
              $this->createPromise();
              /** @psalm-assert Promise $this->aggregate */
              $this->iterable->rewind();
              $this->refillPending();
          } catch (\Throwable $e) {
              /**
               * @psalm-suppress NullReference
               * @phpstan-ignore-next-line
               */
              $this->aggregate->reject($e);
          } catch (\Exception $e) {
              // Only reachable on runtimes where \Throwable does not exist.
              /**
               * @psalm-suppress NullReference
               * @phpstan-ignore-next-line
               */
              $this->aggregate->reject($e);
          }

          /**
           * @psalm-suppress NullableReturnStatement
           * @phpstan-ignore-next-line
           */
          return $this->aggregate;
      }

      /**
       * Creates the aggregate promise, its wait function, and the cleanup
       * callback that drops all references once the aggregate settles.
       */
      private function createPromise()
      {
          $this->mutex = false;

          $this->aggregate = new Promise(function () {
              if ($this->checkIfFinished()) {
                  return;
              }

              reset($this->pending);

              // Consume a potentially fluctuating list of promises while
              // ensuring that indexes are maintained (precluding array_shift).
              while ($promise = current($this->pending)) {
                  next($this->pending);
                  $promise->wait();

                  // Stop as soon as the aggregate settles: the cleanup
                  // callback may already have replaced $pending with null.
                  if (Is::settled($this->aggregate)) {
                      return;
                  }
              }
          });

          // Clear the references when the promise is resolved.
          $clearFn = function () {
              $this->iterable = $this->concurrency = $this->pending = null;
              $this->onFulfilled = $this->onRejected = null;
              $this->nextPendingIndex = 0;
          };

          $this->aggregate->then($clearFn, $clearFn);
      }

      /**
       * Adds promises from the iterator to the pending list, up to the
       * configured concurrency limit (or all of them when there is no limit).
       */
      private function refillPending()
      {
          if (!$this->concurrency) {
              // Add all pending promises.
              while ($this->addPending() && $this->advanceIterator()) {
                  // Intentionally empty: the loop condition does the work.
              }

              return;
          }

          // Add only up to N pending promises.
          $concurrency = is_callable($this->concurrency)
              ? call_user_func($this->concurrency, count($this->pending))
              : $this->concurrency;
          $concurrency = max($concurrency - count($this->pending), 0);

          // Concurrency may be set to 0 to disallow new promises.
          if (!$concurrency) {
              return;
          }

          // Add the first pending promise.
          $this->addPending();

          // Note this is special handling for concurrency=1 so that we do
          // not advance the iterator after adding the first promise. This
          // helps work around issues with generators that might not have the
          // next value to yield until promise callbacks are called.
          while (--$concurrency
              && $this->advanceIterator()
              && $this->addPending()
          ) {
              // Intentionally empty: the loop condition does the work.
          }
      }

      /**
       * Wraps the iterator's current value in a promise and tracks it as
       * pending.
       *
       * The configured fulfilled/rejected callback (if any) receives the
       * value or reason, the iterator key, and the aggregate promise; step()
       * then runs for the pending slot.
       *
       * @return bool False when the iterator reference has been cleared or
       *              the iterator has no current element; true otherwise.
       */
      private function addPending()
      {
          if (!$this->iterable || !$this->iterable->valid()) {
              return false;
          }

          $promise = Create::promiseFor($this->iterable->current());
          $key = $this->iterable->key();

          // Iterable keys may not be unique, so we use a counter to
          // guarantee uniqueness
          $idx = $this->nextPendingIndex++;

          $this->pending[$idx] = $promise->then(
              function ($value) use ($idx, $key) {
                  if ($this->onFulfilled) {
                      call_user_func(
                          $this->onFulfilled,
                          $value,
                          $key,
                          $this->aggregate
                      );
                  }
                  $this->step($idx);
              },
              function ($reason) use ($idx, $key) {
                  if ($this->onRejected) {
                      call_user_func(
                          $this->onRejected,
                          $reason,
                          $key,
                          $this->aggregate
                      );
                  }
                  $this->step($idx);
              }
          );

          return true;
      }

      /**
       * Moves the iterator to its next element unless an advance is already
       * in progress.
       *
       * Anything thrown by the iterator rejects the aggregate promise.
       *
       * @return bool True if the iterator was advanced; false if the lock was
       *              held or the iterator threw.
       */
      private function advanceIterator()
      {
          // Place a lock on the iterator so that we ensure to not recurse,
          // preventing fatal generator errors.
          if ($this->mutex) {
              return false;
          }

          $this->mutex = true;

          try {
              $this->iterable->next();
              $this->mutex = false;

              return true;
          } catch (\Throwable $e) {
              $this->aggregate->reject($e);
              $this->mutex = false;

              return false;
          } catch (\Exception $e) {
              // Only reachable on runtimes where \Throwable does not exist.
              $this->aggregate->reject($e);
              $this->mutex = false;

              return false;
          }
      }

      /**
       * Called after a pending promise settles: removes it from the pending
       * list, advances the iterator and tops the pending list up again.
       *
       * @param int $idx Key of the settled promise in the pending list.
       */
      private function step($idx)
      {
          // If the promise was already resolved, then ignore this step.
          if (Is::settled($this->aggregate)) {
              return;
          }

          unset($this->pending[$idx]);

          // Only refill pending promises if we are not locked, preventing the
          // EachPromise to recursively invoke the provided iterator, which
          // cause a fatal error: "Cannot resume an already running generator"
          if ($this->advanceIterator() && !$this->checkIfFinished()) {
              // Add more pending promises if possible.
              $this->refillPending();
          }
      }

      /**
       * Resolves the aggregate promise with null when no promises are pending
       * and the iterator is exhausted.
       *
       * @return bool True if the aggregate is settled (either just now or
       *              before this call); false if there is still work to do.
       */
      private function checkIfFinished()
      {
          // step() advances the iterator right before calling this method,
          // and advancing runs caller-supplied iterator code. If the
          // aggregate was settled in the meantime, the cleanup callback may
          // already have nulled $iterable and $pending, so neither touch
          // them nor try to resolve the aggregate a second time.
          if (Is::settled($this->aggregate)) {
              return true;
          }

          if (!$this->pending && !$this->iterable->valid()) {
              // Resolve the promise if there's nothing left to do.
              $this->aggregate->resolve(null);

              return true;
          }

          return false;
      }
  }