BusinessWorker.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace GatewayWorker;
  15. require_once __DIR__.'/../../workerman/Worker.php';
  16. require_once __DIR__.'/Lib/Gateway.php';
  17. use Workerman\Connection\TcpConnection;
  18. use Workerman\Worker;
  19. use Workerman\Lib\Timer;
  20. use Workerman\Connection\AsyncTcpConnection;
  21. use GatewayWorker\Protocols\GatewayProtocol;
  22. use GatewayWorker\Lib\Context;
  23. /**
  24. *
  25. * BusinessWorker 用于处理Gateway转发来的数据
  26. *
  27. * @author walkor<walkor@workerman.net>
  28. *
  29. */
  30. class BusinessWorker extends Worker
  31. {
  32. /**
  33. * 保存与 gateway 的连接 connection 对象
  34. *
  35. * @var array
  36. */
  37. public $gatewayConnections = array();
  38. /**
  39. * 注册中心地址
  40. *
  41. * @var string|array
  42. */
  43. public $registerAddress = '127.0.0.1:1236';
  44. /**
  45. * 事件处理类,默认是 Event 类
  46. *
  47. * @var string
  48. */
  49. public $eventHandler = 'Events';
  50. /**
  51. * 业务超时时间,可用来定位程序卡在哪里
  52. *
  53. * @var int
  54. */
  55. public $processTimeout = 30;
  56. /**
  57. * 业务超时时间,可用来定位程序卡在哪里
  58. *
  59. * @var callable
  60. */
  61. public $processTimeoutHandler = '\\Workerman\\Worker::log';
  62. /**
  63. * 秘钥
  64. *
  65. * @var string
  66. */
  67. public $secretKey = '';
  68. /**
  69. * businessWorker进程将消息转发给gateway进程的发送缓冲区大小
  70. *
  71. * @var int
  72. */
  73. public $sendToGatewayBufferSize = 10240000;
  74. /**
  75. * 保存用户设置的 worker 启动回调
  76. *
  77. * @var callback
  78. */
  79. protected $_onWorkerStart = null;
  80. /**
  81. * 保存用户设置的 workerReload 回调
  82. *
  83. * @var callback
  84. */
  85. protected $_onWorkerReload = null;
  86. /**
  87. * 保存用户设置的 workerStop 回调
  88. *
  89. * @var callback
  90. */
  91. protected $_onWorkerStop= null;
  92. /**
  93. * 到注册中心的连接
  94. *
  95. * @var AsyncTcpConnection
  96. */
  97. protected $_registerConnection = null;
  98. /**
  99. * 处于连接状态的 gateway 通讯地址
  100. *
  101. * @var array
  102. */
  103. protected $_connectingGatewayAddresses = array();
  104. /**
  105. * 所有 geteway 内部通讯地址
  106. *
  107. * @var array
  108. */
  109. protected $_gatewayAddresses = array();
  110. /**
  111. * 等待连接个 gateway 地址
  112. *
  113. * @var array
  114. */
  115. protected $_waitingConnectGatewayAddresses = array();
  116. /**
  117. * Event::onConnect 回调
  118. *
  119. * @var callback
  120. */
  121. protected $_eventOnConnect = null;
  122. /**
  123. * Event::onMessage 回调
  124. *
  125. * @var callback
  126. */
  127. protected $_eventOnMessage = null;
  128. /**
  129. * Event::onClose 回调
  130. *
  131. * @var callback
  132. */
  133. protected $_eventOnClose = null;
  134. /**
  135. * websocket回调
  136. *
  137. * @var null
  138. */
  139. protected $_eventOnWebSocketConnect = null;
  140. /**
  141. * SESSION 版本缓存
  142. *
  143. * @var array
  144. */
  145. protected $_sessionVersion = array();
  146. /**
  147. * 用于保持长连接的心跳时间间隔
  148. *
  149. * @var int
  150. */
  151. const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
  152. /**
  153. * 构造函数
  154. *
  155. * @param string $socket_name
  156. * @param array $context_option
  157. */
  158. public function __construct($socket_name = '', $context_option = array())
  159. {
  160. parent::__construct($socket_name, $context_option);
  161. $backrace = debug_backtrace();
  162. $this->_autoloadRootPath = dirname($backrace[0]['file']);
  163. }
  164. /**
  165. * {@inheritdoc}
  166. */
  167. public function run()
  168. {
  169. $this->_onWorkerStart = $this->onWorkerStart;
  170. $this->_onWorkerReload = $this->onWorkerReload;
  171. $this->_onWorkerStop = $this->onWorkerStop;
  172. $this->onWorkerStop = array($this, 'onWorkerStop');
  173. $this->onWorkerStart = array($this, 'onWorkerStart');
  174. $this->onWorkerReload = array($this, 'onWorkerReload');
  175. parent::run();
  176. }
  177. /**
  178. * 当进程启动时一些初始化工作
  179. *
  180. * @return void
  181. */
  182. protected function onWorkerStart()
  183. {
  184. if (function_exists('opcache_reset')) {
  185. opcache_reset();
  186. }
  187. if (!class_exists('\Protocols\GatewayProtocol')) {
  188. class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
  189. }
  190. if (!is_array($this->registerAddress)) {
  191. $this->registerAddress = array($this->registerAddress);
  192. }
  193. $this->connectToRegister();
  194. \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
  195. \GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;
  196. if ($this->_onWorkerStart) {
  197. call_user_func($this->_onWorkerStart, $this);
  198. }
  199. if (is_callable($this->eventHandler . '::onWorkerStart')) {
  200. call_user_func($this->eventHandler . '::onWorkerStart', $this);
  201. }
  202. if (function_exists('pcntl_signal')) {
  203. // 业务超时信号处理
  204. pcntl_signal(SIGALRM, array($this, 'timeoutHandler'), false);
  205. } else {
  206. $this->processTimeout = 0;
  207. }
  208. // 设置回调
  209. if (is_callable($this->eventHandler . '::onConnect')) {
  210. $this->_eventOnConnect = $this->eventHandler . '::onConnect';
  211. }
  212. if (is_callable($this->eventHandler . '::onMessage')) {
  213. $this->_eventOnMessage = $this->eventHandler . '::onMessage';
  214. } else {
  215. echo "Waring: {$this->eventHandler}::onMessage is not callable\n";
  216. }
  217. if (is_callable($this->eventHandler . '::onClose')) {
  218. $this->_eventOnClose = $this->eventHandler . '::onClose';
  219. }
  220. if (is_callable($this->eventHandler . '::onWebSocketConnect')) {
  221. $this->_eventOnWebSocketConnect = $this->eventHandler . '::onWebSocketConnect';
  222. }
  223. }
  224. /**
  225. * onWorkerReload 回调
  226. *
  227. * @param Worker $worker
  228. */
  229. protected function onWorkerReload($worker)
  230. {
  231. // 防止进程立刻退出
  232. $worker->reloadable = false;
  233. // 延迟 0.05 秒退出,避免 BusinessWorker 瞬间全部退出导致没有可用的 BusinessWorker 进程
  234. Timer::add(0.05, array('Workerman\Worker', 'stopAll'));
  235. // 执行用户定义的 onWorkerReload 回调
  236. if ($this->_onWorkerReload) {
  237. call_user_func($this->_onWorkerReload, $this);
  238. }
  239. }
  240. /**
  241. * 当进程关闭时一些清理工作
  242. *
  243. * @return void
  244. */
  245. protected function onWorkerStop()
  246. {
  247. if ($this->_onWorkerStop) {
  248. call_user_func($this->_onWorkerStop, $this);
  249. }
  250. if (is_callable($this->eventHandler . '::onWorkerStop')) {
  251. call_user_func($this->eventHandler . '::onWorkerStop', $this);
  252. }
  253. }
  254. /**
  255. * 连接服务注册中心
  256. *
  257. * @return void
  258. */
  259. public function connectToRegister()
  260. {
  261. foreach ($this->registerAddress as $register_address) {
  262. $register_connection = new AsyncTcpConnection("text://{$register_address}");
  263. $secret_key = $this->secretKey;
  264. $register_connection->onConnect = function () use ($register_connection, $secret_key, $register_address) {
  265. $register_connection->send('{"event":"worker_connect","secret_key":"' . $secret_key . '"}');
  266. // 如果Register服务器不在本地服务器,则需要保持心跳
  267. if (strpos($register_address, '127.0.0.1') !== 0) {
  268. $register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
  269. $register_connection->send('{"event":"ping"}');
  270. });
  271. }
  272. };
  273. $register_connection->onClose = function ($register_connection) {
  274. if(!empty($register_connection->ping_timer)) {
  275. Timer::del($register_connection->ping_timer);
  276. }
  277. $register_connection->reconnect(1);
  278. };
  279. $register_connection->onMessage = array($this, 'onRegisterConnectionMessage');
  280. $register_connection->connect();
  281. }
  282. }
  283. /**
  284. * 当注册中心发来消息时
  285. *
  286. * @return void
  287. */
  288. public function onRegisterConnectionMessage($register_connection, $data)
  289. {
  290. $data = json_decode($data, true);
  291. if (!isset($data['event'])) {
  292. echo "Received bad data from Register\n";
  293. return;
  294. }
  295. $event = $data['event'];
  296. switch ($event) {
  297. case 'broadcast_addresses':
  298. if (!is_array($data['addresses'])) {
  299. echo "Received bad data from Register. Addresses empty\n";
  300. return;
  301. }
  302. $addresses = $data['addresses'];
  303. $this->_gatewayAddresses = array();
  304. foreach ($addresses as $addr) {
  305. $this->_gatewayAddresses[$addr] = $addr;
  306. }
  307. $this->checkGatewayConnections($addresses);
  308. break;
  309. default:
  310. echo "Receive bad event:$event from Register.\n";
  311. }
  312. }
  313. /**
  314. * 当 gateway 转发来数据时
  315. *
  316. * @param TcpConnection $connection
  317. * @param mixed $data
  318. */
  319. public function onGatewayMessage($connection, $data)
  320. {
  321. $cmd = $data['cmd'];
  322. if ($cmd === GatewayProtocol::CMD_PING) {
  323. return;
  324. }
  325. // 上下文数据
  326. Context::$client_ip = $data['client_ip'];
  327. Context::$client_port = $data['client_port'];
  328. Context::$local_ip = $data['local_ip'];
  329. Context::$local_port = $data['local_port'];
  330. Context::$connection_id = $data['connection_id'];
  331. Context::$client_id = Context::addressToClientId($data['local_ip'], $data['local_port'],
  332. $data['connection_id']);
  333. // $_SERVER 变量
  334. $_SERVER = array(
  335. 'REMOTE_ADDR' => long2ip($data['client_ip']),
  336. 'REMOTE_PORT' => $data['client_port'],
  337. 'GATEWAY_ADDR' => long2ip($data['local_ip']),
  338. 'GATEWAY_PORT' => $data['gateway_port'],
  339. 'GATEWAY_CLIENT_ID' => Context::$client_id,
  340. );
  341. // 检查session版本,如果是过期的session数据则拉取最新的数据
  342. if ($cmd !== GatewayProtocol::CMD_ON_CLOSE && isset($this->_sessionVersion[Context::$client_id]) && $this->_sessionVersion[Context::$client_id] !== crc32($data['ext_data'])) {
  343. $_SESSION = Context::$old_session = \GatewayWorker\Lib\Gateway::getSession(Context::$client_id);
  344. $this->_sessionVersion[Context::$client_id] = crc32($data['ext_data']);
  345. } else {
  346. if (!isset($this->_sessionVersion[Context::$client_id])) {
  347. $this->_sessionVersion[Context::$client_id] = crc32($data['ext_data']);
  348. }
  349. // 尝试解析 session
  350. if ($data['ext_data'] != '') {
  351. Context::$old_session = $_SESSION = Context::sessionDecode($data['ext_data']);
  352. } else {
  353. Context::$old_session = $_SESSION = null;
  354. }
  355. }
  356. if ($this->processTimeout) {
  357. pcntl_alarm($this->processTimeout);
  358. }
  359. // 尝试执行 Event::onConnection、Event::onMessage、Event::onClose
  360. switch ($cmd) {
  361. case GatewayProtocol::CMD_ON_CONNECT:
  362. if ($this->_eventOnConnect) {
  363. call_user_func($this->_eventOnConnect, Context::$client_id);
  364. }
  365. break;
  366. case GatewayProtocol::CMD_ON_MESSAGE:
  367. if ($this->_eventOnMessage) {
  368. call_user_func($this->_eventOnMessage, Context::$client_id, $data['body']);
  369. }
  370. break;
  371. case GatewayProtocol::CMD_ON_CLOSE:
  372. unset($this->_sessionVersion[Context::$client_id]);
  373. if ($this->_eventOnClose) {
  374. call_user_func($this->_eventOnClose, Context::$client_id);
  375. }
  376. break;
  377. case GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT:
  378. if ($this->_eventOnWebSocketConnect) {
  379. call_user_func($this->_eventOnWebSocketConnect, Context::$client_id, $data['body']);
  380. }
  381. break;
  382. }
  383. if ($this->processTimeout) {
  384. pcntl_alarm(0);
  385. }
  386. // session 必须是数组
  387. if ($_SESSION !== null && !is_array($_SESSION)) {
  388. throw new \Exception('$_SESSION must be an array. But $_SESSION=' . var_export($_SESSION, true) . ' is not array.');
  389. }
  390. // 判断 session 是否被更改
  391. if ($_SESSION !== Context::$old_session && $cmd !== GatewayProtocol::CMD_ON_CLOSE) {
  392. $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
  393. \GatewayWorker\Lib\Gateway::setSocketSession(Context::$client_id, $session_str_now);
  394. $this->_sessionVersion[Context::$client_id] = crc32($session_str_now);
  395. }
  396. Context::clear();
  397. }
  398. /**
  399. * 当与 Gateway 的连接断开时触发
  400. *
  401. * @param TcpConnection $connection
  402. * @return void
  403. */
  404. public function onGatewayClose($connection)
  405. {
  406. $addr = $connection->remoteAddress;
  407. unset($this->gatewayConnections[$addr], $this->_connectingGatewayAddresses[$addr]);
  408. if (isset($this->_gatewayAddresses[$addr]) && !isset($this->_waitingConnectGatewayAddresses[$addr])) {
  409. Timer::add(1, array($this, 'tryToConnectGateway'), array($addr), false);
  410. $this->_waitingConnectGatewayAddresses[$addr] = $addr;
  411. }
  412. }
  413. /**
  414. * 尝试连接 Gateway 内部通讯地址
  415. *
  416. * @param string $addr
  417. */
  418. public function tryToConnectGateway($addr)
  419. {
  420. if (!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddresses[$addr]) && isset($this->_gatewayAddresses[$addr])) {
  421. $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
  422. $gateway_connection->remoteAddress = $addr;
  423. $gateway_connection->onConnect = array($this, 'onConnectGateway');
  424. $gateway_connection->onMessage = array($this, 'onGatewayMessage');
  425. $gateway_connection->onClose = array($this, 'onGatewayClose');
  426. $gateway_connection->onError = array($this, 'onGatewayError');
  427. $gateway_connection->maxSendBufferSize = $this->sendToGatewayBufferSize;
  428. if (TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize) {
  429. $gateway_connection->maxSendBufferSize = 50 * 1024 * 1024;
  430. }
  431. $gateway_data = GatewayProtocol::$empty;
  432. $gateway_data['cmd'] = GatewayProtocol::CMD_WORKER_CONNECT;
  433. $gateway_data['body'] = json_encode(array(
  434. 'worker_key' =>"{$this->name}:{$this->id}",
  435. 'secret_key' => $this->secretKey,
  436. ));
  437. $gateway_connection->send($gateway_data);
  438. $gateway_connection->connect();
  439. $this->_connectingGatewayAddresses[$addr] = $addr;
  440. }
  441. unset($this->_waitingConnectGatewayAddresses[$addr]);
  442. }
  443. /**
  444. * 检查 gateway 的通信端口是否都已经连
  445. * 如果有未连接的端口,则尝试连接
  446. *
  447. * @param array $addresses_list
  448. */
  449. public function checkGatewayConnections($addresses_list)
  450. {
  451. if (empty($addresses_list)) {
  452. return;
  453. }
  454. foreach ($addresses_list as $addr) {
  455. if (!isset($this->_waitingConnectGatewayAddresses[$addr])) {
  456. $this->tryToConnectGateway($addr);
  457. }
  458. }
  459. }
  460. /**
  461. * 当连接上 gateway 的通讯端口时触发
  462. * 将连接 connection 对象保存起来
  463. *
  464. * @param TcpConnection $connection
  465. * @return void
  466. */
  467. public function onConnectGateway($connection)
  468. {
  469. $this->gatewayConnections[$connection->remoteAddress] = $connection;
  470. unset($this->_connectingGatewayAddresses[$connection->remoteAddress], $this->_waitingConnectGatewayAddresses[$connection->remoteAddress]);
  471. }
  472. /**
  473. * 当与 gateway 的连接出现错误时触发
  474. *
  475. * @param TcpConnection $connection
  476. * @param int $error_no
  477. * @param string $error_msg
  478. */
  479. public function onGatewayError($connection, $error_no, $error_msg)
  480. {
  481. echo "GatewayConnection Error : $error_no ,$error_msg\n";
  482. }
  483. /**
  484. * 获取所有 Gateway 内部通讯地址
  485. *
  486. * @return array
  487. */
  488. public function getAllGatewayAddresses()
  489. {
  490. return $this->_gatewayAddresses;
  491. }
  492. /**
  493. * 业务超时回调
  494. *
  495. * @param int $signal
  496. * @throws \Exception
  497. */
  498. public function timeoutHandler($signal)
  499. {
  500. switch ($signal) {
  501. // 超时时钟
  502. case SIGALRM:
  503. // 超时异常
  504. $e = new \Exception("process_timeout", 506);
  505. $trace_str = $e->getTraceAsString();
  506. // 去掉第一行timeoutHandler的调用栈
  507. $trace_str = $e->getMessage() . ":\n" . substr($trace_str, strpos($trace_str, "\n") + 1) . "\n";
  508. // 开发者没有设置超时处理函数,或者超时处理函数返回空则执行退出
  509. if (!$this->processTimeoutHandler || !call_user_func($this->processTimeoutHandler, $trace_str, $e)) {
  510. Worker::stopAll();
  511. }
  512. break;
  513. }
  514. }
  515. }