MultipartUpload.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. <?php
  2. namespace Qcloud\Cos;
  3. use GuzzleHttp\Pool;
  4. class MultipartUpload {
  5. const MIN_PART_SIZE = 1048576;
  6. const MAX_PART_SIZE = 5368709120;
  7. const DEFAULT_PART_SIZE = 5242880;
  8. const MAX_PARTS = 10000;
  9. private $client;
  10. private $options;
  11. private $partSize;
  12. private $parts;
  13. private $body;
  14. private $progress;
  15. private $totalSize;
  16. private $uploadedSize;
  17. public function __construct($client, $body, $options = array()) {
  18. $minPartSize = $options['PartSize'];
  19. unset($options['PartSize']);
  20. $this->body = $body;
  21. $this->client = $client;
  22. $this->options = $options;
  23. $this->partSize = $this->calculatePartSize($minPartSize);
  24. $this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10;
  25. $this->progress = isset($options['Progress']) ? $options['Progress'] : function($totalSize, $uploadedSize) {};
  26. $this->parts = [];
  27. $this->partNumberList = [];
  28. $this->uploadedSize = 0;
  29. $this->totalSize = $this->body->getSize();
  30. $this->needMd5 = isset($options['ContentMD5']) ? $options['ContentMD5'] : true;
  31. $this->retry = isset($options['Retry']) ? $options['Retry'] : 3;
  32. }
  33. public function performUploading() {
  34. $uploadId= $this->initiateMultipartUpload();
  35. $this->uploadParts($uploadId);
  36. foreach ( $this->parts as $key => $row ){
  37. $num1[$key] = $row ['PartNumber'];
  38. $num2[$key] = $row ['ETag'];
  39. }
  40. array_multisort($num1, SORT_ASC, $num2, SORT_ASC, $this->parts);
  41. return $this->client->completeMultipartUpload(array(
  42. 'Bucket' => $this->options['Bucket'],
  43. 'Key' => $this->options['Key'],
  44. 'UploadId' => $uploadId,
  45. 'Parts' => $this->parts)
  46. );
  47. }
  48. public function uploadParts($uploadId) {
  49. $uploadRequests = function ($uploadId) {
  50. $partNumber = 1;
  51. $index = 1;
  52. $offset = 0;
  53. $partSize = 0;
  54. for ( ; ; $partNumber ++) {
  55. if ($this->body->eof()) {
  56. break;
  57. }
  58. $body = $this->body->read($this->partSize);
  59. $partSize = $this->partSize;
  60. if ($offset + $this->partSize >= $this->totalSize) {
  61. $partSize = $this->totalSize - $offset;
  62. }
  63. $offset += $partSize;
  64. if (empty($body)) {
  65. break;
  66. }
  67. if (isset($this->parts[$partNumber])) {
  68. continue;
  69. }
  70. $this->partNumberList[$index]['PartNumber'] = $partNumber;
  71. $this->partNumberList[$index]['PartSize'] = $partSize;
  72. $params = array(
  73. 'Bucket' => $this->options['Bucket'],
  74. 'Key' => $this->options['Key'],
  75. 'UploadId' => $uploadId,
  76. 'PartNumber' => $partNumber,
  77. 'Body' => $body,
  78. 'ContentMD5' => $this->needMd5
  79. );
  80. if ($this->needMd5 == false) {
  81. unset($params["ContentMD5"]);
  82. }
  83. if (!isset($this->parts[$partNumber])) {
  84. $command = $this->client->getCommand('uploadPart', $params);
  85. $request = $this->client->commandToRequestTransformer($command);
  86. $index ++;
  87. yield $request;
  88. }
  89. }
  90. };
  91. $pool = new Pool($this->client->httpClient, $uploadRequests($uploadId), [
  92. 'concurrency' => $this->concurrency,
  93. 'fulfilled' => function ($response, $index) {
  94. $index = $index + 1;
  95. $partNumber = $this->partNumberList[$index]['PartNumber'];
  96. $partSize = $this->partNumberList[$index]['PartSize'];
  97. //兼容两种写法,防止index为undefined
  98. if (array_key_exists('etag', $response->getHeaders())) {
  99. $etag = $response->getHeaders()["etag"][0];
  100. }
  101. if (array_key_exists('ETag', $response->getHeaders())) {
  102. $etag = $response->getHeaders()["ETag"][0];
  103. }
  104. $part = array('PartNumber' => $partNumber, 'ETag' => $etag);
  105. $this->parts[$partNumber] = $part;
  106. $this->uploadedSize += $partSize;
  107. call_user_func_array($this->progress, [$this->totalSize, $this->uploadedSize]);
  108. },
  109. 'rejected' => function ($reason, $index) {
  110. printf("part [%d] upload failed, reason: %s\n", $index, $reason);
  111. throw($reason);
  112. }
  113. ]);
  114. $promise = $pool->promise();
  115. $promise->wait();
  116. }
  117. public function resumeUploading() {
  118. $uploadId = $this->options['UploadId'];
  119. $rt = $this->client->ListParts(
  120. array('UploadId' => $uploadId,
  121. 'Bucket'=>$this->options['Bucket'],
  122. 'Key'=>$this->options['Key']));
  123. $parts = array();
  124. if (count($rt['Parts']) > 0) {
  125. foreach ($rt['Parts'] as $part) {
  126. $this->parts[$part['PartNumber']] = array('PartNumber' => $part['PartNumber'], 'ETag' => $part['ETag']);
  127. }
  128. }
  129. $this->uploadParts($uploadId);
  130. foreach ( $this->parts as $key => $row ){
  131. $num1[$key] = $row ['PartNumber'];
  132. $num2[$key] = $row ['ETag'];
  133. }
  134. array_multisort($num1, SORT_ASC, $num2, SORT_ASC, $this->parts);
  135. return $this->client->completeMultipartUpload(array(
  136. 'Bucket' => $this->options['Bucket'],
  137. 'Key' => $this->options['Key'],
  138. 'UploadId' => $uploadId,
  139. 'Parts' => $this->parts)
  140. );
  141. }
  142. private function calculatePartSize($minPartSize)
  143. {
  144. $partSize = intval(ceil(($this->body->getSize() / self::MAX_PARTS)));
  145. $partSize = max($minPartSize, $partSize);
  146. $partSize = min($partSize, self::MAX_PART_SIZE);
  147. $partSize = max($partSize, self::MIN_PART_SIZE);
  148. return $partSize;
  149. }
  150. private function initiateMultipartUpload() {
  151. $result = $this->client->createMultipartUpload($this->options);
  152. return $result['UploadId'];
  153. }
  154. }