ParallelPool.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <?php
  2. /**
  3. * @author Jenner <hypxm@qq.com>
  4. * @blog http://www.huyanping.cn
  5. * @license https://opensource.org/licenses/MIT MIT
  6. * @datetime: 2015/11/19 20:49
  7. */
  8. namespace Jenner\SimpleFork;
  9. /**
  10. * parallel pool
  11. *
  12. * @package Jenner\SimpleFork
  13. */
  14. class ParallelPool extends AbstractPool
  15. {
  16. /**
  17. * @var callable|Runnable sub process callback
  18. */
  19. protected $runnable;
  20. /**
  21. * @var int max process count
  22. */
  23. protected $max;
  24. /**
  25. * @param callable|Runnable $callback
  26. * @param int $max
  27. */
  28. public function __construct($callback, $max = 4)
  29. {
  30. if (!is_callable($callback) && !($callback instanceof Runnable)) {
  31. throw new \InvalidArgumentException('callback must be a callback function or a object of Runnalbe');
  32. }
  33. $this->runnable = $callback;
  34. $this->max = $max;
  35. }
  36. /**
  37. * start the same number processes and kill the old sub process
  38. * just like nginx -s reload
  39. * this method will block until all the old process exit;
  40. *
  41. * @param bool $block
  42. */
  43. public function reload($block = true)
  44. {
  45. $old_processes = $this->processes;
  46. for ($i = 0; $i < $this->max; $i++) {
  47. $process = new Process($this->runnable);
  48. $process->start();
  49. $this->processes[$process->getPid()] = $process;
  50. }
  51. foreach ($old_processes as $process) {
  52. $process->shutdown();
  53. $process->wait($block);
  54. unset($this->processes[$process->getPid()]);
  55. }
  56. }
  57. /**
  58. * keep sub process count
  59. *
  60. * @param bool $block block the master process
  61. * to keep the sub process count all the time
  62. * @param int $interval check time interval
  63. */
  64. public function keep($block = false, $interval = 100)
  65. {
  66. do {
  67. $this->start();
  68. // recycle sub process and delete the processes
  69. // which are not running from process list
  70. foreach ($this->processes as $process) {
  71. if (!$process->isRunning()) {
  72. unset($this->processes[$process->getPid()]);
  73. }
  74. }
  75. $block ? usleep($interval) : null;
  76. } while ($block);
  77. }
  78. /**
  79. * start the pool
  80. */
  81. public function start()
  82. {
  83. $alive_count = $this->aliveCount();
  84. // create sub process and run
  85. if ($alive_count < $this->max) {
  86. $need = $this->max - $alive_count;
  87. for ($i = 0; $i < $need; $i++) {
  88. $process = new Process($this->runnable);
  89. $process->start();
  90. $this->processes[$process->getPid()] = $process;
  91. }
  92. }
  93. }
  94. }