Gateway.php 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace GatewayWorker;
  15. use GatewayWorker\Lib\Context;
  16. use Workerman\Connection\TcpConnection;
  17. use Workerman\Worker;
  18. use Workerman\Timer;
  19. use Workerman\Autoloader;
  20. use Workerman\Connection\AsyncTcpConnection;
  21. use GatewayWorker\Protocols\GatewayProtocol;
  22. /**
  23. *
  24. * Gateway,基于Worker 开发
  25. * 用于转发客户端的数据给Worker处理,以及转发Worker的数据给客户端
  26. *
  27. * @author walkor<walkor@workerman.net>
  28. *
  29. */
  30. class Gateway extends Worker
  31. {
  32. /**
  33. * 版本
  34. *
  35. * @var string
  36. */
  37. const VERSION = '3.0.28';
  38. /**
  39. * 本机 IP
  40. * 单机部署默认 127.0.0.1,如果是分布式部署,需要设置成本机 IP
  41. *
  42. * @var string
  43. */
  44. public $lanIp = '127.0.0.1';
  45. /**
  46. * 如果宿主机为192.168.1.2 , gatewayworker in docker container (172.25.0.2)
  47. * 此时 lanIp=192.168.1.2 GatewayClientSDK 能连上,但是$this->_innerTcpWorker stream_socket_server(): Unable to connect to tcp://192.168.1.2:2901 (Address not available) in
  48. * 此时 lanIp=172.25.0.2 GatewayClientSDK stream_socket_server(): Unable to connect to tcp://172.25.0.2:2901 (Address not available) , $this->_innerTcpWorker 正常监听
  49. *
  50. * solution:
  51. * $gateway->lanIp=192.168.1.2 ;
  52. * $gateway->innerTcpWorkerListen=172.25.0.2; // || 0.0.0.0
  53. *
  54. * GatewayClientSDK connect 192.168.1.2:lanPort
  55. * $this->_innerTcpWorker listen $gateway->innerTcpWorkerListen:lanPort
  56. *
  57. */
  58. public $innerTcpWorkerListen='';
  59. /**
  60. * 本机端口
  61. *
  62. * @var int
  63. */
  64. public $lanPort = 0;
  65. /**
  66. * gateway 内部通讯起始端口,每个 gateway 实例应该都不同,步长1000
  67. *
  68. * @var int
  69. */
  70. public $startPort = 2000;
  71. /**
  72. * 注册服务地址,用于注册 Gateway BusinessWorker,使之能够通讯
  73. *
  74. * @var string|array
  75. */
  76. public $registerAddress = '127.0.0.1:1236';
  77. /**
  78. * 是否可以平滑重启,gateway 不能平滑重启,否则会导致连接断开
  79. *
  80. * @var bool
  81. */
  82. public $reloadable = false;
  83. /**
  84. * 心跳时间间隔
  85. *
  86. * @var int
  87. */
  88. public $pingInterval = 0;
  89. /**
  90. * $pingNotResponseLimit * $pingInterval 时间内,客户端未发送任何数据,断开客户端连接
  91. *
  92. * @var int
  93. */
  94. public $pingNotResponseLimit = 0;
  95. /**
  96. * 服务端向客户端发送的心跳数据
  97. *
  98. * @var string
  99. */
  100. public $pingData = '';
  101. /**
  102. * 秘钥
  103. *
  104. * @var string
  105. */
  106. public $secretKey = '';
  107. /**
  108. * 路由函数
  109. *
  110. * @var callable|null
  111. */
  112. public $router = null;
  113. /**
  114. * gateway进程转发给businessWorker进程的发送缓冲区大小
  115. *
  116. * @var int
  117. */
  118. public $sendToWorkerBufferSize = 10240000;
  119. /**
  120. * gateway进程将数据发给客户端时每个客户端发送缓冲区大小
  121. *
  122. * @var int
  123. */
  124. public $sendToClientBufferSize = 1024000;
  125. /**
  126. * 协议加速
  127. *
  128. * @var bool
  129. */
  130. public $protocolAccelerate = false;
  131. /**
  132. * BusinessWorker 连接成功之后触发
  133. *
  134. * @var callable|null
  135. */
  136. public $onBusinessWorkerConnected = null;
  137. /**
  138. * BusinessWorker 关闭时触发
  139. *
  140. * @var callable|null
  141. */
  142. public $onBusinessWorkerClose = null;
  143. /**
  144. * 保存客户端的所有 connection 对象
  145. *
  146. * @var array
  147. */
  148. protected $_clientConnections = array();
  149. /**
  150. * uid 到 connection 的映射,一对多关系
  151. */
  152. protected $_uidConnections = array();
  153. /**
  154. * group 到 connection 的映射,一对多关系
  155. *
  156. * @var array
  157. */
  158. protected $_groupConnections = array();
  159. /**
  160. * 保存所有 worker 的内部连接的 connection 对象
  161. *
  162. * @var array
  163. */
  164. protected $_workerConnections = array();
  165. /**
  166. * gateway 内部监听 worker 内部连接的 worker
  167. *
  168. * @var Worker
  169. */
  170. protected $_innerTcpWorker = null;
  171. /**
  172. * 当 worker 启动时
  173. *
  174. * @var callable|null
  175. */
  176. protected $_onWorkerStart = null;
  177. /**
  178. * 当有客户端连接时
  179. *
  180. * @var callable|null
  181. */
  182. protected $_onConnect = null;
  183. /**
  184. * 当客户端发来消息时
  185. *
  186. * @var callable|null
  187. */
  188. protected $_onMessage = null;
  189. /**
  190. * 当客户端连接关闭时
  191. *
  192. * @var callable|null
  193. */
  194. protected $_onClose = null;
  195. /**
  196. * 当 worker 停止时
  197. *
  198. * @var callable|null
  199. */
  200. protected $_onWorkerStop = null;
  201. /**
  202. * 进程启动时间
  203. *
  204. * @var int
  205. */
  206. protected $_startTime = 0;
  207. /**
  208. * gateway 监听的端口
  209. *
  210. * @var int
  211. */
  212. protected $_gatewayPort = 0;
  213. /**
  214. * connectionId 记录器
  215. * @var int
  216. */
  217. protected static $_connectionIdRecorder = 0;
  218. /**
  219. * 用于保持长连接的心跳时间间隔
  220. *
  221. * @var int
  222. */
  223. const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
  /**
   * Constructor.
   *
   * Delegates to the parent Worker constructor, then records the port parsed
   * from $socket_name, installs routerBind() as the default router, and stores
   * the directory of the file that instantiated the gateway as the autoload
   * root path.
   *
   * @param string $socket_name    Listen address; the text after its last ':' is taken as the port.
   * @param array  $context_option Options forwarded unchanged to the parent constructor.
   */
  public function __construct($socket_name, $context_option = array())
  {
      parent::__construct($socket_name, $context_option);

      // strrchr() returns false when the address contains no ':' at all. Fall
      // back to port 0 in that case, and always store an int as the property
      // is documented, rather than whatever substring follows the colon.
      $port_part = strrchr($socket_name, ':');
      $this->_gatewayPort = $port_part === false ? 0 : (int)substr($port_part, 1);

      // Default routing: keep each client bound to one worker connection.
      $this->router = array("\\GatewayWorker\\Gateway", 'routerBind');

      // Only frame 0 (the caller of this constructor) is needed, so skip
      // collecting arguments and deeper frames.
      $backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1);
      $this->_autoloadRootPath = dirname($backtrace[0]['file']);
  }
  /**
   * {@inheritdoc}
   *
   * Before handing control to the parent, the user-supplied callbacks are
   * stashed in the underscore-prefixed properties and replaced by the
   * gateway's own handlers, which invoke the stashed callbacks when the
   * corresponding event fires. A user-supplied onMessage is not kept: it is
   * always overwritten.
   */
  public function run()
  {
      // Remember the callbacks configured by the user.
      $this->_onWorkerStart = $this->onWorkerStart;
      $this->_onConnect     = $this->onConnect;
      $this->_onClose       = $this->onClose;
      $this->_onWorkerStop  = $this->onWorkerStop;

      // Route every event through the gateway's own handlers.
      $this->onWorkerStart = array($this, 'onWorkerStart');
      $this->onConnect     = array($this, 'onClientConnect');
      $this->onMessage     = array($this, 'onClientMessage');
      $this->onClose       = array($this, 'onClientClose');
      $this->onWorkerStop  = array($this, 'onWorkerStop');

      // Normalise a single register address into a one-element list.
      $this->registerAddress = is_array($this->registerAddress)
          ? $this->registerAddress
          : array($this->registerAddress);

      // Record when the process started.
      $this->_startTime = time();

      parent::run();
  }
  /**
   * Handles data received from a client by forwarding it to a worker.
   *
   * @param TcpConnection $connection Client connection the data arrived on.
   * @param mixed         $data       Payload received from the client.
   */
  public function onClientMessage($connection, $data)
  {
      // Put the heartbeat counter back to -1, the same value
      // onClientConnect() initialises it with.
      $connection->pingNotResponseCount = -1;

      $cmd = GatewayProtocol::CMD_ON_MESSAGE;
      $this->sendToWorker($cmd, $connection, $data);
  }
  /**
   * Initialises per-connection state when a client connects.
   *
   * Assigns a connection id, caches the internal packet header, resets the
   * session and heartbeat counter, registers the connection, runs the user's
   * onConnect callback (if any) and finally notifies a worker.
   *
   * @param TcpConnection $connection Newly accepted client connection.
   */
  public function onClientConnect($connection)
  {
      $connection->id = self::generateConnectionId();

      // Header of the internal packets for this connection; built once here
      // so it does not have to be rebuilt for every message.
      $header = array();
      $header['local_ip']      = ip2long($this->lanIp);
      $header['local_port']    = $this->lanPort;
      $header['client_ip']     = ip2long($connection->getRemoteIp());
      $header['client_port']   = $connection->getRemotePort();
      $header['gateway_port']  = $this->_gatewayPort;
      $header['connection_id'] = $connection->id;
      $header['flag']          = 0;
      $connection->gatewayHeader = $header;

      // Session data of the connection starts out empty.
      $connection->session = '';
      // Heartbeat counter of the connection.
      $connection->pingNotResponseCount = -1;
      // Send buffer size for this connection.
      $connection->maxSendBufferSize = $this->sendToClientBufferSize;

      // Register the connection object under its id.
      $this->_clientConnections[$connection->id] = $connection;

      // Run the user's onConnect callback. If it installed its own
      // onWebSocketConnect handler, keep it so onWebsocketConnect() can
      // call it later.
      if ($this->_onConnect) {
          call_user_func($this->_onConnect, $connection);
          if (isset($connection->onWebSocketConnect)) {
              $connection->_onWebSocketConnect = $connection->onWebSocketConnect;
          }
      }

      // For the WebSocket protocol the gateway takes over the handshake callback.
      $websocket_protocols = array(
          '\Workerman\Protocols\Websocket',
          'Workerman\Protocols\Websocket',
      );
      if (in_array($connection->protocol, $websocket_protocols, true)) {
          $connection->onWebSocketConnect = array($this, 'onWebsocketConnect');
      }

      $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECT, $connection);
  }
  /**
   * Triggered on the WebSocket handshake.
   *
   * Calls (once) the user handler stashed by onClientConnect(), then sends the
   * handshake's query parameters, server variables and cookies to a worker.
   *
   * @param TcpConnection $connection Client connection performing the handshake.
   * @param mixed         $request    Request object exposing queryString(), method(), uri(),
   *                                  protocolVersion(), host(), header(), get() and cookie();
   *                                  any non-object value makes the method fall back to the
   *                                  $_GET / $_SERVER / $_COOKIE superglobals.
   */
  public function onWebsocketConnect($connection, $request)
  {
      // The stashed user handler is invoked a single time and then dropped.
      if (isset($connection->_onWebSocketConnect)) {
          call_user_func($connection->_onWebSocketConnect, $connection, $request);
          unset($connection->_onWebSocketConnect);
      }

      if (!is_object($request)) {
          // No request object available: use the superglobals as they are.
          $data = array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE);
      } else {
          // Assemble a $_SERVER-like array from the request object.
          $server = array(
              'QUERY_STRING'    => $request->queryString(),
              'REQUEST_METHOD'  => $request->method(),
              'REQUEST_URI'     => $request->uri(),
              'SERVER_PROTOCOL' => 'HTTP/' . $request->protocolVersion(),
              'SERVER_NAME'     => $request->host(false),
              'CONTENT_TYPE'    => $request->header('content-type'),
              'REMOTE_ADDR'     => $connection->getRemoteIp(),
              'REMOTE_PORT'     => $connection->getRemotePort(),
              'SERVER_PORT'     => $connection->getLocalPort(),
          );
          // Each header becomes HTTP_<NAME>, upper-cased with '-' turned into '_'.
          foreach ($request->header() as $name => $value) {
              $normalized = str_replace('-', '_', strtoupper($name));
              $server['HTTP_' . $normalized] = $value;
          }
          $data = array(
              'get'    => $request->get(),
              'server' => $server,
              'cookie' => $request->cookie(),
          );
      }

      $this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, $data);
  }
  /**
   * Generates a connection id that is not used by any current client connection.
   *
   * Ids are handed out sequentially from the static recorder and wrap around
   * to 1 after 4294967295. The search keeps wrapping until a free id is found,
   * so the returned id always lies in 1..4294967295. The previous
   * implementation stopped at the upper bound and could return 4294967296
   * when every id between the recorder and the maximum was still in use.
   *
   * The loop would only fail to terminate if all 4294967295 ids were in use
   * at the same time.
   *
   * @return int
   */
  protected function generateConnectionId()
  {
      $max_unsigned_int = 4294967295;
      do {
          // Wrap around before stepping past the largest allowed id.
          if (self::$_connectionIdRecorder >= $max_unsigned_int) {
              self::$_connectionIdRecorder = 0;
          }
          $connection_id = ++self::$_connectionIdRecorder;
      } while (isset($this->_clientConnections[$connection_id]));

      return $connection_id;
  }
  /**
   * Sends a packet for the given client connection to a worker process.
   *
   * The packet is the connection's cached header plus the command, the body
   * and the connection's session (as ext_data). The target worker connection
   * is chosen by the configured router. When no worker connection exists the
   * client connection is destroyed.
   *
   * @param int           $cmd        Command code placed in the packet.
   * @param TcpConnection $connection Client connection the packet refers to.
   * @param mixed         $body       Packet body.
   * @return bool true when the packet was passed to a worker connection,
   *              false when sending failed or no worker connection exists.
   */
  protected function sendToWorker($cmd, $connection, $body = '')
  {
      $packet             = $connection->gatewayHeader;
      $packet['cmd']      = $cmd;
      $packet['body']     = $body;
      $packet['ext_data'] = $connection->session;

      // No worker connection available.
      if (!$this->_workerConnections) {
          // During the first seconds after start-up the worker connections
          // may simply not be established yet, so nothing is logged then;
          // the client connection is closed either way.
          $grace_seconds = 2;
          $uptime = time() - $this->_startTime;
          if ($uptime >= $grace_seconds) {
              static::log('SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://doc2.workerman.net/send-buffer-to-worker-fail.html');
          }
          $connection->destroy();
          return false;
      }

      // Ask the router which worker connection should receive the packet.
      /** @var TcpConnection $target */
      $target = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
      $sent = $target->send($packet);
      if ($sent === false) {
          static::log("SendBufferToWorker fail. May be the send buffer are overflow. See http://doc2.workerman.net/send-buffer-overflow.html");
          return false;
      }

      return true;
  }
  /**
   * Random router: returns a randomly chosen worker connection.
   *
   * @param array         $worker_connections Available worker connections.
   * @param TcpConnection $client_connection  Client connection (unused).
   * @param int           $cmd                Command code (unused).
   * @param mixed         $buffer             Packet body (unused).
   * @return TcpConnection
   */
  public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
  {
      $picked_key = array_rand($worker_connections);

      return $worker_connections[$picked_key];
  }
  /**
   * Binding router: keeps a client tied to one worker connection.
   *
   * The key of the chosen worker connection is stored on the client
   * connection as businessworker_address. A new key is picked at random when
   * none is stored yet or the stored key is no longer present in
   * $worker_connections.
   *
   * @param array         $worker_connections Available worker connections, indexed by key.
   * @param TcpConnection $client_connection  Client connection to route.
   * @param int           $cmd                Command code (unused).
   * @param mixed         $buffer             Packet body (unused).
   * @return TcpConnection
   */
  public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
  {
      $still_bound = isset($client_connection->businessworker_address)
          && isset($worker_connections[$client_connection->businessworker_address]);

      if (!$still_bound) {
          $client_connection->businessworker_address = array_rand($worker_connections);
      }

      $address = $client_connection->businessworker_address;

      return $worker_connections[$address];
  }
  /**
   * Cleans up when a client connection is closed.
   *
   * Notifies a worker first, then removes the connection from the client,
   * uid and group maps, and finally runs the user's onClose callback.
   *
   * @param TcpConnection $connection Client connection that was closed.
   */
  public function onClientClose($connection)
  {
      // Try to notify a worker about the close.
      $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);

      $connection_id = $connection->id;
      unset($this->_clientConnections[$connection_id]);

      // Drop the uid mapping; remove the uid entry once it has no connections left.
      if (!empty($connection->uid)) {
          $bound_uid = $connection->uid;
          unset($this->_uidConnections[$bound_uid][$connection_id]);
          if (empty($this->_uidConnections[$bound_uid])) {
              unset($this->_uidConnections[$bound_uid]);
          }
      }

      // Drop the group mappings; remove each group entry once it is empty.
      if (!empty($connection->groups)) {
          foreach ($connection->groups as $group_name) {
              unset($this->_groupConnections[$group_name][$connection_id]);
              if (empty($this->_groupConnections[$group_name])) {
                  unset($this->_groupConnections[$group_name]);
              }
          }
      }

      // Run the user's onClose callback, if one was configured.
      if ($this->_onClose) {
          call_user_func($this->_onClose, $connection);
      }
  }
  /**
   * Callback run when the gateway process starts.
   *
   * Picks the internal port, installs the heartbeat timers, opens the internal
   * listener that workers connect to, calls registerAddress() and finally runs
   * the user's onWorkerStart callback.
   *
   * @return void
   */
  public function onWorkerStart()
  {
      // Internal communication port: start port offset by this process's id.
      $this->lanPort = $this->startPort + $this->id;

      // Client heartbeat timer, only when a ping interval is configured. The
      // timer runs at half the interval when a not-response limit is set.
      if ($this->pingInterval > 0) {
          if ($this->pingNotResponseLimit > 0) {
              $timer_interval = $this->pingInterval / 2;
          } else {
              $timer_interval = $this->pingInterval;
          }
          Timer::add($timer_interval, array($this, 'ping'));
      }

      // When lanIp is not 127.0.0.1, also ping the BusinessWorker connections
      // periodically.
      if ($this->lanIp !== '127.0.0.1') {
          Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));
      }

      // Make the protocol class available under the Protocols\ namespace too.
      if (!class_exists('\Protocols\GatewayProtocol')) {
          class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
      }

      // Address the internal listener binds to. An explicit
      // innerTcpWorkerListen wins (see the notes on that property).
      // Otherwise a public lanIp is replaced by 0.0.0.0, and a private or
      // reserved one is used as is.
      if ($this->innerTcpWorkerListen != '') {
          $listen_ip = $this->innerTcpWorkerListen;
      } else {
          $is_public_ip = filter_var(
              $this->lanIp,
              FILTER_VALIDATE_IP,
              FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE
          );
          $listen_ip = $is_public_ip ? '0.0.0.0' : $this->lanIp;
      }

      // Internal listener that accepts the worker connections and the data
      // they send.
      $inner_worker = new Worker("GatewayProtocol://{$listen_ip}:{$this->lanPort}");
      $this->_innerTcpWorker = $inner_worker;
      $inner_worker->reusePort = false;
      $inner_worker->listen();
      $inner_worker->name = 'GatewayInnerWorker';

      if ($this->_autoloadRootPath && class_exists(Autoloader::class)) {
          Autoloader::setRootPath($this->_autoloadRootPath);
      }

      // Callbacks of the internal listener.
      $inner_worker->onMessage = array($this, 'onWorkerMessage');
      $inner_worker->onConnect = array($this, 'onWorkerConnect');
      $inner_worker->onClose   = array($this, 'onWorkerClose');

      // Register the gateway's internal address so that workers can connect
      // to it and keep a long-lived TCP connection.
      $this->registerAddress();

      // Run the user's onWorkerStart callback, if one was configured.
      if ($this->_onWorkerStart) {
          call_user_func($this->_onWorkerStart, $this);
      }
  }
  /**
   * Called when a worker connects to the gateway's internal port.
   *
   * @param TcpConnection $connection Connection opened on the internal listener.
   */
  public function onWorkerConnect($connection)
  {
      // The connection is authorized right away only when no secret key is
      // configured.
      $connection->authorized = !$this->secretKey;
      $connection->maxSendBufferSize = $this->sendToWorkerBufferSize;
  }
  519. /**
  520. * 当 worker 发来数据时
  521. *
  522. * @param TcpConnection $connection
  523. * @param mixed $data
  524. * @throws \Exception
  525. *
  526. * @return void
  527. */
  528. public function onWorkerMessage($connection, $data)
  529. {
  530. $cmd = $data['cmd'];
  531. if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
  532. self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
  533. $connection->close();
  534. return;
  535. }
  536. switch ($cmd) {
  537. // BusinessWorker连接Gateway
  538. case GatewayProtocol::CMD_WORKER_CONNECT:
  539. $worker_info = json_decode($data['body'], true);
  540. if ($worker_info['secret_key'] !== $this->secretKey) {
  541. self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($this->secretKey));
  542. $connection->close();
  543. return;
  544. }
  545. $key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];
  546. // 在一台服务器上businessWorker->name不能相同
  547. if (isset($this->_workerConnections[$key])) {
  548. self::log("Gateway: Worker->name conflict. Key:{$key}");
  549. $connection->close();
  550. return;
  551. }
  552. $connection->key = $key;
  553. $this->_workerConnections[$key] = $connection;
  554. $connection->authorized = true;
  555. if ($this->onBusinessWorkerConnected) {
  556. call_user_func($this->onBusinessWorkerConnected, $connection);
  557. }
  558. return;
  559. // GatewayClient连接Gateway
  560. case GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT:
  561. $worker_info = json_decode($data['body'], true);
  562. if ($worker_info['secret_key'] !== $this->secretKey) {
  563. self::log("Gateway: GatewayClient key does not match ".var_export($this->secretKey, true)." !== ".var_export($this->secretKey, true));
  564. $connection->close();
  565. return;
  566. }
  567. $connection->authorized = true;
  568. return;
  569. // 向某客户端发送数据,Gateway::sendToClient($client_id, $message);
  570. case GatewayProtocol::CMD_SEND_TO_ONE:
  571. if (isset($this->_clientConnections[$data['connection_id']])) {
  572. $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
  573. $body = $data['body'];
  574. if (!$raw && $this->protocolAccelerate && $this->protocol) {
  575. $body = $this->preEncodeForClient($body);
  576. $raw = true;
  577. }
  578. $this->_clientConnections[$data['connection_id']]->send($body, $raw);
  579. }
  580. return;
  581. // 踢出用户,Gateway::closeClient($client_id, $message);
  582. case GatewayProtocol::CMD_KICK:
  583. if (isset($this->_clientConnections[$data['connection_id']])) {
  584. $this->_clientConnections[$data['connection_id']]->close($data['body']);
  585. }
  586. return;
  587. // 立即销毁用户连接, Gateway::destroyClient($client_id);
  588. case GatewayProtocol::CMD_DESTROY:
  589. if (isset($this->_clientConnections[$data['connection_id']])) {
  590. $this->_clientConnections[$data['connection_id']]->destroy();
  591. }
  592. return;
  593. // 广播, Gateway::sendToAll($message, $client_id_array)
  594. case GatewayProtocol::CMD_SEND_TO_ALL:
  595. $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
  596. $body = $data['body'];
  597. if (!$raw && $this->protocolAccelerate && $this->protocol) {
  598. $body = $this->preEncodeForClient($body);
  599. $raw = true;
  600. }
  601. $ext_data = $data['ext_data'] ? json_decode($data['ext_data'], true) : '';
  602. // $client_id_array 不为空时,只广播给 $client_id_array 指定的客户端
  603. if (isset($ext_data['connections'])) {
  604. foreach ($ext_data['connections'] as $connection_id) {
  605. if (isset($this->_clientConnections[$connection_id])) {
  606. $this->_clientConnections[$connection_id]->send($body, $raw);
  607. }
  608. }
  609. } // $client_id_array 为空时,广播给所有在线客户端
  610. else {
  611. $exclude_connection_id = !empty($ext_data['exclude']) ? $ext_data['exclude'] : null;
  612. foreach ($this->_clientConnections as $client_connection) {
  613. if (!isset($exclude_connection_id[$client_connection->id])) {
  614. $client_connection->send($body, $raw);
  615. }
  616. }
  617. }
  618. return;
  619. case GatewayProtocol::CMD_SELECT:
  620. $client_info_array = array();
  621. $ext_data = json_decode($data['ext_data'], true);
  622. if (!$ext_data) {
  623. echo 'CMD_SELECT ext_data=' . var_export($data['ext_data'], true) . '\r\n';
  624. $buffer = serialize($client_info_array);
  625. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  626. return;
  627. }
  628. $fields = $ext_data['fields'];
  629. $where = $ext_data['where'];
  630. if ($where) {
  631. $connection_box_map = array(
  632. 'groups' => $this->_groupConnections,
  633. 'uid' => $this->_uidConnections
  634. );
  635. // $where = ['groups'=>[x,x..], 'uid'=>[x,x..], 'connection_id'=>[x,x..]]
  636. foreach ($where as $key => $items) {
  637. if ($key !== 'connection_id') {
  638. $connections_box = $connection_box_map[$key];
  639. foreach ($items as $item) {
  640. if (isset($connections_box[$item])) {
  641. foreach ($connections_box[$item] as $connection_id => $client_connection) {
  642. if (!isset($client_info_array[$connection_id])) {
  643. $client_info_array[$connection_id] = array();
  644. // $fields = ['groups', 'uid', 'session']
  645. foreach ($fields as $field) {
  646. $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
  647. }
  648. }
  649. }
  650. }
  651. }
  652. } else {
  653. foreach ($items as $connection_id) {
  654. if (isset($this->_clientConnections[$connection_id])) {
  655. $client_connection = $this->_clientConnections[$connection_id];
  656. $client_info_array[$connection_id] = array();
  657. // $fields = ['groups', 'uid', 'session']
  658. foreach ($fields as $field) {
  659. $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
  660. }
  661. }
  662. }
  663. }
  664. }
  665. } else {
  666. foreach ($this->_clientConnections as $connection_id => $client_connection) {
  667. foreach ($fields as $field) {
  668. $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
  669. }
  670. }
  671. }
  672. $buffer = serialize($client_info_array);
  673. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  674. return;
  675. // 获取在线群组列表
  676. case GatewayProtocol::CMD_GET_GROUP_ID_LIST:
  677. $buffer = serialize(array_keys($this->_groupConnections));
  678. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  679. return;
  680. // 重新赋值 session
  681. case GatewayProtocol::CMD_SET_SESSION:
  682. if (isset($this->_clientConnections[$data['connection_id']])) {
  683. $this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
  684. }
  685. return;
  686. // session合并
  687. case GatewayProtocol::CMD_UPDATE_SESSION:
  688. if (!isset($this->_clientConnections[$data['connection_id']])) {
  689. return;
  690. } else {
  691. if (!$this->_clientConnections[$data['connection_id']]->session) {
  692. $this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
  693. return;
  694. }
  695. $session = Context::sessionDecode($this->_clientConnections[$data['connection_id']]->session);
  696. $session_for_merge = Context::sessionDecode($data['ext_data']);
  697. $session = array_replace_recursive($session, $session_for_merge);
  698. $this->_clientConnections[$data['connection_id']]->session = Context::sessionEncode($session);
  699. }
  700. return;
  701. case GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID:
  702. if (!isset($this->_clientConnections[$data['connection_id']])) {
  703. $session = serialize(null);
  704. } else {
  705. if (!$this->_clientConnections[$data['connection_id']]->session) {
  706. $session = serialize(array());
  707. } else {
  708. $session = $this->_clientConnections[$data['connection_id']]->session;
  709. }
  710. }
  711. $connection->send(pack('N', strlen($session)) . $session, true);
  712. return;
  713. // 获得客户端sessions
  714. case GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS:
  715. $client_info_array = array();
  716. foreach ($this->_clientConnections as $connection_id => $client_connection) {
  717. $client_info_array[$connection_id] = $client_connection->session;
  718. }
  719. $buffer = serialize($client_info_array);
  720. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  721. return;
  722. // 判断某个 client_id 是否在线 Gateway::isOnline($client_id)
  723. case GatewayProtocol::CMD_IS_ONLINE:
  724. $buffer = serialize((int)isset($this->_clientConnections[$data['connection_id']]));
  725. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  726. return;
  727. // 将 client_id 与 uid 绑定
  728. case GatewayProtocol::CMD_BIND_UID:
  729. $uid = $data['ext_data'];
  730. if (empty($uid)) {
  731. echo "bindUid(client_id, uid) uid empty, uid=" . var_export($uid, true);
  732. return;
  733. }
  734. $connection_id = $data['connection_id'];
  735. if (!isset($this->_clientConnections[$connection_id])) {
  736. return;
  737. }
  738. $client_connection = $this->_clientConnections[$connection_id];
  739. if (isset($client_connection->uid)) {
  740. $current_uid = $client_connection->uid;
  741. unset($this->_uidConnections[$current_uid][$connection_id]);
  742. if (empty($this->_uidConnections[$current_uid])) {
  743. unset($this->_uidConnections[$current_uid]);
  744. }
  745. }
  746. $client_connection->uid = $uid;
  747. $this->_uidConnections[$uid][$connection_id] = $client_connection;
  748. return;
  749. // client_id 与 uid 解绑 Gateway::unbindUid($client_id, $uid);
  750. case GatewayProtocol::CMD_UNBIND_UID:
  751. $connection_id = $data['connection_id'];
  752. if (!isset($this->_clientConnections[$connection_id])) {
  753. return;
  754. }
  755. $client_connection = $this->_clientConnections[$connection_id];
  756. if (isset($client_connection->uid)) {
  757. $current_uid = $client_connection->uid;
  758. unset($this->_uidConnections[$current_uid][$connection_id]);
  759. if (empty($this->_uidConnections[$current_uid])) {
  760. unset($this->_uidConnections[$current_uid]);
  761. }
  762. $client_connection->uid_info = '';
  763. $client_connection->uid = null;
  764. }
  765. return;
  766. // 发送数据给 uid Gateway::sendToUid($uid, $msg);
  767. case GatewayProtocol::CMD_SEND_TO_UID:
  768. $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
  769. $body = $data['body'];
  770. if (!$raw && $this->protocolAccelerate && $this->protocol) {
  771. $body = $this->preEncodeForClient($body);
  772. $raw = true;
  773. }
  774. $uid_array = json_decode($data['ext_data'], true);
  775. foreach ($uid_array as $uid) {
  776. if (!empty($this->_uidConnections[$uid])) {
  777. foreach ($this->_uidConnections[$uid] as $connection) {
  778. /** @var TcpConnection $connection */
  779. $connection->send($body, $raw);
  780. }
  781. }
  782. }
  783. return;
  784. // 将 $client_id 加入用户组 Gateway::joinGroup($client_id, $group);
  785. case GatewayProtocol::CMD_JOIN_GROUP:
  786. $group = $data['ext_data'];
  787. if (empty($group)) {
  788. echo "join(group) group empty, group=" . var_export($group, true);
  789. return;
  790. }
  791. $connection_id = $data['connection_id'];
  792. if (!isset($this->_clientConnections[$connection_id])) {
  793. return;
  794. }
  795. $client_connection = $this->_clientConnections[$connection_id];
  796. if (!isset($client_connection->groups)) {
  797. $client_connection->groups = array();
  798. }
  799. $client_connection->groups[$group] = $group;
  800. $this->_groupConnections[$group][$connection_id] = $client_connection;
  801. return;
  802. // 将 $client_id 从某个用户组中移除 Gateway::leaveGroup($client_id, $group);
  803. case GatewayProtocol::CMD_LEAVE_GROUP:
  804. $group = $data['ext_data'];
  805. if (empty($group)) {
  806. echo "leave(group) group empty, group=" . var_export($group, true);
  807. return;
  808. }
  809. $connection_id = $data['connection_id'];
  810. if (!isset($this->_clientConnections[$connection_id])) {
  811. return;
  812. }
  813. $client_connection = $this->_clientConnections[$connection_id];
  814. if (!isset($client_connection->groups[$group])) {
  815. return;
  816. }
  817. unset($client_connection->groups[$group], $this->_groupConnections[$group][$connection_id]);
  818. if (empty($this->_groupConnections[$group])) {
  819. unset($this->_groupConnections[$group]);
  820. }
  821. return;
  822. // 解散分组
  823. case GatewayProtocol::CMD_UNGROUP:
  824. $group = $data['ext_data'];
  825. if (empty($group)) {
  826. echo "leave(group) group empty, group=" . var_export($group, true);
  827. return;
  828. }
  829. if (empty($this->_groupConnections[$group])) {
  830. return;
  831. }
  832. foreach ($this->_groupConnections[$group] as $client_connection) {
  833. unset($client_connection->groups[$group]);
  834. }
  835. unset($this->_groupConnections[$group]);
  836. return;
  837. // 向某个用户组发送消息 Gateway::sendToGroup($group, $msg);
  838. case GatewayProtocol::CMD_SEND_TO_GROUP:
  839. $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
  840. $body = $data['body'];
  841. if (!$raw && $this->protocolAccelerate && $this->protocol) {
  842. $body = $this->preEncodeForClient($body);
  843. $raw = true;
  844. }
  845. $ext_data = json_decode($data['ext_data'], true);
  846. $group_array = $ext_data['group'];
  847. $exclude_connection_id = $ext_data['exclude'];
  848. foreach ($group_array as $group) {
  849. if (!empty($this->_groupConnections[$group])) {
  850. foreach ($this->_groupConnections[$group] as $connection) {
  851. if(!isset($exclude_connection_id[$connection->id]))
  852. {
  853. /** @var TcpConnection $connection */
  854. $connection->send($body, $raw);
  855. }
  856. }
  857. }
  858. }
  859. return;
  860. // 获取某用户组成员信息 Gateway::getClientSessionsByGroup($group);
  861. case GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP:
  862. $group = $data['ext_data'];
  863. if (!isset($this->_groupConnections[$group])) {
  864. $buffer = serialize(array());
  865. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  866. return;
  867. }
  868. $client_info_array = array();
  869. foreach ($this->_groupConnections[$group] as $connection_id => $client_connection) {
  870. $client_info_array[$connection_id] = $client_connection->session;
  871. }
  872. $buffer = serialize($client_info_array);
  873. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  874. return;
  875. // 获取用户组成员数 Gateway::getClientCountByGroup($group);
  876. case GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP:
  877. $group = $data['ext_data'];
  878. $count = 0;
  879. if ($group !== '') {
  880. if (isset($this->_groupConnections[$group])) {
  881. $count = count($this->_groupConnections[$group]);
  882. }
  883. } else {
  884. $count = count($this->_clientConnections);
  885. }
  886. $buffer = serialize($count);
  887. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  888. return;
  889. // 获取与某个 uid 绑定的所有 client_id Gateway::getClientIdByUid($uid);
  890. case GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID:
  891. $uid = $data['ext_data'];
  892. if (empty($this->_uidConnections[$uid])) {
  893. $buffer = serialize(array());
  894. } else {
  895. $buffer = serialize(array_keys($this->_uidConnections[$uid]));
  896. }
  897. $connection->send(pack('N', strlen($buffer)) . $buffer, true);
  898. return;
  899. default :
  900. $err_msg = "gateway inner pack err cmd=$cmd";
  901. echo $err_msg;
  902. }
  903. }
  /**
   * Called when a connection coming from a worker is closed.
   *
   * Removes the connection from the worker-connection table (indexed by
   * $connection->key) and then invokes the onBusinessWorkerClose callback
   * when one has been set. A connection that carries no key is ignored.
   *
   * @param TcpConnection $connection
   *
   * @return void
   */
  public function onWorkerClose($connection)
  {
      // Nothing to clean up for a connection that was never given a key.
      if (!isset($connection->key)) {
          return;
      }

      unset($this->_workerConnections[$connection->key]);

      $on_close = $this->onBusinessWorkerClose;
      if ($on_close) {
          call_user_func($on_close, $connection);
      }
  }
  /**
   * Announce this Gateway's internal communication address to every
   * configured Register server.
   *
   * For each entry of $this->registerAddress an AsyncTcpConnection using the
   * text protocol is opened. On connect, a "gateway_connect" event carrying
   * "<lanIp>:<lanPort>" and the secret key is sent. Unless the register
   * address starts with 127.0.0.1, a recurring {"event":"ping"} message is
   * scheduled to keep the connection alive. When the connection closes, the
   * ping timer is removed and reconnect(1) is requested.
   *
   * @return void
   */
  public function registerAddress()
  {
      $address    = $this->lanIp . ':' . $this->lanPort;
      $secret_key = $this->secretKey;

      foreach ($this->registerAddress as $register_address) {
          $register_connection = new AsyncTcpConnection("text://{$register_address}");

          $register_connection->onConnect = function ($register_connection) use ($address, $secret_key, $register_address) {
              // Build the handshake with json_encode so that quotes, backslashes
              // or line breaks inside the secret key cannot corrupt the JSON
              // document or the line-based framing.
              $handshake = json_encode(array(
                  'event'      => 'gateway_connect',
                  'address'    => $address,
                  'secret_key' => $secret_key,
              ));
              if ($handshake === false) {
                  // Value could not be encoded (e.g. invalid UTF-8): keep the
                  // legacy hand-built message rather than sending nothing.
                  $handshake = '{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $secret_key . '"}';
              }
              $register_connection->send($handshake);

              // A heartbeat is required when the Register server is not on the local host.
              if (strpos($register_address, '127.0.0.1') !== 0) {
                  $register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
                      $register_connection->send('{"event":"ping"}');
                  });
              }
          };

          $register_connection->onClose = function ($register_connection) {
              if (!empty($register_connection->ping_timer)) {
                  Timer::del($register_connection->ping_timer);
                  // Forget the id so a later close event does not delete it again.
                  $register_connection->ping_timer = null;
              }
              $register_connection->reconnect(1);
          };

          $register_connection->connect();
      }
  }
  /**
   * Heartbeat logic for client connections.
   *
   * Every call increments each client's pingNotResponseCount. A client whose
   * counter has reached pingNotResponseLimit * 2 (when the limit is positive)
   * is destroyed instead. If ping data is configured it is sent to the
   * client, except when the counter has just become 0 or, with a positive
   * limit, when the counter is odd.
   *
   * @return void
   */
  public function ping()
  {
      $payload = null;
      if ($this->pingData) {
          $payload = (string)$this->pingData;
      }

      // With protocol acceleration the payload is encoded once up front and
      // then written to every client as raw bytes.
      $send_raw = false;
      if ($this->protocolAccelerate && $payload && $this->protocol) {
          $payload  = $this->preEncodeForClient($payload);
          $send_raw = true;
      }

      // Walk over every client connection.
      foreach ($this->_clientConnections as $client) {
          // Disconnect once the number of unanswered heartbeats reaches the limit.
          $over_limit = $this->pingNotResponseLimit > 0
              && $client->pingNotResponseCount >= $this->pingNotResponseLimit * 2;
          if ($over_limit) {
              $client->destroy();
              continue;
          }

          // A counter of -1 means the client sent something recently, so after
          // the increment (0) no heartbeat is sent to it.
          $client->pingNotResponseCount++;

          if (!$payload) {
              continue;
          }

          $skip_this_round = $client->pingNotResponseCount === 0
              || ($this->pingNotResponseLimit > 0 && $client->pingNotResponseCount % 2 === 1);
          if (!$skip_this_round) {
              $client->send($payload, $send_raw);
          }
      }
  }
  /**
   * Send heartbeat data to the BusinessWorker connections to keep the
   * long-lived connections alive.
   *
   * The message is a copy of GatewayProtocol::$empty with its 'cmd' field
   * set to GatewayProtocol::CMD_PING; it is sent to every entry of the
   * worker-connection table.
   *
   * @return void
   */
  public function pingBusinessWorker()
  {
      $heartbeat        = GatewayProtocol::$empty;
      $heartbeat['cmd'] = GatewayProtocol::CMD_PING;

      foreach ($this->_workerConnections as $worker_connection) {
          $worker_connection->send($heartbeat);
      }
  }
  /**
   * Encode $data once, using the protocol of the first client connection.
   *
   * The 'encode' method of that connection's protocol is called with $data
   * and the connection itself. When there are no client connections nothing
   * is encoded and null is returned.
   *
   * @param mixed $data
   *
   * @return string|null
   */
  protected function preEncodeForClient($data)
  {
      $encoded = null;

      // Only the first connection is needed; leave the loop right after it.
      foreach ($this->_clientConnections as $sample_connection) {
          $encoder = array($sample_connection->protocol, 'encode');
          $encoded = call_user_func($encoder, $data, $sample_connection);
          break;
      }

      return $encoded;
  }
  /**
   * Triggered when the gateway stops; used for clean-up.
   *
   * Invokes the user-supplied stop callback, passing this Gateway instance,
   * when such a callback has been set.
   *
   * @return void
   */
  public function onWorkerStop()
  {
      $user_callback = $this->_onWorkerStop;
      if (!$user_callback) {
          return;
      }

      // Try to trigger the callback configured by the user.
      call_user_func($user_callback, $this);
  }
  /**
   * Write a log message through Worker::log(), deferred via a timer.
   *
   * The message is not written immediately: a timer with an interval of 1 is
   * registered whose callback performs the actual Worker::log() call. The
   * timer is added with its fourth argument set to false (presumably a
   * non-repeating timer; confirm against Timer::add).
   *
   * @param string $msg
   *
   * @return void
   */
  public static function log($msg)
  {
      $write_log = function () use ($msg) {
          Worker::log($msg);
      };

      Timer::add(1, $write_log, null, false);
  }
  1026. }