PipeQueue.php 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. <?php
  2. /**
  3. * @author Jenner <hypxm@qq.com>
  4. * @blog http://www.huyanping.cn
  5. * @license https://opensource.org/licenses/MIT MIT
  6. * @datetime: 2015/11/24 18:38
  7. */
  8. namespace Jenner\SimpleFork\Queue;
  /**
   * Queue implementation that frames messages over a Pipe.
   *
   * Every message is written as a 4-byte length header (pack format 'N',
   * i.e. unsigned 32-bit big-endian) immediately followed by the raw payload.
   */
  class PipeQueue implements QueueInterface
  {
      /**
       * Pipe used to transport the framed messages.
       *
       * @var Pipe
       */
      protected $pipe;

      /**
       * Blocking mode most recently applied to the pipe through setBlock().
       *
       * @var bool
       */
      protected $block;

      /**
       * Create the pipe and switch it to non-blocking mode.
       *
       * @param string $filename fifo filename
       * @param int $mode mode forwarded to the Pipe constructor
       */
      public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666)
      {
          $this->pipe = new Pipe($filename, $mode);
          $this->block = false;
          $this->pipe->setBlock($this->block);
      }

      /**
       * put value into the queue of channel
       *
       * @param string $value raw payload; its byte length becomes the frame header
       * @return bool true when the number of bytes reported by the pipe equals
       *              the size of the whole frame (header + payload)
       * @throws \RuntimeException when the payload is longer than 2147483647 bytes
       */
      public function put($value)
      {
          $len = strlen($value);
          if ($len > 2147483647) {
              throw new \RuntimeException('value is too long');
          }

          $raw = pack('N', $len) . $value;
          $write_len = $this->pipe->write($raw);

          // Loose comparison kept on purpose: the return type of Pipe::write()
          // is not visible from this class.
          return $write_len == strlen($raw);
      }

      /**
       * get value from the queue of channel
       *
       * @param bool $block if block when the queue is empty
       * @return string|null the next payload ('' for an empty payload), or null
       *                     when the header read yields no bytes
       * @throws \RuntimeException when reading from the pipe fails or the frame
       *                           header is malformed
       */
      public function get($block = false)
      {
          // Only touch the pipe's blocking mode when it actually changes.
          if ($this->block != $block) {
              $this->pipe->setBlock($block);
              $this->block = $block;
          }

          $header = $this->pipe->read(4);
          if ($header === false) {
              throw new \RuntimeException('read pipe failed');
          }
          if (strlen($header) === 0) {
              return null;
          }
          // A truncated header cannot be decoded; reject it explicitly rather
          // than letting unpack() raise a warning first.
          if (strlen($header) !== 4) {
              throw new \RuntimeException('data protocol error');
          }

          $unpacked = unpack('N', $header);
          if (empty($unpacked) || !array_key_exists(1, $unpacked)) {
              throw new \RuntimeException('data protocol error');
          }

          $remaining = intval($unpacked[1]);
          // A negative length (possible where the decoded value overflows the
          // platform integer) would make the read loop below spin forever.
          if ($remaining < 0) {
              throw new \RuntimeException('data protocol error');
          }
          // put('') writes a frame whose length is 0; that is a valid, empty
          // payload and must not be reported as a protocol error.
          if ($remaining === 0) {
              return '';
          }

          // The payload may arrive in several chunks: keep appending until the
          // announced number of bytes has been collected, and return the whole
          // accumulated value rather than only the final chunk.
          $value = '';
          while ($remaining > 0) {
              $chunk = $this->pipe->read($remaining);
              if ($chunk === false) {
                  // Without this check a failed read counts as zero bytes and
                  // the loop never terminates.
                  throw new \RuntimeException('read pipe failed');
              }
              $value .= $chunk;
              $remaining -= strlen($chunk);
          }

          return $value;
      }

      /**
       * remove the queue resource
       *
       * Closes the pipe and then removes it.
       *
       * NOTE(review): no value is returned although this method was documented
       * as returning bool; confirm what Pipe::remove() yields before
       * propagating its result.
       *
       * @return void
       */
      public function remove()
      {
          $this->pipe->close();
          $this->pipe->remove();
      }
  }