Copy.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. <?php
  2. namespace Qcloud\Cos;
  3. use GuzzleHttp\Psr7\Request;
  4. use GuzzleHttp\Pool;
  /**
   * Copies an object by splitting it into byte ranges and copying each range
   * as one part of a multipart upload.
   *
   * The flow is: createMultipartUpload -> one uploadPartCopy request per range
   * (sent concurrently through a Guzzle Pool) -> completeMultipartUpload.
   */
  class Copy {
      /** Smallest part size used, in bytes (1 MiB). */
      const MIN_PART_SIZE = 1048576;
      /** Largest part size used, in bytes (5 GiB). */
      const MAX_PART_SIZE = 5368709120;
      /** 50 MiB. Not referenced inside this class; kept for callers. */
      const DEFAULT_PART_SIZE = 52428800;
      /** Upper bound on the number of parts the object is split into. */
      const MAX_PARTS = 10000;

      private $client;
      private $copySource;
      private $options;
      private $partSize;
      /** Completed parts, keyed by part number while the transfer is running. */
      private $parts = [];
      private $size;
      /** Commands keyed by part number; reused when a rejected part is retried. */
      private $commandList = [];
      /** Requests keyed by part number; needed to turn a response into a result. */
      private $requestList = [];
      /** Number of part requests the pool keeps in flight. */
      private $concurrency;
      /** Upper bound for the outer loop in copy(). */
      private $retry;

      /**
       * @param object $client  Client used to build and send the commands; it must expose
       *                        getCommand(), commandToRequestTransformer(),
       *                        responseToResultTransformer(), execute(),
       *                        createMultipartUpload(), completeMultipartUpload() and
       *                        an $httpClient property.
       * @param array  $source  Source description: 'Bucket', 'Region', 'Key',
       *                        'ContentLength' and optionally 'VersionId'.
       * @param array  $options Destination 'Bucket' and 'Key', plus the optional
       *                        'PartSize', 'Concurrency' (default 10) and 'Retry' (default 5).
       */
      public function __construct($client, $source, $options = array()) {
          // 'PartSize' is optional. Falling back to MIN_PART_SIZE yields the same
          // result the clamp in calculatePartSize() produced for a missing value,
          // without reading an undefined array key.
          $minPartSize = isset($options['PartSize']) ? $options['PartSize'] : self::MIN_PART_SIZE;
          unset($options['PartSize']);

          $this->client = $client;
          $this->copySource = $source;
          // Everything except 'PartSize' is forwarded to createMultipartUpload().
          $this->options = $options;
          $this->size = $source['ContentLength'];
          $this->partSize = $this->calculatePartSize($minPartSize);
          $this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10;
          $this->retry = isset($options['Retry']) ? $options['Retry'] : 5;
      }

      /**
       * Runs the whole copy and returns the completeMultipartUpload result.
       *
       * @return mixed Whatever the client's completeMultipartUpload() returns.
       * @throws \Exception When a part still fails after its retries.
       */
      public function copy() {
          $uploadId = $this->initiateMultipartUpload();

          // NOTE(review): uploadParts() returns no value (null == 0) and reports
          // failures by throwing, so this loop ends after the first pass that does
          // not throw. The backoff only applies if it ever returns non-zero.
          for ($attempt = 0; $attempt < $this->retry; $attempt++) {
              if ($this->uploadParts($uploadId) == 0) {
                  break;
              }
              sleep(1 << $attempt);
          }

          // Parts are keyed by part number, so sorting by key and reindexing gives
          // the ascending list, and it also works when there are no parts.
          ksort($this->parts);
          $this->parts = array_values($this->parts);

          return $this->client->completeMultipartUpload([
              'Bucket' => $this->options['Bucket'],
              'Key' => $this->options['Key'],
              'UploadId' => $uploadId,
              'Parts' => $this->parts,
          ]);
      }

      /**
       * Issues one uploadPartCopy request per byte range and waits for all of them.
       *
       * @param string $uploadId Id returned by createMultipartUpload.
       * @throws \Exception When a rejected part also fails both retries.
       */
      public function uploadParts($uploadId) {
          $copyRequests = function ($uploadId) {
              // Every call starts from scratch: all parts are requested again.
              $this->parts = [];
              $copySourcePath = $this->buildCopySourcePath();
              $offset = 0;
              $partSize = $this->partSize;

              for ($partNumber = 1; ; $partNumber++) {
                  // The last part only covers what is left of the object.
                  if ($offset + $partSize >= $this->size) {
                      $partSize = $this->size - $offset;
                  }

                  $command = $this->client->getCommand('uploadPartCopy', [
                      'Bucket' => $this->options['Bucket'],
                      'Key' => $this->options['Key'],
                      'UploadId' => $uploadId,
                      'PartNumber' => $partNumber,
                      'CopySource' => $copySourcePath,
                      // Inclusive byte range, hence the "- 1".
                      'CopySourceRange' => 'bytes=' . $offset . '-' . ($offset + $partSize - 1),
                  ]);
                  $request = $this->client->commandToRequestTransformer($command);
                  $this->commandList[$partNumber] = $command;
                  $this->requestList[$partNumber] = $request;

                  // Yielded without an explicit key, so the pool sees keys 0, 1, 2, ...
                  // and the callbacks below add 1 to get back to the part number.
                  yield $request;

                  $offset += $partSize;
                  if ($this->size == $offset) {
                      break;
                  }
              }
          };

          $pool = new Pool($this->client->httpClient, $copyRequests($uploadId), [
              'concurrency' => $this->concurrency,
              'fulfilled' => function ($response, $index) {
                  $partNumber = $index + 1;
                  $result = $this->client->responseToResultTransformer(
                      $response,
                      $this->requestList[$partNumber],
                      $this->commandList[$partNumber]
                  );
                  $this->parts[$partNumber] = ['PartNumber' => $partNumber, 'ETag' => $result['ETag']];
              },
              'rejected' => function ($reason, $index) {
                  $this->retryPart($index + 1);
              },
          ]);

          // Start the transfers and block until every request has settled.
          $pool->promise()->wait();
      }

      /**
       * Re-sends the stored command of a rejected part, synchronously, at most twice.
       *
       * Stops at the first success so an already copied part is not copied again.
       *
       * @param int $partNumber 1-based part number.
       * @throws \Exception The last failure, when both attempts fail.
       */
      private function retryPart($partNumber) {
          $maxAttempts = 2;
          for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
              try {
                  $result = $this->client->execute($this->commandList[$partNumber]);
                  $this->parts[$partNumber] = ['PartNumber' => $partNumber, 'ETag' => $result['ETag']];
                  return;
              } catch (\Exception $e) {
                  if ($attempt == $maxAttempts) {
                      throw $e;
                  }
              }
          }
      }

      /**
       * Builds the CopySource value: "<bucket>.cos.<region>.myqcloud.com/<key>?versionId=<id>".
       *
       * A missing 'VersionId' is sent as an empty value.
       *
       * @return string
       */
      private function buildCopySourcePath() {
          $versionId = isset($this->copySource['VersionId']) ? $this->copySource['VersionId'] : '';

          return $this->copySource['Bucket'] . '.cos.' . $this->copySource['Region']
              . '.myqcloud.com/' . $this->copySource['Key'] . '?versionId=' . $versionId;
      }

      /**
       * Picks the part size: large enough that the object fits into MAX_PARTS parts,
       * at least $minPartSize, and clamped to [MIN_PART_SIZE, MAX_PART_SIZE].
       *
       * @param int $minPartSize Requested lower bound in bytes.
       * @return int
       */
      private function calculatePartSize($minPartSize) {
          $partSize = intval(ceil($this->size / self::MAX_PARTS));
          $partSize = max($minPartSize, $partSize);
          $partSize = min($partSize, self::MAX_PART_SIZE);

          return max($partSize, self::MIN_PART_SIZE);
      }

      /**
       * Starts the multipart upload with the stored options and returns its id.
       *
       * @return string
       */
      private function initiateMultipartUpload() {
          $result = $this->client->createMultipartUpload($this->options);

          return $result['UploadId'];
      }
  }