123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- <?php
- /**
- * Created by PhpStorm.
- * User: Jenner
- * Date: 2015/8/12
- * Time: 15:15
- */
- namespace Jenner\SimpleFork\Queue;
- /**
- * system v message queue
- *
- * @package Jenner\SimpleFork\Queue
- */
- class SystemVMessageQueue implements QueueInterface
- {
- /**
- * @var int channel
- */
- protected $msg_type;
- /**
- * @var
- */
- protected $queue;
- /**
- * @var bool
- */
- protected $serialize_needed;
- /**
- * @var bool
- */
- protected $block_send;
- /**
- * @var int
- */
- protected $option_receive;
- /**
- * @var int
- */
- protected $maxsize;
- /**
- * @var
- */
- protected $key_t;
- /**
- * @var string
- */
- protected $ipc_filename;
- /**
- * @param string $ipc_filename ipc file to make ipc key.
- * if it does not exists, it will try to create the file.
- * @param int $channel message type
- * @param bool $serialize_needed serialize or not
- * @param bool $block_send if block when the queue is full
- * @param int $option_receive if the value is MSG_IPC_NOWAIT it will not
- * going to wait a message coming. if the value is null,
- * it will block and wait a message
- * @param int $maxsize the max size of queue
- */
- public function __construct(
- $ipc_filename = __FILE__,
- $channel = 1,
- $serialize_needed = true,
- $block_send = true,
- $option_receive = MSG_IPC_NOWAIT,
- $maxsize = 100000
- )
- {
- $this->ipc_filename = $ipc_filename;
- $this->msg_type = $channel;
- $this->serialize_needed = $serialize_needed;
- $this->block_send = $block_send;
- $this->option_receive = $option_receive;
- $this->maxsize = $maxsize;
- $this->initQueue($ipc_filename, $channel);
- }
- /**
- * init queue
- *
- * @param $ipc_filename
- * @param $msg_type
- * @throws \Exception
- */
- protected function initQueue($ipc_filename, $msg_type)
- {
- $this->key_t = $this->getIpcKey($ipc_filename, $msg_type);
- $this->queue = \msg_get_queue($this->key_t);
- if (!$this->queue) throw new \RuntimeException('msg_get_queue failed');
- }
- /**
- * @param $ipc_filename
- * @param $msg_type
- * @throws \Exception
- * @return int
- */
- public function getIpcKey($ipc_filename, $msg_type)
- {
- if (!file_exists($ipc_filename)) {
- $create_file = touch($ipc_filename);
- if ($create_file === false) {
- throw new \RuntimeException('ipc_file is not exists and create failed');
- }
- }
- $key_t = \ftok($ipc_filename, $msg_type);
- if ($key_t == 0) throw new \RuntimeException('ftok error');
- return $key_t;
- }
- /**
- * get message
- *
- * @param bool $block if block when the queue is empty
- * @return bool|string
- */
- public function get($block = false)
- {
- $queue_status = $this->status();
- if ($queue_status['msg_qnum'] > 0) {
- $option_receive = $block ? 0 : $this->option_receive;
- if (\msg_receive(
- $this->queue,
- $this->msg_type,
- $msgtype_erhalten,
- $this->maxsize,
- $data,
- $this->serialize_needed,
- $option_receive,
- $err
- ) === true
- ) {
- return $data;
- } else {
- throw new \RuntimeException($err);
- }
- } else {
- return false;
- }
- }
- public function status()
- {
- $queue_status = \msg_stat_queue($this->queue);
- return $queue_status;
- }
- /*
- * return array's keys
- * msg_perm.uid The uid of the owner of the queue.
- * msg_perm.gid The gid of the owner of the queue.
- * msg_perm.mode The file access mode of the queue.
- * msg_stime The time that the last message was sent to the queue.
- * msg_rtime The time that the last message was received from the queue.
- * msg_ctime The time that the queue was last changed.
- * msg_qnum The number of messages waiting to be read from the queue.
- * msg_qbytes The maximum number of bytes allowed in one message queue.
- * On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb.
- * msg_lspid The pid of the process that sent the last message to the queue.
- * msg_lrpid The pid of the process that received the last message from the queue.
- *
- * @return array
- */
- /**
- * put message
- *
- * @param $message
- * @return bool
- * @throws \Exception
- */
- public function put($message)
- {
- if (!\msg_send($this->queue, $this->msg_type, $message, $this->serialize_needed, $this->block_send, $err) === true) {
- throw new \RuntimeException($err);
- }
- return true;
- }
- /**
- * get the size of queue
- *
- * @return mixed
- */
- public function size()
- {
- $status = $this->status();
- return $status['msg_qnum'];
- }
- /**
- * allows you to change the values of the msg_perm.uid,
- * msg_perm.gid, msg_perm.mode and msg_qbytes fields of the underlying message queue data structure
- *
- * @param string $key status key
- * @param int $value status value
- * @return bool
- */
- public function setStatus($key, $value)
- {
- $this->checkSetPrivilege($key);
- if ($key == 'msg_qbytes')
- return $this->setMaxQueueSize($value);
- $queue_status[$key] = $value;
- return \msg_set_queue($this->queue, $queue_status);
- }
- /**
- * check the privilege of update the queue's status
- *
- * @param $key
- * @throws \Exception
- */
- private function checkSetPrivilege($key)
- {
- $privilege_field = array('msg_perm.uid', 'msg_perm.gid', 'msg_perm.mode');
- if (!\in_array($key, $privilege_field)) {
- $message = 'you can only change msg_perm.uid, msg_perm.gid, ' .
- ' msg_perm.mode and msg_qbytes. And msg_qbytes needs root privileges';
- throw new \RuntimeException($message);
- }
- }
- /**
- * update the max size of queue
- * need root
- *
- * @param $size
- * @throws \Exception
- * @return bool
- */
- public function setMaxQueueSize($size)
- {
- $user = \get_current_user();
- if ($user !== 'root')
- throw new \Exception('changing msg_qbytes needs root privileges');
- return $this->setStatus('msg_qbytes', $size);
- }
- /**
- * remove queue
- *
- * @return bool
- */
- public function remove()
- {
- return \msg_remove_queue($this->queue);
- }
- /**
- * check if the queue is exists or not
- *
- * @param $key
- * @return bool
- */
- public function queueExists($key)
- {
- return \msg_queue_exists($key);
- }
- /**
- * init when wakeup
- */
- public function __wakeup()
- {
- $this->initQueue($this->ipc_filename, $this->msg_type);
- }
- /**
- *
- */
- public function __destruct()
- {
- unset($this);
- }
- }
|