Base.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. <?php
  2. /**
  3. * Copyright (C) 2014-2020 Textalk/Abicart and contributors.
  4. *
  5. * This file is part of Websocket PHP and is free software under the ISC License.
  6. * License text: https://raw.githubusercontent.com/Textalk/websocket-php/master/COPYING
  7. */
  8. namespace WebSocket;
  9. use Psr\Log\LoggerAwareInterface;
  10. use Psr\Log\LoggerInterface;
  11. use Psr\Log\NullLogger;
  12. class Base implements LoggerAwareInterface
  13. {
  14. protected $socket;
  15. protected $options = [];
  16. protected $is_closing = false;
  17. protected $last_opcode = null;
  18. protected $close_status = null;
  19. protected $logger;
  20. protected static $opcodes = array(
  21. 'continuation' => 0,
  22. 'text' => 1,
  23. 'binary' => 2,
  24. 'close' => 8,
  25. 'ping' => 9,
  26. 'pong' => 10,
  27. );
  28. public function getLastOpcode()
  29. {
  30. return $this->last_opcode;
  31. }
  32. public function getCloseStatus()
  33. {
  34. return $this->close_status;
  35. }
  36. public function isConnected()
  37. {
  38. return $this->socket &&
  39. (get_resource_type($this->socket) == 'stream' ||
  40. get_resource_type($this->socket) == 'persistent stream');
  41. }
  42. public function setTimeout($timeout)
  43. {
  44. $this->options['timeout'] = $timeout;
  45. if ($this->isConnected()) {
  46. stream_set_timeout($this->socket, $timeout);
  47. }
  48. }
  49. public function setFragmentSize($fragment_size)
  50. {
  51. $this->options['fragment_size'] = $fragment_size;
  52. return $this;
  53. }
  54. public function getFragmentSize()
  55. {
  56. return $this->options['fragment_size'];
  57. }
  58. public function setLogger(LoggerInterface $logger = null)
  59. {
  60. $this->logger = $logger ?: new NullLogger();
  61. }
  62. public function send($payload, $opcode = 'text', $masked = true)
  63. {
  64. if (!$this->isConnected()) {
  65. $this->connect();
  66. }
  67. if (!in_array($opcode, array_keys(self::$opcodes))) {
  68. $warning = "Bad opcode '{$opcode}'. Try 'text' or 'binary'.";
  69. $this->logger->warning($warning);
  70. throw new BadOpcodeException($warning);
  71. }
  72. $payload_chunks = str_split($payload, $this->options['fragment_size']);
  73. $frame_opcode = $opcode;
  74. for ($index = 0; $index < count($payload_chunks); ++$index) {
  75. $chunk = $payload_chunks[$index];
  76. $final = $index == count($payload_chunks) - 1;
  77. $this->sendFragment($final, $chunk, $frame_opcode, $masked);
  78. // all fragments after the first will be marked a continuation
  79. $frame_opcode = 'continuation';
  80. }
  81. $this->logger->info("Sent '{$opcode}' message");
  82. }
  83. protected function sendFragment($final, $payload, $opcode, $masked)
  84. {
  85. // Binary string for header.
  86. $frame_head_binstr = '';
  87. // Write FIN, final fragment bit.
  88. $frame_head_binstr .= (bool) $final ? '1' : '0';
  89. // RSV 1, 2, & 3 false and unused.
  90. $frame_head_binstr .= '000';
  91. // Opcode rest of the byte.
  92. $frame_head_binstr .= sprintf('%04b', self::$opcodes[$opcode]);
  93. // Use masking?
  94. $frame_head_binstr .= $masked ? '1' : '0';
  95. // 7 bits of payload length...
  96. $payload_length = strlen($payload);
  97. if ($payload_length > 65535) {
  98. $frame_head_binstr .= decbin(127);
  99. $frame_head_binstr .= sprintf('%064b', $payload_length);
  100. } elseif ($payload_length > 125) {
  101. $frame_head_binstr .= decbin(126);
  102. $frame_head_binstr .= sprintf('%016b', $payload_length);
  103. } else {
  104. $frame_head_binstr .= sprintf('%07b', $payload_length);
  105. }
  106. $frame = '';
  107. // Write frame head to frame.
  108. foreach (str_split($frame_head_binstr, 8) as $binstr) {
  109. $frame .= chr(bindec($binstr));
  110. }
  111. // Handle masking
  112. if ($masked) {
  113. // generate a random mask:
  114. $mask = '';
  115. for ($i = 0; $i < 4; $i++) {
  116. $mask .= chr(rand(0, 255));
  117. }
  118. $frame .= $mask;
  119. }
  120. // Append payload to frame:
  121. for ($i = 0; $i < $payload_length; $i++) {
  122. $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
  123. }
  124. $this->write($frame);
  125. }
  126. public function receive()
  127. {
  128. if (!$this->isConnected()) {
  129. $this->connect();
  130. }
  131. $payload = '';
  132. do {
  133. $response = $this->receiveFragment();
  134. $payload .= $response[0];
  135. } while (!$response[1]);
  136. $this->logger->info("Received '{$this->last_opcode}' message");
  137. return $payload;
  138. }
  139. protected function receiveFragment()
  140. {
  141. // Just read the main fragment information first.
  142. $data = $this->read(2);
  143. // Is this the final fragment? // Bit 0 in byte 0
  144. $final = (bool) (ord($data[0]) & 1 << 7);
  145. // Should be unused, and must be false… // Bits 1, 2, & 3
  146. $rsv1 = (bool) (ord($data[0]) & 1 << 6);
  147. $rsv2 = (bool) (ord($data[0]) & 1 << 5);
  148. $rsv3 = (bool) (ord($data[0]) & 1 << 4);
  149. // Parse opcode
  150. $opcode_int = ord($data[0]) & 15; // Bits 4-7
  151. $opcode_ints = array_flip(self::$opcodes);
  152. if (!array_key_exists($opcode_int, $opcode_ints)) {
  153. $warning = "Bad opcode in websocket frame: {$opcode_int}";
  154. $this->logger->warning($warning);
  155. throw new ConnectionException($warning, ConnectionException::BAD_OPCODE);
  156. }
  157. $opcode = $opcode_ints[$opcode_int];
  158. // Masking?
  159. $mask = (bool) (ord($data[1]) >> 7); // Bit 0 in byte 1
  160. $payload = '';
  161. // Payload length
  162. $payload_length = (int) ord($data[1]) & 127; // Bits 1-7 in byte 1
  163. if ($payload_length > 125) {
  164. if ($payload_length === 126) {
  165. $data = $this->read(2); // 126: Payload is a 16-bit unsigned int
  166. } else {
  167. $data = $this->read(8); // 127: Payload is a 64-bit unsigned int
  168. }
  169. $payload_length = bindec(self::sprintB($data));
  170. }
  171. // Get masking key.
  172. if ($mask) {
  173. $masking_key = $this->read(4);
  174. }
  175. // Get the actual payload, if any (might not be for e.g. close frames.
  176. if ($payload_length > 0) {
  177. $data = $this->read($payload_length);
  178. if ($mask) {
  179. // Unmask payload.
  180. for ($i = 0; $i < $payload_length; $i++) {
  181. $payload .= ($data[$i] ^ $masking_key[$i % 4]);
  182. }
  183. } else {
  184. $payload = $data;
  185. }
  186. }
  187. // if we received a ping, send a pong and wait for the next message
  188. if ($opcode === 'ping') {
  189. $this->logger->debug("Received 'ping', sending 'pong'.");
  190. $this->send($payload, 'pong', true);
  191. return [null, false];
  192. }
  193. // if we received a pong, wait for the next message
  194. if ($opcode === 'pong') {
  195. $this->logger->debug("Received 'pong'.");
  196. return [null, false];
  197. }
  198. // Record the opcode if we are not receiving a continutation fragment
  199. if ($opcode !== 'continuation') {
  200. $this->last_opcode = $opcode;
  201. }
  202. if ($opcode === 'close') {
  203. // Get the close status.
  204. if ($payload_length > 0) {
  205. $status_bin = $payload[0] . $payload[1];
  206. $status = bindec(sprintf("%08b%08b", ord($payload[0]), ord($payload[1])));
  207. $this->close_status = $status;
  208. }
  209. // Get additional close message-
  210. if ($payload_length >= 2) {
  211. $payload = substr($payload, 2);
  212. }
  213. $this->logger->debug("Received 'close', status: {$this->close_status}.");
  214. if ($this->is_closing) {
  215. $this->is_closing = false; // A close response, all done.
  216. } else {
  217. $this->send($status_bin . 'Close acknowledged: ' . $status, 'close', true); // Respond.
  218. }
  219. // Close the socket.
  220. fclose($this->socket);
  221. // Closing should not return message.
  222. return [null, true];
  223. }
  224. return [$payload, $final];
  225. }
  226. /**
  227. * Tell the socket to close.
  228. *
  229. * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
  230. * @param string $message A closing message, max 125 bytes.
  231. */
  232. public function close($status = 1000, $message = 'ttfn')
  233. {
  234. if (!$this->isConnected()) {
  235. return null;
  236. }
  237. $status_binstr = sprintf('%016b', $status);
  238. $status_str = '';
  239. foreach (str_split($status_binstr, 8) as $binstr) {
  240. $status_str .= chr(bindec($binstr));
  241. }
  242. $this->send($status_str . $message, 'close', true);
  243. $this->logger->debug("Closing with status: {$status_str}.");
  244. $this->is_closing = true;
  245. $this->receive(); // Receiving a close frame will close the socket now.
  246. }
  247. protected function write($data)
  248. {
  249. $length = strlen($data);
  250. $written = fwrite($this->socket, $data);
  251. if ($written === false) {
  252. fclose($this->socket);
  253. $this->throwException("Failed to write {$length} bytes.");
  254. }
  255. if ($written < strlen($data)) {
  256. fclose($this->socket);
  257. $this->throwException("Could only write {$written} out of {$length} bytes.");
  258. }
  259. $this->logger->debug("Wrote {$written} of {$length} bytes.");
  260. }
  261. protected function read($length)
  262. {
  263. $data = '';
  264. while (strlen($data) < $length) {
  265. $buffer = fread($this->socket, $length - strlen($data));
  266. if ($buffer === false) {
  267. $read = strlen($data);
  268. $this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
  269. }
  270. if ($buffer === '') {
  271. $this->throwException("Empty read; connection dead?");
  272. }
  273. $data .= $buffer;
  274. }
  275. return $data;
  276. }
  277. protected function throwException($message, $code = 0)
  278. {
  279. $meta = stream_get_meta_data($this->socket);
  280. $json_meta = json_encode($meta);
  281. if (!empty($meta['timed_out'])) {
  282. $code = ConnectionException::TIMED_OUT;
  283. $this->logger->warning("{$message}", (array)$meta);
  284. throw new TimeoutException("{$message} Stream state: {$json_meta}", $code);
  285. }
  286. if (!empty($meta['eof'])) {
  287. $code = ConnectionException::EOF;
  288. }
  289. $this->logger->error("{$message}", (array)$meta);
  290. throw new ConnectionException("{$message} Stream state: {$json_meta}", $code);
  291. }
  292. /**
  293. * Helper to convert a binary to a string of '0' and '1'.
  294. */
  295. protected static function sprintB($string)
  296. {
  297. $return = '';
  298. for ($i = 0; $i < strlen($string); $i++) {
  299. $return .= sprintf("%08b", ord($string[$i]));
  300. }
  301. return $return;
  302. }
  303. }