MultipartUpload.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. <?php
  2. namespace Qcloud\Cos;
  3. use Qcloud\Cos\Exception\CosException;
  4. use GuzzleHttp\Pool;
  /**
   * Uploads a stream to a bucket in several parts.
   *
   * The body is read in chunks of the calculated part size, every chunk is sent
   * through a GuzzleHttp\Pool with bounded concurrency, and the upload is then
   * completed with the collected part numbers and ETags.
   *
   * Recognised option keys (all other keys are forwarded to the client as-is):
   *  - 'Bucket', 'Key': target of the upload.
   *  - 'PartSize':      requested part size in bytes; defaults to DEFAULT_PART_SIZE.
   *  - 'Concurrency':   number of part requests in flight at once; defaults to 10.
   *  - 'UploadId':      required by resumeUploading() only.
   */
  class MultipartUpload
  {
      /** Lower bound applied to the part size, in bytes (1 MiB). */
      const MIN_PART_SIZE = 1048576;
      /** Upper bound applied to the part size, in bytes (5 GiB). */
      const MAX_PART_SIZE = 5368709120;
      /** Part size used when the caller does not pass 'PartSize', in bytes (50 MiB). */
      const DEFAULT_PART_SIZE = 52428800;
      /** Divisor used to derive the part size from the body size. */
      const MAX_PARTS = 10000;

      /** @var object Client used to build and send the upload commands. */
      private $client;
      /** @var array Caller options without 'PartSize'. */
      private $options;
      /** @var int|float Number of bytes read from the body per part. */
      private $partSize;
      /** @var array Finished parts keyed by part number: ['PartNumber' => ..., 'ETag' => ...]. */
      private $parts;
      /** @var object Stream being uploaded; must offer getSize(), eof() and read(). */
      private $body;
      /** @var int Maximum number of concurrent part requests. */
      private $concurrency;
      /** @var array Maps the 1-based position of a yielded request to its part number. */
      private $partNumberList;

      /**
       * @param object $client  Client exposing getCommand(), commandToRequestTransformer(),
       *                        createMultipartUpload(), completeMultipartUpload(), ListParts()
       *                        and an httpClient property.
       * @param object $body    Stream to upload.
       * @param array  $options See the class description for the recognised keys.
       */
      public function __construct($client, $body, $options = array())
      {
          // Fall back to the declared default instead of reading a missing key.
          $minPartSize = isset($options['PartSize']) ? $options['PartSize'] : self::DEFAULT_PART_SIZE;
          unset($options['PartSize']);

          // calculatePartSize() reads $this->body, so the body must be assigned first.
          $this->body = $body;
          $this->client = $client;
          $this->options = $options;
          $this->partSize = $this->calculatePartSize($minPartSize);
          $this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10;
          $this->parts = [];
          $this->partNumberList = [];
      }

      /**
       * Starts a new multipart upload, sends every part and completes the upload.
       *
       * @return mixed Result of the client's completeMultipartUpload() call.
       */
      public function performUploading()
      {
          $uploadId = $this->initiateMultipartUpload();
          $this->uploadParts($uploadId);

          return $this->completeUpload($uploadId);
      }

      /**
       * Sends every part that is not yet recorded in $this->parts and records the
       * part number and ETag of each successful response.
       *
       * Blocks until the pool has finished. A rejected request re-throws its reason.
       *
       * @param string $uploadId Identifier of the multipart upload the parts belong to.
       */
      public function uploadParts($uploadId)
      {
          // Positions restart at 1 for every pool, so drop mappings of an earlier run.
          $this->partNumberList = [];

          $pool = new Pool($this->client->httpClient, $this->createPartRequests($uploadId), [
              'concurrency' => $this->concurrency,
              'fulfilled' => function ($response, $index) {
                  // The pool index is treated as zero-based while partNumberList is
                  // filled starting at 1, hence the offset.
                  $partNumber = $this->partNumberList[$index + 1];
                  // getHeader() matches the header name case-insensitively.
                  $etagValues = $response->getHeader('ETag');
                  $etag = isset($etagValues[0]) ? $etagValues[0] : null;
                  $this->parts[$partNumber] = array('PartNumber' => $partNumber, 'ETag' => $etag);
              },
              'rejected' => function ($reason, $index) {
                  throw $reason;
              },
          ]);

          $pool->promise()->wait();
      }

      /**
       * Lists the parts already stored for $options['UploadId'], uploads the
       * remaining ones and completes the upload.
       *
       * NOTE(review): only a single ListParts response is consumed here; if the
       * service paginates that listing, parts beyond the first page are uploaded
       * again - confirm against the API.
       * NOTE(review): part numbers are matched purely by position in the body, so
       * this presumably requires the same part size as the interrupted upload -
       * verify with callers.
       *
       * @return mixed Result of the client's completeMultipartUpload() call.
       */
      public function resumeUploading()
      {
          $uploadId = $this->options['UploadId'];
          $listing = $this->client->ListParts(array(
              'UploadId' => $uploadId,
              'Bucket' => $this->options['Bucket'],
              'Key' => $this->options['Key'],
          ));

          // 'Parts' may be missing when nothing has been uploaded yet.
          if (!empty($listing['Parts'])) {
              foreach ($listing['Parts'] as $part) {
                  $this->parts[$part['PartNumber']] = array(
                      'PartNumber' => $part['PartNumber'],
                      'ETag' => $part['ETag'],
                  );
              }
          }

          $this->uploadParts($uploadId);

          return $this->completeUpload($uploadId);
      }

      /**
       * Lazily builds one uploadPart request per chunk of the body that has no
       * entry in $this->parts yet.
       *
       * Chunks of already recorded parts are still read so that the stream
       * position stays aligned with the part numbers.
       *
       * @param string $uploadId Identifier of the multipart upload.
       *
       * @return \Generator Yields the requests produced by the client.
       */
      private function createPartRequests($uploadId)
      {
          $position = 1;

          for ($partNumber = 1; !$this->body->eof(); $partNumber++) {
              $chunk = $this->body->read($this->partSize);
              // Compare explicitly: empty() would also reject a chunk that is
              // exactly the string "0" and silently drop that data.
              if ($chunk === '' || $chunk === false || $chunk === null) {
                  break;
              }
              if (isset($this->parts[$partNumber])) {
                  continue;
              }

              $this->partNumberList[$position] = $partNumber;
              $command = $this->client->getCommand('uploadPart', array(
                  'Bucket' => $this->options['Bucket'],
                  'Key' => $this->options['Key'],
                  'UploadId' => $uploadId,
                  'PartNumber' => $partNumber,
                  'Body' => $chunk,
              ));
              $request = $this->client->commandToRequestTransformer($command);
              $position++;

              yield $request;
          }
      }

      /**
       * Completes the upload with the recorded parts in ascending part-number order.
       *
       * A sorted copy is sent; $this->parts keeps its part-number keys so that the
       * isset() lookups in createPartRequests() remain valid.
       *
       * @param string $uploadId Identifier of the multipart upload.
       *
       * @return mixed Result of the client's completeMultipartUpload() call.
       */
      private function completeUpload($uploadId)
      {
          $orderedParts = array_values($this->parts);
          usort($orderedParts, function ($left, $right) {
              $a = (int) $left['PartNumber'];
              $b = (int) $right['PartNumber'];
              if ($a === $b) {
                  return 0;
              }

              return $a < $b ? -1 : 1;
          });

          return $this->client->completeMultipartUpload(array(
              'Bucket' => $this->options['Bucket'],
              'Key' => $this->options['Key'],
              'UploadId' => $uploadId,
              'Parts' => $orderedParts,
          ));
      }

      /**
       * Derives the part size from the body size and the requested size.
       *
       * The result is the larger of $minPartSize and ceil(bodySize / MAX_PARTS),
       * limited to the range MIN_PART_SIZE..MAX_PART_SIZE.
       *
       * @param int $minPartSize Requested part size in bytes.
       *
       * @return int|float Part size in bytes.
       */
      private function calculatePartSize($minPartSize)
      {
          $partSize = intval(ceil(($this->body->getSize() / self::MAX_PARTS)));
          $partSize = max($minPartSize, $partSize);
          $partSize = min($partSize, self::MAX_PART_SIZE);
          $partSize = max($partSize, self::MIN_PART_SIZE);

          return $partSize;
      }

      /**
       * Creates the multipart upload with the stored options.
       *
       * @return string The 'UploadId' entry of the client's result.
       */
      private function initiateMultipartUpload()
      {
          $result = $this->client->createMultipartUpload($this->options);

          return $result['UploadId'];
      }
  }