DecodingEventStreamIterator.php 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. <?php
  2. namespace Aws\Api\Parser;
  3. use \Iterator;
  4. use Aws\Api\DateTimeResult;
  5. use GuzzleHttp\Psr7;
  6. use Psr\Http\Message\StreamInterface;
  7. use Aws\Api\Parser\Exception\ParserException;
  8. /**
  9. * @internal Implements a decoder for a binary encoded event stream that will
  10. * decode, validate, and provide individual events from the stream.
  11. */
  12. class DecodingEventStreamIterator implements Iterator
  13. {
  14. const HEADERS = 'headers';
  15. const PAYLOAD = 'payload';
  16. const LENGTH_TOTAL = 'total_length';
  17. const LENGTH_HEADERS = 'headers_length';
  18. const CRC_PRELUDE = 'prelude_crc';
  19. const BYTES_PRELUDE = 12;
  20. const BYTES_TRAILING = 4;
  21. private static $preludeFormat = [
  22. self::LENGTH_TOTAL => 'decodeUint32',
  23. self::LENGTH_HEADERS => 'decodeUint32',
  24. self::CRC_PRELUDE => 'decodeUint32',
  25. ];
  26. private static $lengthFormatMap = [
  27. 1 => 'decodeUint8',
  28. 2 => 'decodeUint16',
  29. 4 => 'decodeUint32',
  30. 8 => 'decodeUint64',
  31. ];
  32. private static $headerTypeMap = [
  33. 0 => 'decodeBooleanTrue',
  34. 1 => 'decodeBooleanFalse',
  35. 2 => 'decodeInt8',
  36. 3 => 'decodeInt16',
  37. 4 => 'decodeInt32',
  38. 5 => 'decodeInt64',
  39. 6 => 'decodeBytes',
  40. 7 => 'decodeString',
  41. 8 => 'decodeTimestamp',
  42. 9 => 'decodeUuid',
  43. ];
  44. /** @var StreamInterface Stream of eventstream shape to parse. */
  45. private $stream;
  46. /** @var array Currently parsed event. */
  47. private $currentEvent;
  48. /** @var int Current in-order event key. */
  49. private $key;
  50. /** @var resource|\HashContext CRC32 hash context for event validation */
  51. private $hashContext;
  52. /** @var int $currentPosition */
  53. private $currentPosition;
  54. /**
  55. * DecodingEventStreamIterator constructor.
  56. *
  57. * @param StreamInterface $stream
  58. */
  59. public function __construct(StreamInterface $stream)
  60. {
  61. $this->stream = $stream;
  62. $this->rewind();
  63. }
  64. private function parseHeaders($headerBytes)
  65. {
  66. $headers = [];
  67. $bytesRead = 0;
  68. while ($bytesRead < $headerBytes) {
  69. list($key, $numBytes) = $this->decodeString(1);
  70. $bytesRead += $numBytes;
  71. list($type, $numBytes) = $this->decodeUint8();
  72. $bytesRead += $numBytes;
  73. $f = self::$headerTypeMap[$type];
  74. list($value, $numBytes) = $this->{$f}();
  75. $bytesRead += $numBytes;
  76. if (isset($headers[$key])) {
  77. throw new ParserException('Duplicate key in event headers.');
  78. }
  79. $headers[$key] = $value;
  80. }
  81. return [$headers, $bytesRead];
  82. }
  83. private function parsePrelude()
  84. {
  85. $prelude = [];
  86. $bytesRead = 0;
  87. $calculatedCrc = null;
  88. foreach (self::$preludeFormat as $key => $decodeFunction) {
  89. if ($key === self::CRC_PRELUDE) {
  90. $hashCopy = hash_copy($this->hashContext);
  91. $calculatedCrc = hash_final($this->hashContext, true);
  92. $this->hashContext = $hashCopy;
  93. }
  94. list($value, $numBytes) = $this->{$decodeFunction}();
  95. $bytesRead += $numBytes;
  96. $prelude[$key] = $value;
  97. }
  98. if (unpack('N', $calculatedCrc)[1] !== $prelude[self::CRC_PRELUDE]) {
  99. throw new ParserException('Prelude checksum mismatch.');
  100. }
  101. return [$prelude, $bytesRead];
  102. }
  103. private function parseEvent()
  104. {
  105. $event = [];
  106. if ($this->stream->tell() < $this->stream->getSize()) {
  107. $this->hashContext = hash_init('crc32b');
  108. $bytesLeft = $this->stream->getSize() - $this->stream->tell();
  109. list($prelude, $numBytes) = $this->parsePrelude();
  110. if ($prelude[self::LENGTH_TOTAL] > $bytesLeft) {
  111. throw new ParserException('Message length too long.');
  112. }
  113. $bytesLeft -= $numBytes;
  114. if ($prelude[self::LENGTH_HEADERS] > $bytesLeft) {
  115. throw new ParserException('Headers length too long.');
  116. }
  117. list(
  118. $event[self::HEADERS],
  119. $numBytes
  120. ) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);
  121. $event[self::PAYLOAD] = Psr7\Utils::streamFor(
  122. $this->readAndHashBytes(
  123. $prelude[self::LENGTH_TOTAL] - self::BYTES_PRELUDE
  124. - $numBytes - self::BYTES_TRAILING
  125. )
  126. );
  127. $calculatedCrc = hash_final($this->hashContext, true);
  128. $messageCrc = $this->stream->read(4);
  129. if ($calculatedCrc !== $messageCrc) {
  130. throw new ParserException('Message checksum mismatch.');
  131. }
  132. }
  133. return $event;
  134. }
  135. // Iterator Functionality
  136. /**
  137. * @return array
  138. */
  139. #[\ReturnTypeWillChange]
  140. public function current()
  141. {
  142. return $this->currentEvent;
  143. }
  144. /**
  145. * @return int
  146. */
  147. #[\ReturnTypeWillChange]
  148. public function key()
  149. {
  150. return $this->key;
  151. }
  152. #[\ReturnTypeWillChange]
  153. public function next()
  154. {
  155. $this->currentPosition = $this->stream->tell();
  156. if ($this->valid()) {
  157. $this->key++;
  158. $this->currentEvent = $this->parseEvent();
  159. }
  160. }
  161. #[\ReturnTypeWillChange]
  162. public function rewind()
  163. {
  164. $this->stream->rewind();
  165. $this->key = 0;
  166. $this->currentPosition = 0;
  167. $this->currentEvent = $this->parseEvent();
  168. }
  169. /**
  170. * @return bool
  171. */
  172. #[\ReturnTypeWillChange]
  173. public function valid()
  174. {
  175. return $this->currentPosition < $this->stream->getSize();
  176. }
  177. // Decoding Utilities
  178. private function readAndHashBytes($num)
  179. {
  180. $bytes = $this->stream->read($num);
  181. hash_update($this->hashContext, $bytes);
  182. return $bytes;
  183. }
  184. private function decodeBooleanTrue()
  185. {
  186. return [true, 0];
  187. }
  188. private function decodeBooleanFalse()
  189. {
  190. return [false, 0];
  191. }
  192. private function uintToInt($val, $size)
  193. {
  194. $signedCap = pow(2, $size - 1);
  195. if ($val > $signedCap) {
  196. $val -= (2 * $signedCap);
  197. }
  198. return $val;
  199. }
  200. private function decodeInt8()
  201. {
  202. $val = (int)unpack('C', $this->readAndHashBytes(1))[1];
  203. return [$this->uintToInt($val, 8), 1];
  204. }
  205. private function decodeUint8()
  206. {
  207. return [unpack('C', $this->readAndHashBytes(1))[1], 1];
  208. }
  209. private function decodeInt16()
  210. {
  211. $val = (int)unpack('n', $this->readAndHashBytes(2))[1];
  212. return [$this->uintToInt($val, 16), 2];
  213. }
  214. private function decodeUint16()
  215. {
  216. return [unpack('n', $this->readAndHashBytes(2))[1], 2];
  217. }
  218. private function decodeInt32()
  219. {
  220. $val = (int)unpack('N', $this->readAndHashBytes(4))[1];
  221. return [$this->uintToInt($val, 32), 4];
  222. }
  223. private function decodeUint32()
  224. {
  225. return [unpack('N', $this->readAndHashBytes(4))[1], 4];
  226. }
  227. private function decodeInt64()
  228. {
  229. $val = $this->unpackInt64($this->readAndHashBytes(8))[1];
  230. return [$this->uintToInt($val, 64), 8];
  231. }
  232. private function decodeUint64()
  233. {
  234. return [$this->unpackInt64($this->readAndHashBytes(8))[1], 8];
  235. }
  236. private function unpackInt64($bytes)
  237. {
  238. if (version_compare(PHP_VERSION, '5.6.3', '<')) {
  239. $d = unpack('N2', $bytes);
  240. return [1 => $d[1] << 32 | $d[2]];
  241. }
  242. return unpack('J', $bytes);
  243. }
  244. private function decodeBytes($lengthBytes=2)
  245. {
  246. if (!isset(self::$lengthFormatMap[$lengthBytes])) {
  247. throw new ParserException('Undefined variable length format.');
  248. }
  249. $f = self::$lengthFormatMap[$lengthBytes];
  250. list($len, $bytes) = $this->{$f}();
  251. return [$this->readAndHashBytes($len), $len + $bytes];
  252. }
  253. private function decodeString($lengthBytes=2)
  254. {
  255. if (!isset(self::$lengthFormatMap[$lengthBytes])) {
  256. throw new ParserException('Undefined variable length format.');
  257. }
  258. $f = self::$lengthFormatMap[$lengthBytes];
  259. list($len, $bytes) = $this->{$f}();
  260. return [$this->readAndHashBytes($len), $len + $bytes];
  261. }
  262. private function decodeTimestamp()
  263. {
  264. list($val, $bytes) = $this->decodeInt64();
  265. return [
  266. DateTimeResult::createFromFormat('U.u', $val / 1000),
  267. $bytes
  268. ];
  269. }
  270. private function decodeUuid()
  271. {
  272. $val = unpack('H32', $this->readAndHashBytes(16))[1];
  273. return [
  274. substr($val, 0, 8) . '-'
  275. . substr($val, 8, 4) . '-'
  276. . substr($val, 12, 4) . '-'
  277. . substr($val, 16, 4) . '-'
  278. . substr($val, 20, 12),
  279. 16
  280. ];
  281. }
  282. }