RedisQueue.php 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Jenner
  5. * Date: 2015/8/20
  6. * Time: 15:03
  7. */
  8. namespace Jenner\SimpleFork\Queue;
  9. /**
  10. * redis queue
  11. *
  12. * @package Jenner\SimpleFork\Queue
  13. */
  14. class RedisQueue implements QueueInterface
  15. {
  16. /**
  17. * @var \Redis
  18. */
  19. protected $redis;
  20. /**
  21. * @var string redis key of queue
  22. */
  23. protected $channel;
  24. /**
  25. * @param string $host redis server host
  26. * @param int $port redis server port
  27. * @param int $database redis server database num
  28. * @param string $channel redis queue key
  29. * @param string $prefix prefix of redis queue key
  30. */
  31. public function __construct(
  32. $host = '127.0.0.1',
  33. $port = 6379,
  34. $database = 0,
  35. $channel = 'cache',
  36. $prefix = 'simple-fork-'
  37. )
  38. {
  39. $this->redis = new \Redis();
  40. $connection_result = $this->redis->connect($host, $port);
  41. if (!$connection_result) {
  42. throw new \RuntimeException('can not connect to the redis server');
  43. }
  44. if ($database != 0) {
  45. $select_result = $this->redis->select($database);
  46. if (!$select_result) {
  47. throw new \RuntimeException('can not select the database');
  48. }
  49. }
  50. if (empty($channel)) {
  51. throw new \InvalidArgumentException('channel can not be empty');
  52. }
  53. $this->channel = $channel;
  54. if (empty($prefix)) return;
  55. $set_option_result = $this->redis->setOption(\Redis::OPT_PREFIX, $prefix);
  56. if (!$set_option_result) {
  57. throw new \RuntimeException('can not set the \Redis::OPT_PREFIX Option');
  58. }
  59. }
  60. /**
  61. * put value into the queue
  62. *
  63. * @param $value
  64. * @return bool
  65. */
  66. public function put($value)
  67. {
  68. if ($this->redis->lPush($this->channel, $value) !== false) {
  69. return true;
  70. }
  71. return false;
  72. }
  73. /**
  74. * get value from the queue
  75. *
  76. * @param bool $block if block when the queue is empty
  77. * @return bool|string
  78. */
  79. public function get($block = false)
  80. {
  81. if (!$block) {
  82. return $this->redis->rPop($this->channel);
  83. } else {
  84. while (true) {
  85. $record = $this->redis->rPop($this->channel);
  86. if ($record === false) {
  87. usleep(1000);
  88. continue;
  89. }
  90. return $record;
  91. }
  92. }
  93. }
  94. /**
  95. * get the size of the queue
  96. *
  97. * @return int
  98. */
  99. public function size()
  100. {
  101. return $this->redis->lSize($this->channel);
  102. }
  103. /**
  104. * remove the queue resource
  105. *
  106. * @return mixed
  107. */
  108. public function remove()
  109. {
  110. return $this->redis->delete($this->channel);
  111. }
  112. /**
  113. * close the connection
  114. */
  115. public function __destruct()
  116. {
  117. $this->close();
  118. }
  119. /**
  120. * close the connection
  121. */
  122. public function close()
  123. {
  124. $this->redis->close();
  125. }
  126. }