Parallel.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Utils;
  12. use Hyperf\Utils\Exception\ParallelExecutionException;
  13. use Swoole\Coroutine\Channel;
  14. class Parallel
  15. {
  16. /**
  17. * @var callable[]
  18. */
  19. private $callbacks = [];
  20. /**
  21. * @var null|Channel
  22. */
  23. private $concurrentChannel;
  24. private $results = [];
  25. /**
  26. * @var \Throwable[]
  27. */
  28. private $throwables = [];
  29. /**
  30. * @param int $concurrent if $concurrent is equal to 0, that means unlimit
  31. */
  32. public function __construct(int $concurrent = 0)
  33. {
  34. if ($concurrent > 0) {
  35. $this->concurrentChannel = new Channel($concurrent);
  36. }
  37. }
  38. public function add(callable $callable, $key = null)
  39. {
  40. if (is_null($key)) {
  41. $this->callbacks[] = $callable;
  42. } else {
  43. $this->callbacks[$key] = $callable;
  44. }
  45. }
  46. public function wait(bool $throw = true): array
  47. {
  48. $wg = new WaitGroup();
  49. $wg->add(count($this->callbacks));
  50. foreach ($this->callbacks as $key => $callback) {
  51. $this->concurrentChannel && $this->concurrentChannel->push(true);
  52. $this->results[$key] = null;
  53. Coroutine::create(function () use ($callback, $key, $wg) {
  54. try {
  55. $this->results[$key] = $callback();
  56. } catch (\Throwable $throwable) {
  57. $this->throwables[$key] = $throwable;
  58. unset($this->results[$key]);
  59. } finally {
  60. $this->concurrentChannel && $this->concurrentChannel->pop();
  61. $wg->done();
  62. }
  63. });
  64. }
  65. $wg->wait();
  66. if ($throw && ($throwableCount = count($this->throwables)) > 0) {
  67. $message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($this->throwables);
  68. $executionException = new ParallelExecutionException($message);
  69. $executionException->setResults($this->results);
  70. $executionException->setThrowables($this->throwables);
  71. unset($this->results, $this->throwables);
  72. throw $executionException;
  73. }
  74. return $this->results;
  75. }
  76. public function count(): int
  77. {
  78. return count($this->callbacks);
  79. }
  80. public function clear(): void
  81. {
  82. $this->callbacks = [];
  83. $this->results = [];
  84. $this->throwables = [];
  85. }
  86. /**
  87. * Format throwables into a nice list.
  88. *
  89. * @param \Throwable[] $throwables
  90. */
  91. private function formatThrowables(array $throwables): string
  92. {
  93. $output = '';
  94. foreach ($throwables as $key => $value) {
  95. $output .= \sprintf('(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString());
  96. }
  97. return $output;
  98. }
  99. }