SystemVMessageQueue.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Jenner
  5. * Date: 2015/8/12
  6. * Time: 15:15
  7. */
  8. namespace Jenner\SimpleFork\Queue;
  9. /**
  10. * system v message queue
  11. *
  12. * @package Jenner\SimpleFork\Queue
  13. */
  14. class SystemVMessageQueue implements QueueInterface
  15. {
  16. /**
  17. * @var int channel
  18. */
  19. protected $msg_type;
  20. /**
  21. * @var
  22. */
  23. protected $queue;
  24. /**
  25. * @var bool
  26. */
  27. protected $serialize_needed;
  28. /**
  29. * @var bool
  30. */
  31. protected $block_send;
  32. /**
  33. * @var int
  34. */
  35. protected $option_receive;
  36. /**
  37. * @var int
  38. */
  39. protected $maxsize;
  40. /**
  41. * @var
  42. */
  43. protected $key_t;
  44. /**
  45. * @var string
  46. */
  47. protected $ipc_filename;
  48. /**
  49. * @param string $ipc_filename ipc file to make ipc key.
  50. * if it does not exists, it will try to create the file.
  51. * @param int $channel message type
  52. * @param bool $serialize_needed serialize or not
  53. * @param bool $block_send if block when the queue is full
  54. * @param int $option_receive if the value is MSG_IPC_NOWAIT it will not
  55. * going to wait a message coming. if the value is null,
  56. * it will block and wait a message
  57. * @param int $maxsize the max size of queue
  58. */
  59. public function __construct(
  60. $ipc_filename = __FILE__,
  61. $channel = 1,
  62. $serialize_needed = true,
  63. $block_send = true,
  64. $option_receive = MSG_IPC_NOWAIT,
  65. $maxsize = 100000
  66. )
  67. {
  68. $this->ipc_filename = $ipc_filename;
  69. $this->msg_type = $channel;
  70. $this->serialize_needed = $serialize_needed;
  71. $this->block_send = $block_send;
  72. $this->option_receive = $option_receive;
  73. $this->maxsize = $maxsize;
  74. $this->initQueue($ipc_filename, $channel);
  75. }
  76. /**
  77. * init queue
  78. *
  79. * @param $ipc_filename
  80. * @param $msg_type
  81. * @throws \Exception
  82. */
  83. protected function initQueue($ipc_filename, $msg_type)
  84. {
  85. $this->key_t = $this->getIpcKey($ipc_filename, $msg_type);
  86. $this->queue = \msg_get_queue($this->key_t);
  87. if (!$this->queue) throw new \RuntimeException('msg_get_queue failed');
  88. }
  89. /**
  90. * @param $ipc_filename
  91. * @param $msg_type
  92. * @throws \Exception
  93. * @return int
  94. */
  95. public function getIpcKey($ipc_filename, $msg_type)
  96. {
  97. if (!file_exists($ipc_filename)) {
  98. $create_file = touch($ipc_filename);
  99. if ($create_file === false) {
  100. throw new \RuntimeException('ipc_file is not exists and create failed');
  101. }
  102. }
  103. $key_t = \ftok($ipc_filename, $msg_type);
  104. if ($key_t == 0) throw new \RuntimeException('ftok error');
  105. return $key_t;
  106. }
  107. /**
  108. * get message
  109. *
  110. * @param bool $block if block when the queue is empty
  111. * @return bool|string
  112. */
  113. public function get($block = false)
  114. {
  115. $queue_status = $this->status();
  116. if ($queue_status['msg_qnum'] > 0) {
  117. $option_receive = $block ? 0 : $this->option_receive;
  118. if (\msg_receive(
  119. $this->queue,
  120. $this->msg_type,
  121. $msgtype_erhalten,
  122. $this->maxsize,
  123. $data,
  124. $this->serialize_needed,
  125. $option_receive,
  126. $err
  127. ) === true
  128. ) {
  129. return $data;
  130. } else {
  131. throw new \RuntimeException($err);
  132. }
  133. } else {
  134. return false;
  135. }
  136. }
  137. public function status()
  138. {
  139. $queue_status = \msg_stat_queue($this->queue);
  140. return $queue_status;
  141. }
  142. /*
  143. * return array's keys
  144. * msg_perm.uid The uid of the owner of the queue.
  145. * msg_perm.gid The gid of the owner of the queue.
  146. * msg_perm.mode The file access mode of the queue.
  147. * msg_stime The time that the last message was sent to the queue.
  148. * msg_rtime The time that the last message was received from the queue.
  149. * msg_ctime The time that the queue was last changed.
  150. * msg_qnum The number of messages waiting to be read from the queue.
  151. * msg_qbytes The maximum number of bytes allowed in one message queue.
  152. * On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb.
  153. * msg_lspid The pid of the process that sent the last message to the queue.
  154. * msg_lrpid The pid of the process that received the last message from the queue.
  155. *
  156. * @return array
  157. */
  158. /**
  159. * put message
  160. *
  161. * @param $message
  162. * @return bool
  163. * @throws \Exception
  164. */
  165. public function put($message)
  166. {
  167. if (!\msg_send($this->queue, $this->msg_type, $message, $this->serialize_needed, $this->block_send, $err) === true) {
  168. throw new \RuntimeException($err);
  169. }
  170. return true;
  171. }
  172. /**
  173. * get the size of queue
  174. *
  175. * @return mixed
  176. */
  177. public function size()
  178. {
  179. $status = $this->status();
  180. return $status['msg_qnum'];
  181. }
  182. /**
  183. * allows you to change the values of the msg_perm.uid,
  184. * msg_perm.gid, msg_perm.mode and msg_qbytes fields of the underlying message queue data structure
  185. *
  186. * @param string $key status key
  187. * @param int $value status value
  188. * @return bool
  189. */
  190. public function setStatus($key, $value)
  191. {
  192. $this->checkSetPrivilege($key);
  193. if ($key == 'msg_qbytes')
  194. return $this->setMaxQueueSize($value);
  195. $queue_status[$key] = $value;
  196. return \msg_set_queue($this->queue, $queue_status);
  197. }
  198. /**
  199. * check the privilege of update the queue's status
  200. *
  201. * @param $key
  202. * @throws \Exception
  203. */
  204. private function checkSetPrivilege($key)
  205. {
  206. $privilege_field = array('msg_perm.uid', 'msg_perm.gid', 'msg_perm.mode');
  207. if (!\in_array($key, $privilege_field)) {
  208. $message = 'you can only change msg_perm.uid, msg_perm.gid, ' .
  209. ' msg_perm.mode and msg_qbytes. And msg_qbytes needs root privileges';
  210. throw new \RuntimeException($message);
  211. }
  212. }
  213. /**
  214. * update the max size of queue
  215. * need root
  216. *
  217. * @param $size
  218. * @throws \Exception
  219. * @return bool
  220. */
  221. public function setMaxQueueSize($size)
  222. {
  223. $user = \get_current_user();
  224. if ($user !== 'root')
  225. throw new \Exception('changing msg_qbytes needs root privileges');
  226. return $this->setStatus('msg_qbytes', $size);
  227. }
  228. /**
  229. * remove queue
  230. *
  231. * @return bool
  232. */
  233. public function remove()
  234. {
  235. return \msg_remove_queue($this->queue);
  236. }
  237. /**
  238. * check if the queue is exists or not
  239. *
  240. * @param $key
  241. * @return bool
  242. */
  243. public function queueExists($key)
  244. {
  245. return \msg_queue_exists($key);
  246. }
  247. /**
  248. * init when wakeup
  249. */
  250. public function __wakeup()
  251. {
  252. $this->initQueue($this->ipc_filename, $this->msg_type);
  253. }
  254. /**
  255. *
  256. */
  257. public function __destruct()
  258. {
  259. unset($this);
  260. }
  261. }