Select.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Events;
  15. use Throwable;
  16. use Workerman\Worker;
  17. /**
  18. * select eventloop
  19. */
  20. class Select implements EventInterface
  21. {
  22. /**
  23. * All listeners for read/write event.
  24. *
  25. * @var array
  26. */
  27. public $_allEvents = array();
  28. /**
  29. * Event listeners of signal.
  30. *
  31. * @var array
  32. */
  33. public $_signalEvents = array();
  34. /**
  35. * Fds waiting for read event.
  36. *
  37. * @var array
  38. */
  39. protected $_readFds = array();
  40. /**
  41. * Fds waiting for write event.
  42. *
  43. * @var array
  44. */
  45. protected $_writeFds = array();
  46. /**
  47. * Fds waiting for except event.
  48. *
  49. * @var array
  50. */
  51. protected $_exceptFds = array();
  52. /**
  53. * Timer scheduler.
  54. * {['data':timer_id, 'priority':run_timestamp], ..}
  55. *
  56. * @var \SplPriorityQueue
  57. */
  58. protected $_scheduler = null;
  59. /**
  60. * All timer event listeners.
  61. * [[func, args, flag, timer_interval], ..]
  62. *
  63. * @var array
  64. */
  65. protected $_eventTimer = array();
  66. /**
  67. * Timer id.
  68. *
  69. * @var int
  70. */
  71. protected $_timerId = 1;
  72. /**
  73. * Select timeout.
  74. *
  75. * @var int
  76. */
  77. protected $_selectTimeout = 100000000;
  78. /**
  79. * Paired socket channels
  80. *
  81. * @var array
  82. */
  83. protected $channel = array();
  84. /**
  85. * Construct.
  86. */
  87. public function __construct()
  88. {
  89. // Init SplPriorityQueue.
  90. $this->_scheduler = new \SplPriorityQueue();
  91. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  92. }
  93. /**
  94. * {@inheritdoc}
  95. */
  96. public function add($fd, $flag, $func, $args = array())
  97. {
  98. switch ($flag) {
  99. case self::EV_READ:
  100. case self::EV_WRITE:
  101. $count = $flag === self::EV_READ ? \count($this->_readFds) : \count($this->_writeFds);
  102. if ($count >= 1024) {
  103. echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n";
  104. } else if (\DIRECTORY_SEPARATOR !== '/' && $count >= 256) {
  105. echo "Warning: system call select exceeded the maximum number of connections 256.\n";
  106. }
  107. $fd_key = (int)$fd;
  108. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  109. if ($flag === self::EV_READ) {
  110. $this->_readFds[$fd_key] = $fd;
  111. } else {
  112. $this->_writeFds[$fd_key] = $fd;
  113. }
  114. break;
  115. case self::EV_EXCEPT:
  116. $fd_key = (int)$fd;
  117. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  118. $this->_exceptFds[$fd_key] = $fd;
  119. break;
  120. case self::EV_SIGNAL:
  121. // Windows not support signal.
  122. if(\DIRECTORY_SEPARATOR !== '/') {
  123. return false;
  124. }
  125. $fd_key = (int)$fd;
  126. $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
  127. \pcntl_signal($fd, array($this, 'signalHandler'));
  128. break;
  129. case self::EV_TIMER:
  130. case self::EV_TIMER_ONCE:
  131. $timer_id = $this->_timerId++;
  132. $run_time = \microtime(true) + $fd;
  133. $this->_scheduler->insert($timer_id, -$run_time);
  134. $this->_eventTimer[$timer_id] = array($func, (array)$args, $flag, $fd);
  135. $select_timeout = ($run_time - \microtime(true)) * 1000000;
  136. $select_timeout = $select_timeout <= 0 ? 1 : $select_timeout;
  137. if( $this->_selectTimeout > $select_timeout ){
  138. $this->_selectTimeout = (int) $select_timeout;
  139. }
  140. return $timer_id;
  141. }
  142. return true;
  143. }
  144. /**
  145. * Signal handler.
  146. *
  147. * @param int $signal
  148. */
  149. public function signalHandler($signal)
  150. {
  151. \call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
  152. }
  153. /**
  154. * {@inheritdoc}
  155. */
  156. public function del($fd, $flag)
  157. {
  158. $fd_key = (int)$fd;
  159. switch ($flag) {
  160. case self::EV_READ:
  161. unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
  162. if (empty($this->_allEvents[$fd_key])) {
  163. unset($this->_allEvents[$fd_key]);
  164. }
  165. return true;
  166. case self::EV_WRITE:
  167. unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
  168. if (empty($this->_allEvents[$fd_key])) {
  169. unset($this->_allEvents[$fd_key]);
  170. }
  171. return true;
  172. case self::EV_EXCEPT:
  173. unset($this->_allEvents[$fd_key][$flag], $this->_exceptFds[$fd_key]);
  174. if(empty($this->_allEvents[$fd_key]))
  175. {
  176. unset($this->_allEvents[$fd_key]);
  177. }
  178. return true;
  179. case self::EV_SIGNAL:
  180. if(\DIRECTORY_SEPARATOR !== '/') {
  181. return false;
  182. }
  183. unset($this->_signalEvents[$fd_key]);
  184. \pcntl_signal($fd, SIG_IGN);
  185. break;
  186. case self::EV_TIMER:
  187. case self::EV_TIMER_ONCE;
  188. unset($this->_eventTimer[$fd_key]);
  189. return true;
  190. }
  191. return false;
  192. }
  193. /**
  194. * Tick for timer.
  195. *
  196. * @return void
  197. */
  198. protected function tick()
  199. {
  200. $tasks_to_insert = [];
  201. while (!$this->_scheduler->isEmpty()) {
  202. $scheduler_data = $this->_scheduler->top();
  203. $timer_id = $scheduler_data['data'];
  204. $next_run_time = -$scheduler_data['priority'];
  205. $time_now = \microtime(true);
  206. $this->_selectTimeout = (int) (($next_run_time - $time_now) * 1000000);
  207. if ($this->_selectTimeout <= 0) {
  208. $this->_scheduler->extract();
  209. if (!isset($this->_eventTimer[$timer_id])) {
  210. continue;
  211. }
  212. // [func, args, flag, timer_interval]
  213. $task_data = $this->_eventTimer[$timer_id];
  214. if ($task_data[2] === self::EV_TIMER) {
  215. $next_run_time = $time_now + $task_data[3];
  216. $tasks_to_insert[] = [$timer_id, -$next_run_time];
  217. }
  218. try {
  219. \call_user_func_array($task_data[0], $task_data[1]);
  220. } catch (Throwable $e) {
  221. Worker::stopAll(250, $e);
  222. }
  223. if (isset($this->_eventTimer[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE) {
  224. $this->del($timer_id, self::EV_TIMER_ONCE);
  225. }
  226. } else {
  227. break;
  228. }
  229. }
  230. foreach ($tasks_to_insert as $item) {
  231. $this->_scheduler->insert($item[0], $item[1]);
  232. }
  233. if (!$this->_scheduler->isEmpty()) {
  234. $scheduler_data = $this->_scheduler->top();
  235. $next_run_time = -$scheduler_data['priority'];
  236. $time_now = \microtime(true);
  237. $this->_selectTimeout = \max((int) (($next_run_time - $time_now) * 1000000), 0);
  238. return;
  239. }
  240. $this->_selectTimeout = 100000000;
  241. }
  242. /**
  243. * {@inheritdoc}
  244. */
  245. public function clearAllTimer()
  246. {
  247. $this->_scheduler = new \SplPriorityQueue();
  248. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  249. $this->_eventTimer = array();
  250. }
  251. /**
  252. * {@inheritdoc}
  253. */
  254. public function loop()
  255. {
  256. while (1) {
  257. if(\DIRECTORY_SEPARATOR === '/') {
  258. // Calls signal handlers for pending signals
  259. \pcntl_signal_dispatch();
  260. }
  261. $read = $this->_readFds;
  262. $write = $this->_writeFds;
  263. $except = $this->_exceptFds;
  264. $ret = false;
  265. if ($read || $write || $except) {
  266. // Waiting read/write/signal/timeout events.
  267. try {
  268. $ret = @stream_select($read, $write, $except, 0, $this->_selectTimeout);
  269. } catch (\Exception $e) {} catch (\Error $e) {}
  270. } else {
  271. $this->_selectTimeout >= 1 && usleep($this->_selectTimeout);
  272. }
  273. if (!$this->_scheduler->isEmpty()) {
  274. $this->tick();
  275. }
  276. if (!$ret) {
  277. continue;
  278. }
  279. if ($read) {
  280. foreach ($read as $fd) {
  281. $fd_key = (int)$fd;
  282. if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
  283. \call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0],
  284. array($this->_allEvents[$fd_key][self::EV_READ][1]));
  285. }
  286. }
  287. }
  288. if ($write) {
  289. foreach ($write as $fd) {
  290. $fd_key = (int)$fd;
  291. if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
  292. \call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0],
  293. array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
  294. }
  295. }
  296. }
  297. if($except) {
  298. foreach($except as $fd) {
  299. $fd_key = (int) $fd;
  300. if(isset($this->_allEvents[$fd_key][self::EV_EXCEPT])) {
  301. \call_user_func_array($this->_allEvents[$fd_key][self::EV_EXCEPT][0],
  302. array($this->_allEvents[$fd_key][self::EV_EXCEPT][1]));
  303. }
  304. }
  305. }
  306. }
  307. }
  308. /**
  309. * Destroy loop.
  310. *
  311. * @return void
  312. */
  313. public function destroy()
  314. {
  315. }
  316. /**
  317. * Get timer count.
  318. *
  319. * @return integer
  320. */
  321. public function getTimerCount()
  322. {
  323. return \count($this->_eventTimer);
  324. }
  325. }