Gateway.php 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614
  1. <?php
  2. namespace GatewayClient;
  3. use \Exception;
  4. /**
  5. * This file is part of workerman.
  6. *
  7. * Licensed under The MIT License
  8. * For full copyright and license information, please see the MIT-LICENSE.txt
  9. * Redistributions of files must retain the above copyright notice.
  10. *
  11. * @author walkor<walkor@workerman.net>
  12. * @copyright walkor<walkor@workerman.net>
  13. * @link http://www.workerman.net/
  14. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  15. */
  16. /**
  17. * 数据发送相关
  18. * @version 3.0.12
  19. */
  20. /**
  21. * 数据发送相关
  22. */
  23. class Gateway
  24. {
  25. /**
  26. * gateway 实例
  27. *
  28. * @var object
  29. */
  30. protected static $businessWorker = null;
  31. /**
  32. * 注册中心地址
  33. *
  34. * @var string|array
  35. */
  36. public static $registerAddress = '127.0.0.1:1236';
  37. /**
  38. * 秘钥
  39. * @var string
  40. */
  41. public static $secretKey = '';
  42. /**
  43. * 链接超时时间
  44. * @var int
  45. */
  46. public static $connectTimeout = 3;
  47. /**
  48. * 与Gateway是否是长链接
  49. * @var bool
  50. */
  51. public static $persistentConnection = false;
  52. /**
  53. * 是否清除注册地址缓存
  54. * @var bool
  55. */
  56. public static $addressesCacheDisable = false;
  57. /**
  58. * 向所有客户端连接(或者 client_id_array 指定的客户端连接)广播消息
  59. *
  60. * @param string $message 向客户端发送的消息
  61. * @param array $client_id_array 客户端 id 数组
  62. * @param array $exclude_client_id 不给这些client_id发
  63. * @param bool $raw 是否发送原始数据(即不调用gateway的协议的encode方法)
  64. * @return void
  65. * @throws Exception
  66. */
  67. public static function sendToAll($message, $client_id_array = null, $exclude_client_id = null, $raw = false)
  68. {
  69. $gateway_data = GatewayProtocol::$empty;
  70. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
  71. $gateway_data['body'] = $message;
  72. if ($raw) {
  73. $gateway_data['flag'] |= GatewayProtocol::FLAG_NOT_CALL_ENCODE;
  74. }
  75. if ($exclude_client_id) {
  76. if (!is_array($exclude_client_id)) {
  77. $exclude_client_id = array($exclude_client_id);
  78. }
  79. if ($client_id_array) {
  80. $exclude_client_id = array_flip($exclude_client_id);
  81. }
  82. }
  83. if ($client_id_array) {
  84. if (!is_array($client_id_array)) {
  85. echo new \Exception('bad $client_id_array:'.var_export($client_id_array, true));
  86. return;
  87. }
  88. $data_array = array();
  89. foreach ($client_id_array as $client_id) {
  90. if (isset($exclude_client_id[$client_id])) {
  91. continue;
  92. }
  93. $address = Context::clientIdToAddress($client_id);
  94. if ($address) {
  95. $key = long2ip($address['local_ip']) . ":{$address['local_port']}";
  96. $data_array[$key][$address['connection_id']] = $address['connection_id'];
  97. }
  98. }
  99. foreach ($data_array as $addr => $connection_id_list) {
  100. $the_gateway_data = $gateway_data;
  101. $the_gateway_data['ext_data'] = json_encode(array('connections' => $connection_id_list));
  102. static::sendToGateway($addr, $the_gateway_data);
  103. }
  104. return;
  105. } elseif (empty($client_id_array) && is_array($client_id_array)) {
  106. return;
  107. }
  108. if (!$exclude_client_id) {
  109. return static::sendToAllGateway($gateway_data);
  110. }
  111. $address_connection_array = static::clientIdArrayToAddressArray($exclude_client_id);
  112. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  113. if (static::$businessWorker) {
  114. foreach (static::$businessWorker->gatewayConnections as $address => $gateway_connection) {
  115. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  116. json_encode(array('exclude'=> $address_connection_array[$address])) : '';
  117. /** @var TcpConnection $gateway_connection */
  118. $gateway_connection->send($gateway_data);
  119. }
  120. } // 运行在其它环境中,通过注册中心得到gateway地址
  121. else {
  122. $all_addresses = static::getAllGatewayAddressesFromRegister();
  123. foreach ($all_addresses as $address) {
  124. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  125. json_encode(array('exclude'=> $address_connection_array[$address])) : '';
  126. static::sendToGateway($address, $gateway_data);
  127. }
  128. }
  129. }
  130. /**
  131. * 向某个client_id对应的连接发消息
  132. *
  133. * @param string $client_id
  134. * @param string $message
  135. * @return void
  136. */
  137. public static function sendToClient($client_id, $message)
  138. {
  139. return static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SEND_TO_ONE, $message);
  140. }
  141. /**
  142. * 判断某个uid是否在线
  143. *
  144. * @param string $uid
  145. * @return int 0|1
  146. */
  147. public static function isUidOnline($uid)
  148. {
  149. return (int)static::getClientIdByUid($uid);
  150. }
  151. /**
  152. * 判断client_id对应的连接是否在线
  153. *
  154. * @param string $client_id
  155. * @return int 0|1
  156. */
  157. public static function isOnline($client_id)
  158. {
  159. $address_data = Context::clientIdToAddress($client_id);
  160. if (!$address_data) {
  161. return 0;
  162. }
  163. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  164. if (isset(static::$businessWorker)) {
  165. if (!isset(static::$businessWorker->gatewayConnections[$address])) {
  166. return 0;
  167. }
  168. }
  169. $gateway_data = GatewayProtocol::$empty;
  170. $gateway_data['cmd'] = GatewayProtocol::CMD_IS_ONLINE;
  171. $gateway_data['connection_id'] = $address_data['connection_id'];
  172. return (int)static::sendAndRecv($address, $gateway_data);
  173. }
  174. /**
  175. * 获取所有在线用户的session,client_id为 key(弃用,请用getAllClientSessions代替)
  176. *
  177. * @param string $group
  178. * @return array
  179. */
  180. public static function getAllClientInfo($group = '')
  181. {
  182. echo "Warning: Gateway::getAllClientInfo is deprecated and will be removed in a future, please use Gateway::getAllClientSessions instead.";
  183. return static::getAllClientSessions($group);
  184. }
  185. /**
  186. * 获取所有在线client_id的session,client_id为 key
  187. *
  188. * @param string $group
  189. * @return array
  190. */
  191. public static function getAllClientSessions($group = '')
  192. {
  193. $gateway_data = GatewayProtocol::$empty;
  194. if (!$group) {
  195. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS;
  196. } else {
  197. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP;
  198. $gateway_data['ext_data'] = $group;
  199. }
  200. $status_data = array();
  201. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  202. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  203. foreach ($buffer_array as $local_port => $data) {
  204. if ($data) {
  205. foreach ($data as $connection_id => $session_buffer) {
  206. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  207. if ($client_id === Context::$client_id) {
  208. $status_data[$client_id] = (array)$_SESSION;
  209. } else {
  210. $status_data[$client_id] = $session_buffer ? Context::sessionDecode($session_buffer) : array();
  211. }
  212. }
  213. }
  214. }
  215. }
  216. return $status_data;
  217. }
  218. /**
  219. * 获取某个组的连接信息(弃用,请用getClientSessionsByGroup代替)
  220. *
  221. * @param string $group
  222. * @return array
  223. */
  224. public static function getClientInfoByGroup($group)
  225. {
  226. echo "Warning: Gateway::getClientInfoByGroup is deprecated and will be removed in a future, please use Gateway::getClientSessionsByGroup instead.";
  227. return static::getAllClientSessions($group);
  228. }
  229. /**
  230. * 获取某个组的所有client_id的session信息
  231. *
  232. * @param string $group
  233. *
  234. * @return array
  235. */
  236. public static function getClientSessionsByGroup($group)
  237. {
  238. if (static::isValidGroupId($group)) {
  239. return static::getAllClientSessions($group);
  240. }
  241. return array();
  242. }
  243. /**
  244. * 获取所有在线client_id数
  245. *
  246. * @return int
  247. */
  248. public static function getAllClientIdCount()
  249. {
  250. return static::getClientCountByGroup();
  251. }
  252. /**
  253. * 获取所有在线client_id数(getAllClientIdCount的别名)
  254. *
  255. * @return int
  256. */
  257. public static function getAllClientCount()
  258. {
  259. return static::getAllClientIdCount();
  260. }
  261. /**
  262. * 获取某个组的在线client_id数
  263. *
  264. * @param string $group
  265. * @return int
  266. */
  267. public static function getClientIdCountByGroup($group = '')
  268. {
  269. $gateway_data = GatewayProtocol::$empty;
  270. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP;
  271. $gateway_data['ext_data'] = $group;
  272. $total_count = 0;
  273. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  274. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  275. foreach ($buffer_array as $local_port => $count) {
  276. if ($count) {
  277. $total_count += $count;
  278. }
  279. }
  280. }
  281. return $total_count;
  282. }
  283. /**
  284. * getClientIdCountByGroup 函数的别名
  285. *
  286. * @param string $group
  287. * @return int
  288. */
  289. public static function getClientCountByGroup($group = '')
  290. {
  291. return static::getClientIdCountByGroup($group);
  292. }
  293. /**
  294. * 获取某个群组在线client_id列表
  295. *
  296. * @param string $group
  297. * @return array
  298. */
  299. public static function getClientIdListByGroup($group)
  300. {
  301. if (!static::isValidGroupId($group)) {
  302. return array();
  303. }
  304. $data = static::select(array('uid'), array('groups' => is_array($group) ? $group : array($group)));
  305. $client_id_map = array();
  306. foreach ($data as $local_ip => $buffer_array) {
  307. foreach ($buffer_array as $local_port => $items) {
  308. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  309. foreach ($items as $connection_id => $info) {
  310. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  311. $client_id_map[$client_id] = $client_id;
  312. }
  313. }
  314. }
  315. return $client_id_map;
  316. }
  317. /**
  318. * 获取集群所有在线client_id列表
  319. *
  320. * @return array
  321. */
  322. public static function getAllClientIdList()
  323. {
  324. return static::formatClientIdFromGatewayBuffer(static::select(array('uid')));
  325. }
  326. /**
  327. * 格式化client_id
  328. *
  329. * @param $data
  330. * @return array
  331. */
  332. protected static function formatClientIdFromGatewayBuffer($data)
  333. {
  334. $client_id_list = array();
  335. foreach ($data as $local_ip => $buffer_array) {
  336. foreach ($buffer_array as $local_port => $items) {
  337. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  338. foreach ($items as $connection_id => $info) {
  339. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  340. $client_id_list[$client_id] = $client_id;
  341. }
  342. }
  343. }
  344. return $client_id_list;
  345. }
  346. /**
  347. * 获取与 uid 绑定的 client_id 列表
  348. *
  349. * @param string $uid
  350. * @return array
  351. */
  352. public static function getClientIdByUid($uid)
  353. {
  354. $gateway_data = GatewayProtocol::$empty;
  355. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID;
  356. $gateway_data['ext_data'] = $uid;
  357. $client_list = array();
  358. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  359. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  360. foreach ($buffer_array as $local_port => $connection_id_array) {
  361. if ($connection_id_array) {
  362. foreach ($connection_id_array as $connection_id) {
  363. $client_list[] = Context::addressToClientId($local_ip, $local_port, $connection_id);
  364. }
  365. }
  366. }
  367. }
  368. return $client_list;
  369. }
  370. /**
  371. * 获取某个群组在线uid列表
  372. *
  373. * @param string $group
  374. * @return array
  375. */
  376. public static function getUidListByGroup($group)
  377. {
  378. if (!static::isValidGroupId($group)) {
  379. return array();
  380. }
  381. $group = is_array($group) ? $group : array($group);
  382. $data = static::select(array('uid'), array('groups' => $group));
  383. $uid_map = array();
  384. foreach ($data as $local_ip => $buffer_array) {
  385. foreach ($buffer_array as $local_port => $items) {
  386. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  387. foreach ($items as $connection_id => $info) {
  388. if (!empty($info['uid'])) {
  389. $uid_map[$info['uid']] = $info['uid'];
  390. }
  391. }
  392. }
  393. }
  394. return $uid_map;
  395. }
  396. /**
  397. * 获取某个群组在线uid数
  398. *
  399. * @param string $group
  400. * @return int
  401. */
  402. public static function getUidCountByGroup($group)
  403. {
  404. if (static::isValidGroupId($group)) {
  405. return count(static::getUidListByGroup($group));
  406. }
  407. return 0;
  408. }
  409. /**
  410. * 获取全局在线uid列表
  411. *
  412. * @return array
  413. */
  414. public static function getAllUidList()
  415. {
  416. $data = static::select(array('uid'));
  417. $uid_map = array();
  418. foreach ($data as $local_ip => $buffer_array) {
  419. foreach ($buffer_array as $local_port => $items) {
  420. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  421. foreach ($items as $connection_id => $info) {
  422. if (!empty($info['uid'])) {
  423. $uid_map[$info['uid']] = $info['uid'];
  424. }
  425. }
  426. }
  427. }
  428. return $uid_map;
  429. }
  430. /**
  431. * 获取全局在线uid数
  432. * @return int
  433. */
  434. public static function getAllUidCount()
  435. {
  436. return count(static::getAllUidList());
  437. }
  438. /**
  439. * 通过client_id获取uid
  440. *
  441. * @param $client_id
  442. * @return mixed
  443. */
  444. public static function getUidByClientId($client_id)
  445. {
  446. $data = static::select(array('uid'), array('client_id'=>array($client_id)));
  447. foreach ($data as $local_ip => $buffer_array) {
  448. foreach ($buffer_array as $local_port => $items) {
  449. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  450. foreach ($items as $info) {
  451. return $info['uid'];
  452. }
  453. }
  454. }
  455. }
  456. /**
  457. * 获取所有在线的群组id
  458. *
  459. * @return array
  460. */
  461. public static function getAllGroupIdList()
  462. {
  463. $gateway_data = GatewayProtocol::$empty;
  464. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_GROUP_ID_LIST;
  465. $group_id_list = array();
  466. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  467. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  468. foreach ($buffer_array as $local_port => $group_id_array) {
  469. if (is_array($group_id_array)) {
  470. foreach ($group_id_array as $group_id) {
  471. if (!isset($group_id_list[$group_id])) {
  472. $group_id_list[$group_id] = $group_id;
  473. }
  474. }
  475. }
  476. }
  477. }
  478. return $group_id_list;
  479. }
  480. /**
  481. * 获取所有在线分组的uid数量,也就是每个分组的在线用户数
  482. *
  483. * @return array
  484. */
  485. public static function getAllGroupUidCount()
  486. {
  487. $group_uid_map = static::getAllGroupUidList();
  488. $group_uid_count_map = array();
  489. foreach ($group_uid_map as $group_id => $uid_list) {
  490. $group_uid_count_map[$group_id] = count($uid_list);
  491. }
  492. return $group_uid_count_map;
  493. }
  494. /**
  495. * 获取所有分组uid在线列表
  496. *
  497. * @return array
  498. */
  499. public static function getAllGroupUidList()
  500. {
  501. $data = static::select(array('uid','groups'));
  502. $group_uid_map = array();
  503. foreach ($data as $local_ip => $buffer_array) {
  504. foreach ($buffer_array as $local_port => $items) {
  505. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  506. foreach ($items as $connection_id => $info) {
  507. if (empty($info['uid']) || empty($info['groups'])) {
  508. break;
  509. }
  510. $uid = $info['uid'];
  511. foreach ($info['groups'] as $group_id) {
  512. if(!isset($group_uid_map[$group_id])) {
  513. $group_uid_map[$group_id] = array();
  514. }
  515. $group_uid_map[$group_id][$uid] = $uid;
  516. }
  517. }
  518. }
  519. }
  520. return $group_uid_map;
  521. }
  522. /**
  523. * 获取所有群组在线client_id列表
  524. *
  525. * @return array
  526. */
  527. public static function getAllGroupClientIdList()
  528. {
  529. $data = static::select(array('groups'));
  530. $group_client_id_map = array();
  531. foreach ($data as $local_ip => $buffer_array) {
  532. foreach ($buffer_array as $local_port => $items) {
  533. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  534. foreach ($items as $connection_id => $info) {
  535. if (empty($info['groups'])) {
  536. break;
  537. }
  538. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  539. foreach ($info['groups'] as $group_id) {
  540. if(!isset($group_client_id_map[$group_id])) {
  541. $group_client_id_map[$group_id] = array();
  542. }
  543. $group_client_id_map[$group_id][$client_id] = $client_id;
  544. }
  545. }
  546. }
  547. }
  548. return $group_client_id_map;
  549. }
  550. /**
  551. * 获取所有群组在线client_id数量,也就是获取每个群组在线连接数
  552. *
  553. * @return array
  554. */
  555. public static function getAllGroupClientIdCount()
  556. {
  557. $group_client_map = static::getAllGroupClientIdList();
  558. $group_client_count_map = array();
  559. foreach ($group_client_map as $group_id => $client_id_list) {
  560. $group_client_count_map[$group_id] = count($client_id_list);
  561. }
  562. return $group_client_count_map;
  563. }
  564. /**
  565. * 根据条件到gateway搜索数据
  566. *
  567. * @param array $fields
  568. * @param array $where
  569. * @return array
  570. */
  571. protected static function select($fields = array('session','uid','groups'), $where = array())
  572. {
  573. $t = microtime(true);
  574. $gateway_data = GatewayProtocol::$empty;
  575. $gateway_data['cmd'] = GatewayProtocol::CMD_SELECT;
  576. $gateway_data['ext_data'] = array('fields' => $fields, 'where' => $where);
  577. $gateway_data_list = array();
  578. // 有client_id,能计算出需要和哪些gateway通讯,只和必要的gateway通讯能降低系统负载
  579. if (isset($where['client_id'])) {
  580. $client_id_list = $where['client_id'];
  581. unset($gateway_data['ext_data']['where']['client_id']);
  582. $gateway_data['ext_data']['where']['connection_id'] = array();
  583. foreach ($client_id_list as $client_id) {
  584. $address_data = Context::clientIdToAddress($client_id);
  585. if (!$address_data) {
  586. continue;
  587. }
  588. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  589. if (!isset($gateway_data_list[$address])) {
  590. $gateway_data_list[$address] = $gateway_data;
  591. }
  592. $gateway_data_list[$address]['ext_data']['where']['connection_id'][$address_data['connection_id']] = $address_data['connection_id'];
  593. }
  594. foreach ($gateway_data_list as $address => $item) {
  595. $gateway_data_list[$address]['ext_data'] = json_encode($item['ext_data']);
  596. }
  597. // 有其它条件,则还是需要向所有gateway发送
  598. if (count($where) !== 1) {
  599. $gateway_data['ext_data'] = json_encode($gateway_data['ext_data']);
  600. foreach (static::getAllGatewayAddress() as $address) {
  601. if (!isset($gateway_data_list[$address])) {
  602. $gateway_data_list[$address] = $gateway_data;
  603. }
  604. }
  605. }
  606. $data = static::getBufferFromSomeGateway($gateway_data_list);
  607. } else {
  608. $gateway_data['ext_data'] = json_encode($gateway_data['ext_data']);
  609. $data = static::getBufferFromAllGateway($gateway_data);
  610. }
  611. return $data;
  612. }
  613. /**
  614. * 生成验证包,用于验证此客户端的合法性
  615. *
  616. * @return string
  617. */
  618. protected static function generateAuthBuffer()
  619. {
  620. $gateway_data = GatewayProtocol::$empty;
  621. $gateway_data['cmd'] = GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT;
  622. $gateway_data['body'] = json_encode(array(
  623. 'secret_key' => static::$secretKey,
  624. ));
  625. return GatewayProtocol::encode($gateway_data);
  626. }
  627. /**
  628. * 批量向某些gateway发包,并得到返回数组
  629. *
  630. * @param array $gateway_data_array
  631. * @return array
  632. * @throws Exception
  633. */
  634. protected static function getBufferFromSomeGateway($gateway_data_array)
  635. {
  636. $gateway_buffer_array = array();
  637. $auth_buffer = static::$secretKey ? static::generateAuthBuffer() : '';
  638. foreach ($gateway_data_array as $address => $gateway_data) {
  639. if ($auth_buffer) {
  640. $gateway_buffer_array[$address] = $auth_buffer.GatewayProtocol::encode($gateway_data);
  641. } else {
  642. $gateway_buffer_array[$address] = GatewayProtocol::encode($gateway_data);
  643. }
  644. }
  645. return static::getBufferFromGateway($gateway_buffer_array);
  646. }
  647. /**
  648. * 批量向所有 gateway 发包,并得到返回数组
  649. *
  650. * @param string $gateway_data
  651. * @return array
  652. * @throws Exception
  653. */
  654. protected static function getBufferFromAllGateway($gateway_data)
  655. {
  656. $addresses = static::getAllGatewayAddress();
  657. $gateway_buffer_array = array();
  658. $gateway_buffer = GatewayProtocol::encode($gateway_data);
  659. $gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
  660. foreach ($addresses as $address) {
  661. $gateway_buffer_array[$address] = $gateway_buffer;
  662. }
  663. return static::getBufferFromGateway($gateway_buffer_array);
  664. }
  665. /**
  666. * 获取所有gateway内部通讯地址
  667. *
  668. * @return array
  669. * @throws Exception
  670. */
  671. protected static function getAllGatewayAddress()
  672. {
  673. if (isset(static::$businessWorker)) {
  674. $addresses = static::$businessWorker->getAllGatewayAddresses();
  675. if (empty($addresses)) {
  676. throw new Exception('businessWorker::getAllGatewayAddresses return empty');
  677. }
  678. } else {
  679. $addresses = static::getAllGatewayAddressesFromRegister();
  680. if (empty($addresses)) {
  681. return array();
  682. }
  683. }
  684. return $addresses;
  685. }
  686. /**
  687. * 批量向gateway发送并获取数据
  688. * @param $gateway_buffer_array
  689. * @return array
  690. */
  691. protected static function getBufferFromGateway($gateway_buffer_array)
  692. {
  693. $client_array = $status_data = $client_address_map = $receive_buffer_array = $recv_length_array = array();
  694. // 批量向所有gateway进程发送请求数据
  695. foreach ($gateway_buffer_array as $address => $gateway_buffer) {
  696. $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
  697. if ($client && strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
  698. $socket_id = (int)$client;
  699. $client_array[$socket_id] = $client;
  700. $client_address_map[$socket_id] = explode(':', $address);
  701. $receive_buffer_array[$socket_id] = '';
  702. }
  703. }
  704. // 超时5秒
  705. $timeout = 5;
  706. $time_start = microtime(true);
  707. // 批量接收请求
  708. while (count($client_array) > 0) {
  709. $write = $except = array();
  710. $read = $client_array;
  711. if (@stream_select($read, $write, $except, $timeout)) {
  712. foreach ($read as $client) {
  713. $socket_id = (int)$client;
  714. $buffer = stream_socket_recvfrom($client, 65535);
  715. if ($buffer !== '' && $buffer !== false) {
  716. $receive_buffer_array[$socket_id] .= $buffer;
  717. $receive_length = strlen($receive_buffer_array[$socket_id]);
  718. if (empty($recv_length_array[$socket_id]) && $receive_length >= 4) {
  719. $recv_length_array[$socket_id] = current(unpack('N', $receive_buffer_array[$socket_id]));
  720. }
  721. if (!empty($recv_length_array[$socket_id]) && $receive_length >= $recv_length_array[$socket_id] + 4) {
  722. unset($client_array[$socket_id]);
  723. }
  724. } elseif (feof($client)) {
  725. unset($client_array[$socket_id]);
  726. }
  727. }
  728. }
  729. if (microtime(true) - $time_start > $timeout) {
  730. break;
  731. }
  732. }
  733. $format_buffer_array = array();
  734. foreach ($receive_buffer_array as $socket_id => $buffer) {
  735. $local_ip = ip2long($client_address_map[$socket_id][0]);
  736. $local_port = $client_address_map[$socket_id][1];
  737. $format_buffer_array[$local_ip][$local_port] = unserialize(substr($buffer, 4));
  738. }
  739. return $format_buffer_array;
  740. }
  741. /**
  742. * 踢掉某个客户端,并以$message通知被踢掉客户端
  743. *
  744. * @param string $client_id
  745. * @param string $message
  746. * @return void
  747. */
  748. public static function closeClient($client_id, $message = null)
  749. {
  750. if ($client_id === Context::$client_id) {
  751. return static::closeCurrentClient($message);
  752. } // 不是发给当前用户则使用存储中的地址
  753. else {
  754. $address_data = Context::clientIdToAddress($client_id);
  755. if (!$address_data) {
  756. return false;
  757. }
  758. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  759. return static::kickAddress($address, $address_data['connection_id'], $message);
  760. }
  761. }
  762. /**
  763. * 踢掉某个客户端并直接立即销毁相关连接
  764. *
  765. * @param string $client_id
  766. * @return bool
  767. */
  768. public static function destoryClient($client_id)
  769. {
  770. if ($client_id === Context::$client_id) {
  771. return static::destoryCurrentClient();
  772. } // 不是发给当前用户则使用存储中的地址
  773. else {
  774. $address_data = Context::clientIdToAddress($client_id);
  775. if (!$address_data) {
  776. return false;
  777. }
  778. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  779. return static::destroyAddress($address, $address_data['connection_id']);
  780. }
  781. }
  782. /**
  783. * 踢掉当前客户端并直接立即销毁相关连接
  784. *
  785. * @return bool
  786. * @throws Exception
  787. */
  788. public static function destoryCurrentClient()
  789. {
  790. if (!Context::$connection_id) {
  791. throw new Exception('destoryCurrentClient can not be called in async context');
  792. }
  793. $address = long2ip(Context::$local_ip) . ':' . Context::$local_port;
  794. return static::destroyAddress($address, Context::$connection_id);
  795. }
  796. /**
  797. * 将 client_id 与 uid 绑定
  798. *
  799. * @param string $client_id
  800. * @param int|string $uid
  801. * @return void
  802. */
  803. public static function bindUid($client_id, $uid)
  804. {
  805. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_BIND_UID, '', $uid);
  806. }
  807. /**
  808. * 将 client_id 与 uid 解除绑定
  809. *
  810. * @param string $client_id
  811. * @param int|string $uid
  812. * @return void
  813. */
  814. public static function unbindUid($client_id, $uid)
  815. {
  816. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_UNBIND_UID, '', $uid);
  817. }
  818. /**
  819. * 将 client_id 加入组
  820. *
  821. * @param string $client_id
  822. * @param int|string $group
  823. * @return void
  824. */
  825. public static function joinGroup($client_id, $group)
  826. {
  827. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_JOIN_GROUP, '', $group);
  828. }
  829. /**
  830. * 将 client_id 离开组
  831. *
  832. * @param string $client_id
  833. * @param int|string $group
  834. *
  835. * @return void
  836. */
  837. public static function leaveGroup($client_id, $group)
  838. {
  839. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_LEAVE_GROUP, '', $group);
  840. }
  841. /**
  842. * 取消分组
  843. *
  844. * @param int|string $group
  845. *
  846. * @return void
  847. */
  848. public static function ungroup($group)
  849. {
  850. if (!static::isValidGroupId($group)) {
  851. return false;
  852. }
  853. $gateway_data = GatewayProtocol::$empty;
  854. $gateway_data['cmd'] = GatewayProtocol::CMD_UNGROUP;
  855. $gateway_data['ext_data'] = $group;
  856. return static::sendToAllGateway($gateway_data);
  857. }
  858. /**
  859. * 向所有 uid 发送
  860. *
  861. * @param int|string|array $uid
  862. * @param string $message
  863. *
  864. * @return void
  865. */
  866. public static function sendToUid($uid, $message)
  867. {
  868. $gateway_data = GatewayProtocol::$empty;
  869. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_UID;
  870. $gateway_data['body'] = $message;
  871. if (!is_array($uid)) {
  872. $uid = array($uid);
  873. }
  874. $gateway_data['ext_data'] = json_encode($uid);
  875. static::sendToAllGateway($gateway_data);
  876. }
  877. /**
  878. * 向 group 发送
  879. *
  880. * @param int|string|array $group 组(不允许是 0 '0' false null array()等为空的值)
  881. * @param string $message 消息
  882. * @param array $exclude_client_id 不给这些client_id发
  883. * @param bool $raw 发送原始数据(即不调用gateway的协议的encode方法)
  884. *
  885. * @return void
  886. */
  887. public static function sendToGroup($group, $message, $exclude_client_id = null, $raw = false)
  888. {
  889. if (!static::isValidGroupId($group)) {
  890. return false;
  891. }
  892. $gateway_data = GatewayProtocol::$empty;
  893. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_GROUP;
  894. $gateway_data['body'] = $message;
  895. if ($raw) {
  896. $gateway_data['flag'] |= GatewayProtocol::FLAG_NOT_CALL_ENCODE;
  897. }
  898. if (!is_array($group)) {
  899. $group = array($group);
  900. }
  901. // 分组发送,没有排除的client_id,直接发送
  902. $default_ext_data_buffer = json_encode(array('group'=> $group, 'exclude'=> null));
  903. if (empty($exclude_client_id)) {
  904. $gateway_data['ext_data'] = $default_ext_data_buffer;
  905. return static::sendToAllGateway($gateway_data);
  906. }
  907. // 分组发送,有排除的client_id,需要将client_id转换成对应gateway进程内的connectionId
  908. if (!is_array($exclude_client_id)) {
  909. $exclude_client_id = array($exclude_client_id);
  910. }
  911. $address_connection_array = static::clientIdArrayToAddressArray($exclude_client_id);
  912. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  913. if (static::$businessWorker) {
  914. foreach (static::$businessWorker->gatewayConnections as $address => $gateway_connection) {
  915. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  916. json_encode(array('group'=> $group, 'exclude'=> $address_connection_array[$address])) :
  917. $default_ext_data_buffer;
  918. /** @var TcpConnection $gateway_connection */
  919. $gateway_connection->send($gateway_data);
  920. }
  921. } // 运行在其它环境中,通过注册中心得到gateway地址
  922. else {
  923. $addresses = static::getAllGatewayAddressesFromRegister();
  924. foreach ($addresses as $address) {
  925. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  926. json_encode(array('group'=> $group, 'exclude'=> $address_connection_array[$address])) :
  927. $default_ext_data_buffer;
  928. static::sendToGateway($address, $gateway_data);
  929. }
  930. }
  931. }
  932. /**
  933. * 更新 session,框架自动调用,开发者不要调用
  934. *
  935. * @param string $client_id
  936. * @param string $session_str
  937. * @return bool
  938. */
  939. public static function setSocketSession($client_id, $session_str)
  940. {
  941. return static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SET_SESSION, '', $session_str);
  942. }
  943. /**
  944. * 设置 session,原session值会被覆盖
  945. *
  946. * @param string $client_id
  947. * @param array $session
  948. *
  949. * @return void
  950. */
  951. public static function setSession($client_id, array $session)
  952. {
  953. if (Context::$client_id === $client_id) {
  954. $_SESSION = $session;
  955. Context::$old_session = $_SESSION;
  956. }
  957. static::setSocketSession($client_id, Context::sessionEncode($session));
  958. }
  959. /**
  960. * 更新 session,实际上是与老的session合并
  961. *
  962. * @param string $client_id
  963. * @param array $session
  964. *
  965. * @return void
  966. */
  967. public static function updateSession($client_id, array $session)
  968. {
  969. if (Context::$client_id === $client_id) {
  970. $_SESSION = array_replace_recursive((array)$_SESSION, $session);
  971. Context::$old_session = $_SESSION;
  972. }
  973. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_UPDATE_SESSION, '', Context::sessionEncode($session));
  974. }
  975. /**
  976. * 获取某个client_id的session
  977. *
  978. * @param string $client_id
  979. * @return mixed false表示出错、null表示用户不存在、array表示具体的session信息
  980. */
  981. public static function getSession($client_id)
  982. {
  983. $address_data = Context::clientIdToAddress($client_id);
  984. if (!$address_data) {
  985. return false;
  986. }
  987. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  988. if (isset(static::$businessWorker)) {
  989. if (!isset(static::$businessWorker->gatewayConnections[$address])) {
  990. return null;
  991. }
  992. }
  993. $gateway_data = GatewayProtocol::$empty;
  994. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID;
  995. $gateway_data['connection_id'] = $address_data['connection_id'];
  996. return static::sendAndRecv($address, $gateway_data);
  997. }
  998. /**
  999. * 向某个用户网关发送命令和消息
  1000. *
  1001. * @param string $client_id
  1002. * @param int $cmd
  1003. * @param string $message
  1004. * @param string $ext_data
  1005. * @return boolean
  1006. */
  1007. protected static function sendCmdAndMessageToClient($client_id, $cmd, $message, $ext_data = '')
  1008. {
  1009. // 如果是发给当前用户则直接获取上下文中的地址
  1010. if ($client_id === Context::$client_id || $client_id === null) {
  1011. $address = long2ip(Context::$local_ip) . ':' . Context::$local_port;
  1012. $connection_id = Context::$connection_id;
  1013. } else {
  1014. $address_data = Context::clientIdToAddress($client_id);
  1015. if (!$address_data) {
  1016. return false;
  1017. }
  1018. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  1019. $connection_id = $address_data['connection_id'];
  1020. }
  1021. $gateway_data = GatewayProtocol::$empty;
  1022. $gateway_data['cmd'] = $cmd;
  1023. $gateway_data['connection_id'] = $connection_id;
  1024. $gateway_data['body'] = $message;
  1025. if (!empty($ext_data)) {
  1026. $gateway_data['ext_data'] = $ext_data;
  1027. }
  1028. return static::sendToGateway($address, $gateway_data);
  1029. }
  1030. /**
  1031. * 发送数据并返回
  1032. *
  1033. * @param int $address
  1034. * @param mixed $data
  1035. * @return bool
  1036. * @throws Exception
  1037. */
  1038. protected static function sendAndRecv($address, $data)
  1039. {
  1040. $buffer = GatewayProtocol::encode($data);
  1041. $buffer = static::$secretKey ? static::generateAuthBuffer() . $buffer : $buffer;
  1042. $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
  1043. if (!$client) {
  1044. throw new Exception("can not connect to tcp://$address $errmsg");
  1045. }
  1046. if (strlen($buffer) === stream_socket_sendto($client, $buffer)) {
  1047. $timeout = 5;
  1048. // 阻塞读
  1049. stream_set_blocking($client, 1);
  1050. // 1秒超时
  1051. stream_set_timeout($client, 1);
  1052. $all_buffer = '';
  1053. $time_start = microtime(true);
  1054. $pack_len = 0;
  1055. while (1) {
  1056. $buf = stream_socket_recvfrom($client, 655350);
  1057. if ($buf !== '' && $buf !== false) {
  1058. $all_buffer .= $buf;
  1059. } else {
  1060. if (feof($client)) {
  1061. throw new Exception("connection close tcp://$address");
  1062. } elseif (microtime(true) - $time_start > $timeout) {
  1063. break;
  1064. }
  1065. continue;
  1066. }
  1067. $recv_len = strlen($all_buffer);
  1068. if (!$pack_len && $recv_len >= 4) {
  1069. $pack_len= current(unpack('N', $all_buffer));
  1070. }
  1071. // 回复的数据都是以\n结尾
  1072. if (($pack_len && $recv_len >= $pack_len + 4) || microtime(true) - $time_start > $timeout) {
  1073. break;
  1074. }
  1075. }
  1076. // 返回结果
  1077. return unserialize(substr($all_buffer, 4));
  1078. } else {
  1079. throw new Exception("sendAndRecv($address, \$bufer) fail ! Can not send data!", 502);
  1080. }
  1081. }
  1082. /**
  1083. * 发送数据到网关
  1084. *
  1085. * @param string $address
  1086. * @param array $gateway_data
  1087. * @return bool
  1088. */
  1089. protected static function sendToGateway($address, $gateway_data)
  1090. {
  1091. return static::sendBufferToGateway($address, GatewayProtocol::encode($gateway_data));
  1092. }
  1093. /**
  1094. * 发送buffer数据到网关
  1095. * @param string $address
  1096. * @param string $gateway_buffer
  1097. * @return bool
  1098. */
  1099. protected static function sendBufferToGateway($address, $gateway_buffer)
  1100. {
  1101. // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
  1102. if (static::$businessWorker) {
  1103. if (!isset(static::$businessWorker->gatewayConnections[$address])) {
  1104. return false;
  1105. }
  1106. return static::$businessWorker->gatewayConnections[$address]->send($gateway_buffer, true);
  1107. }
  1108. // 非workerman环境
  1109. $gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
  1110. $flag = static::$persistentConnection ? STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT : STREAM_CLIENT_CONNECT;
  1111. $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout, $flag);
  1112. return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
  1113. }
  1114. /**
  1115. * 向所有 gateway 发送数据
  1116. *
  1117. * @param string $gateway_data
  1118. * @throws Exception
  1119. *
  1120. * @return void
  1121. */
  1122. protected static function sendToAllGateway($gateway_data)
  1123. {
  1124. $buffer = GatewayProtocol::encode($gateway_data);
  1125. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  1126. if (static::$businessWorker) {
  1127. foreach (static::$businessWorker->gatewayConnections as $gateway_connection) {
  1128. /** @var TcpConnection $gateway_connection */
  1129. $gateway_connection->send($buffer, true);
  1130. }
  1131. } // 运行在其它环境中,通过注册中心得到gateway地址
  1132. else {
  1133. $all_addresses = static::getAllGatewayAddressesFromRegister();
  1134. foreach ($all_addresses as $address) {
  1135. static::sendBufferToGateway($address, $buffer);
  1136. }
  1137. }
  1138. }
  1139. /**
  1140. * 踢掉某个网关的 socket
  1141. *
  1142. * @param string $address
  1143. * @param int $connection_id
  1144. * @return bool
  1145. */
  1146. protected static function kickAddress($address, $connection_id, $message)
  1147. {
  1148. $gateway_data = GatewayProtocol::$empty;
  1149. $gateway_data['cmd'] = GatewayProtocol::CMD_KICK;
  1150. $gateway_data['connection_id'] = $connection_id;
  1151. $gateway_data['body'] = $message;
  1152. return static::sendToGateway($address, $gateway_data);
  1153. }
  1154. /**
  1155. * 销毁某个网关的 socket
  1156. *
  1157. * @param string $address
  1158. * @param int $connection_id
  1159. * @return bool
  1160. */
  1161. protected static function destroyAddress($address, $connection_id)
  1162. {
  1163. $gateway_data = GatewayProtocol::$empty;
  1164. $gateway_data['cmd'] = GatewayProtocol::CMD_DESTROY;
  1165. $gateway_data['connection_id'] = $connection_id;
  1166. return static::sendToGateway($address, $gateway_data);
  1167. }
  1168. /**
  1169. * 将clientid数组转换成address数组
  1170. *
  1171. * @param array $client_id_array
  1172. * @return array
  1173. */
  1174. protected static function clientIdArrayToAddressArray(array $client_id_array)
  1175. {
  1176. $address_connection_array = array();
  1177. foreach ($client_id_array as $client_id) {
  1178. $address_data = Context::clientIdToAddress($client_id);
  1179. if ($address_data) {
  1180. $address = long2ip($address_data['local_ip']) .
  1181. ":{$address_data['local_port']}";
  1182. $address_connection_array[$address][$address_data['connection_id']] = $address_data['connection_id'];
  1183. }
  1184. }
  1185. return $address_connection_array;
  1186. }
  1187. /**
  1188. * 设置 gateway 实例
  1189. *
  1190. * @param \GatewayWorker\BusinessWorker $business_worker_instance
  1191. */
  1192. public static function setBusinessWorker($business_worker_instance)
  1193. {
  1194. static::$businessWorker = $business_worker_instance;
  1195. }
  1196. /**
  1197. * 获取通过注册中心获取所有 gateway 通讯地址
  1198. *
  1199. * @return array
  1200. * @throws Exception
  1201. */
  1202. protected static function getAllGatewayAddressesFromRegister()
  1203. {
  1204. static $addresses_cache, $last_update;
  1205. if (static::$addressesCacheDisable) {
  1206. $addresses_cache = null;
  1207. }
  1208. $time_now = time();
  1209. $expiration_time = 1;
  1210. $register_addresses = (array)static::$registerAddress;
  1211. $client = null;
  1212. if(empty($addresses_cache) || $time_now - $last_update > $expiration_time) {
  1213. foreach ($register_addresses as $register_address) {
  1214. set_error_handler(function(){});
  1215. $client = stream_socket_client('tcp://' . $register_address, $errno, $errmsg, static::$connectTimeout);
  1216. restore_error_handler();
  1217. if ($client) {
  1218. break;
  1219. }
  1220. }
  1221. if (!$client) {
  1222. throw new Exception('Can not connect to tcp://' . $register_address . ' ' . $errmsg);
  1223. }
  1224. fwrite($client, '{"event":"worker_connect","secret_key":"' . static::$secretKey . '"}' . "\n");
  1225. stream_set_timeout($client, 5);
  1226. $ret = fgets($client, 655350);
  1227. if (!$ret || !$data = json_decode(trim($ret), true)) {
  1228. throw new Exception('getAllGatewayAddressesFromRegister fail. tcp://' .
  1229. $register_address . ' return ' . var_export($ret, true));
  1230. }
  1231. $last_update = $time_now;
  1232. $addresses_cache = $data['addresses'];
  1233. }
  1234. if (!$addresses_cache) {
  1235. throw new Exception('Gateway::getAllGatewayAddressesFromRegister() with registerAddress:' .
  1236. json_encode(static::$registerAddress) . ' return ' . var_export($addresses_cache, true));
  1237. }
  1238. return $addresses_cache;
  1239. }
  1240. /**
  1241. * 检查群组id是否合法
  1242. *
  1243. * @param $group
  1244. * @return bool
  1245. */
  1246. protected static function isValidGroupId($group)
  1247. {
  1248. if (empty($group)) {
  1249. echo new \Exception('group('.var_export($group, true).') empty');
  1250. return false;
  1251. }
  1252. return true;
  1253. }
  1254. }
  1255. /**
  1256. * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
  1257. */
  1258. class Context
  1259. {
  1260. /**
  1261. * 内部通讯id
  1262. * @var string
  1263. */
  1264. public static $local_ip;
  1265. /**
  1266. * 内部通讯端口
  1267. * @var int
  1268. */
  1269. public static $local_port;
  1270. /**
  1271. * 客户端ip
  1272. * @var string
  1273. */
  1274. public static $client_ip;
  1275. /**
  1276. * 客户端端口
  1277. * @var int
  1278. */
  1279. public static $client_port;
  1280. /**
  1281. * client_id
  1282. * @var string
  1283. */
  1284. public static $client_id;
  1285. /**
  1286. * 连接connection->id
  1287. * @var int
  1288. */
  1289. public static $connection_id;
  1290. /**
  1291. * 旧的session
  1292. *
  1293. * @var string
  1294. */
  1295. public static $old_session;
  1296. /**
  1297. * 编码session
  1298. * @param mixed $session_data
  1299. * @return string
  1300. */
  1301. public static function sessionEncode($session_data = '')
  1302. {
  1303. if($session_data !== '')
  1304. {
  1305. return serialize($session_data);
  1306. }
  1307. return '';
  1308. }
  1309. /**
  1310. * 解码session
  1311. * @param string $session_buffer
  1312. * @return mixed
  1313. */
  1314. public static function sessionDecode($session_buffer)
  1315. {
  1316. return unserialize($session_buffer);
  1317. }
  1318. /**
  1319. * 清除上下文
  1320. * @return void
  1321. */
  1322. public static function clear()
  1323. {
  1324. static::$local_ip = static::$local_port = static::$client_ip = static::$client_port =
  1325. static::$client_id = static::$connection_id = static::$old_session = null;
  1326. }
  1327. /**
  1328. * 通讯地址到client_id的转换
  1329. * @return string
  1330. */
  1331. public static function addressToClientId($local_ip, $local_port, $connection_id)
  1332. {
  1333. return bin2hex(pack('NnN', $local_ip, $local_port, $connection_id));
  1334. }
  1335. /**
  1336. * client_id到通讯地址的转换
  1337. * @return array
  1338. */
  1339. public static function clientIdToAddress($client_id)
  1340. {
  1341. if(strlen($client_id) !== 20)
  1342. {
  1343. throw new \Exception("client_id $client_id is invalid");
  1344. }
  1345. return unpack('Nlocal_ip/nlocal_port/Nconnection_id' ,pack('H*', $client_id));
  1346. }
  1347. }
  1348. /**
  1349. * Gateway 与 Worker 间通讯的二进制协议
  1350. *
  1351. * struct GatewayProtocol
  1352. * {
  1353. * unsigned int pack_len,
  1354. * unsigned char cmd,//命令字
  1355. * unsigned int local_ip,
  1356. * unsigned short local_port,
  1357. * unsigned int client_ip,
  1358. * unsigned short client_port,
  1359. * unsigned int connection_id,
  1360. * unsigned char flag,
  1361. * unsigned short gateway_port,
  1362. * unsigned int ext_len,
  1363. * char[ext_len] ext_data,
  1364. * char[pack_length-HEAD_LEN] body//包体
  1365. * }
  1366. * NCNnNnNCnN
  1367. */
  1368. class GatewayProtocol
  1369. {
  1370. // 发给worker,gateway有一个新的连接
  1371. const CMD_ON_CONNECT = 1;
  1372. // 发给worker的,客户端有消息
  1373. const CMD_ON_MESSAGE = 3;
  1374. // 发给worker上的关闭链接事件
  1375. const CMD_ON_CLOSE = 4;
  1376. // 发给gateway的向单个用户发送数据
  1377. const CMD_SEND_TO_ONE = 5;
  1378. // 发给gateway的向所有用户发送数据
  1379. const CMD_SEND_TO_ALL = 6;
  1380. // 发给gateway的踢出用户
  1381. // 1、如果有待发消息,将在发送完后立即销毁用户连接
  1382. // 2、如果无待发消息,将立即销毁用户连接
  1383. const CMD_KICK = 7;
  1384. // 发给gateway的立即销毁用户连接
  1385. const CMD_DESTROY = 8;
  1386. // 发给gateway,通知用户session更新
  1387. const CMD_UPDATE_SESSION = 9;
  1388. // 获取在线状态
  1389. const CMD_GET_ALL_CLIENT_SESSIONS = 10;
  1390. // 判断是否在线
  1391. const CMD_IS_ONLINE = 11;
  1392. // client_id绑定到uid
  1393. const CMD_BIND_UID = 12;
  1394. // 解绑
  1395. const CMD_UNBIND_UID = 13;
  1396. // 向uid发送数据
  1397. const CMD_SEND_TO_UID = 14;
  1398. // 根据uid获取绑定的clientid
  1399. const CMD_GET_CLIENT_ID_BY_UID = 15;
  1400. // 加入组
  1401. const CMD_JOIN_GROUP = 20;
  1402. // 离开组
  1403. const CMD_LEAVE_GROUP = 21;
  1404. // 向组成员发消息
  1405. const CMD_SEND_TO_GROUP = 22;
  1406. // 获取组成员
  1407. const CMD_GET_CLIENT_SESSIONS_BY_GROUP = 23;
  1408. // 获取组在线连接数
  1409. const CMD_GET_CLIENT_COUNT_BY_GROUP = 24;
  1410. // 按照条件查找
  1411. const CMD_SELECT = 25;
  1412. // 获取在线的群组ID
  1413. const CMD_GET_GROUP_ID_LIST = 26;
  1414. // 取消分组
  1415. const CMD_UNGROUP = 27;
  1416. // worker连接gateway事件
  1417. const CMD_WORKER_CONNECT = 200;
  1418. // 心跳
  1419. const CMD_PING = 201;
  1420. // GatewayClient连接gateway事件
  1421. const CMD_GATEWAY_CLIENT_CONNECT = 202;
  1422. // 根据client_id获取session
  1423. const CMD_GET_SESSION_BY_CLIENT_ID = 203;
  1424. // 发给gateway,覆盖session
  1425. const CMD_SET_SESSION = 204;
  1426. // 当websocket握手时触发,只有websocket协议支持此命令字
  1427. const CMD_ON_WEBSOCKET_CONNECT = 205;
  1428. // 包体是标量
  1429. const FLAG_BODY_IS_SCALAR = 0x01;
  1430. // 通知gateway在send时不调用协议encode方法,在广播组播时提升性能
  1431. const FLAG_NOT_CALL_ENCODE = 0x02;
  1432. /**
  1433. * 包头长度
  1434. *
  1435. * @var int
  1436. */
  1437. const HEAD_LEN = 28;
  1438. public static $empty = array(
  1439. 'cmd' => 0,
  1440. 'local_ip' => 0,
  1441. 'local_port' => 0,
  1442. 'client_ip' => 0,
  1443. 'client_port' => 0,
  1444. 'connection_id' => 0,
  1445. 'flag' => 0,
  1446. 'gateway_port' => 0,
  1447. 'ext_data' => '',
  1448. 'body' => '',
  1449. );
  1450. /**
  1451. * 返回包长度
  1452. *
  1453. * @param string $buffer
  1454. * @return int return current package length
  1455. */
  1456. public static function input($buffer)
  1457. {
  1458. if (strlen($buffer) < self::HEAD_LEN) {
  1459. return 0;
  1460. }
  1461. $data = unpack("Npack_len", $buffer);
  1462. return $data['pack_len'];
  1463. }
  1464. /**
  1465. * 获取整个包的 buffer
  1466. *
  1467. * @param mixed $data
  1468. * @return string
  1469. */
  1470. public static function encode($data)
  1471. {
  1472. $flag = (int)is_scalar($data['body']);
  1473. if (!$flag) {
  1474. $data['body'] = serialize($data['body']);
  1475. }
  1476. $data['flag'] |= $flag;
  1477. $ext_len = strlen($data['ext_data']);
  1478. $package_len = self::HEAD_LEN + $ext_len + strlen($data['body']);
  1479. return pack("NCNnNnNCnN", $package_len,
  1480. $data['cmd'], $data['local_ip'],
  1481. $data['local_port'], $data['client_ip'],
  1482. $data['client_port'], $data['connection_id'],
  1483. $data['flag'], $data['gateway_port'],
  1484. $ext_len) . $data['ext_data'] . $data['body'];
  1485. }
  1486. /**
  1487. * 从二进制数据转换为数组
  1488. *
  1489. * @param string $buffer
  1490. * @return array
  1491. */
  1492. public static function decode($buffer)
  1493. {
  1494. $data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nclient_ip/nclient_port/Nconnection_id/Cflag/ngateway_port/Next_len",
  1495. $buffer);
  1496. if ($data['ext_len'] > 0) {
  1497. $data['ext_data'] = substr($buffer, self::HEAD_LEN, $data['ext_len']);
  1498. if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
  1499. $data['body'] = substr($buffer, self::HEAD_LEN + $data['ext_len']);
  1500. } else {
  1501. $data['body'] = unserialize(substr($buffer, self::HEAD_LEN + $data['ext_len']));
  1502. }
  1503. } else {
  1504. $data['ext_data'] = '';
  1505. if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
  1506. $data['body'] = substr($buffer, self::HEAD_LEN);
  1507. } else {
  1508. $data['body'] = unserialize(substr($buffer, self::HEAD_LEN));
  1509. }
  1510. }
  1511. return $data;
  1512. }
  1513. }