Commit a0ae428

Merge pull request #391 from qiniu/features/resumable-upload-v2
New version of multipart (resumable) upload
2 parents c4fb290 + 5013303 commit a0ae428

File tree

5 files changed (+212, -40 lines)

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -44,3 +44,5 @@ nosetests.xml
 .mr.developer.cfg
 .project
 .pydevproject
+/.idea
+/.venv

qiniu/config.py

Lines changed: 5 additions & 2 deletions

@@ -7,7 +7,7 @@
 API_HOST = 'http://api.qiniu.com'  # host for data-processing operations
 UC_HOST = 'https://uc.qbox.me'  # host for querying bucket information

-_BLOCK_SIZE = 1024 * 1024 * 4  # 断点续上传分块大小,该参数为接口规格,暂不支持修改
+_BLOCK_SIZE = 1024 * 1024 * 4  # 断点续传分块大小,该参数为接口规格,暂不支持修改

 _config = {
     'default_zone': zone.Zone(),
@@ -18,6 +18,7 @@
     'connection_timeout': 30,  # connection timeout: 30 s
     'connection_retries': 3,  # number of connection retries: 3
     'connection_pool': 10,  # connection pool size: 10
+    'default_upload_threshold': 2 * _BLOCK_SIZE  # default size threshold at which put_file switches upload method
 }


@@ -28,7 +29,7 @@ def get_default(key):
 def set_default(
         default_zone=None, connection_retries=None, connection_pool=None,
         connection_timeout=None, default_rs_host=None, default_uc_host=None,
-        default_rsf_host=None, default_api_host=None):
+        default_rsf_host=None, default_api_host=None, default_upload_threshold=None):
     if default_zone:
         _config['default_zone'] = default_zone
     if default_rs_host:
@@ -45,3 +46,5 @@ def set_default(
         _config['connection_pool'] = connection_pool
     if connection_timeout:
         _config['connection_timeout'] = connection_timeout
+    if default_upload_threshold:
+        _config['default_upload_threshold'] = default_upload_threshold
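
Note: the new `default_upload_threshold` entry replaces the previously hard-coded `config._BLOCK_SIZE * 2` (8 MB) cutoff in `put_file`. A minimal usage sketch (the 16 MB value is only illustrative):

    from qiniu import config

    # Files up to 16 MB will now use the single-request form upload;
    # larger files fall through to put_stream (resumable/multipart upload).
    config.set_default(default_upload_threshold=1024 * 1024 * 16)

    # put_file compares the file size against this value:
    #   size > config.get_default('default_upload_threshold')  ->  put_stream
    #   otherwise                                               ->  _form_put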

qiniu/http.py

Lines changed: 8 additions & 0 deletions

@@ -104,6 +104,10 @@ def _post_with_token(url, data, token):
     return _post(url, data, None, _TokenAuth(token))


+def _post_with_token_and_headers(url, data, token, headers):
+    return _post(url, data, None, _TokenAuth(token), headers)
+
+
 def _post_file(url, data, files):
     return _post(url, data, files, None)

@@ -132,6 +136,10 @@ def _put_with_auth(url, data, auth):
     return _put(url, data, None, qiniu.auth.RequestsAuth(auth))


+def _put_with_token_and_headers(url, data, auth, headers):
+    return _put(url, data, None, _TokenAuth(auth), headers)
+
+
 def _put_with_auth_and_headers(url, data, auth, headers):
     return _put(url, data, None, qiniu.auth.RequestsAuth(auth), headers)
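
Note: these wrappers exist so the v2 upload calls in uploader.py (below) can send per-request headers while still authenticating with the upload token. A rough sketch of the call shape, with placeholder url/token/part values:

    import hashlib

    from qiniu import http

    up_token = '<upload token from Auth.upload_token()>'
    part = b'<part bytes>'
    # A real v2 part URL ends in .../uploads/<uploadId>/<partNumber>
    url = 'https://upload.example.com/buckets/my-bucket/objects/a2V5/uploads/<uploadId>/1'

    headers = {
        'Content-Type': 'application/octet-stream',
        'Content-MD5': hashlib.md5(part).hexdigest(),
    }
    ret, info = http._put_with_token_and_headers(url, part, up_token, headers)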

qiniu/services/storage/uploader.py

Lines changed: 150 additions & 33 deletions
@@ -1,5 +1,7 @@
 # -*- coding: utf-8 -*-

+import hashlib
+import json
 import os
 import time

@@ -45,7 +47,8 @@ def put_data(

 def put_file(up_token, key, file_path, params=None,
              mime_type='application/octet-stream', check_crc=False,
-             progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None):
+             progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,
+             part_size=None, version=None, bucket_name=None):
     """Upload a file to Qiniu

     Args:
@@ -58,22 +61,25 @@ def put_file(up_token, key, file_path, params=None,
         progress_handler: upload progress callback
         upload_progress_recorder: records upload progress, used for resumable upload
         hostscache_dir: directory where cached host query results are stored
+        version: multipart upload version, currently 'v1' or 'v2'; defaults to 'v1'
+        part_size: part size for multipart upload v2; defaults to 4 MB, valid range 1 MB - 1 GB
+        bucket_name: required for multipart upload v2; name of the target bucket

     Returns:
         a dict such as {"hash": "<Hash string>", "key": "<Key string>"}
         a ResponseInfo object
     """
     ret = {}
     size = os.stat(file_path).st_size
-    # fname = os.path.basename(file_path)
     with open(file_path, 'rb') as input_stream:
         file_name = os.path.basename(file_path)
         modify_time = int(os.path.getmtime(file_path))
-        if size > config._BLOCK_SIZE * 2:
+        if size > config.get_default('default_upload_threshold'):
             ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,
                                    mime_type, progress_handler,
                                    upload_progress_recorder=upload_progress_recorder,
-                                   modify_time=modify_time, keep_last_modified=keep_last_modified)
+                                   modify_time=modify_time, keep_last_modified=keep_last_modified,
+                                   part_size=part_size, version=version, bucket_name=bucket_name)
         else:
             crc = file_crc32(file_path)
             ret, info = _form_put(up_token, key, input_stream, params, mime_type,
@@ -129,9 +135,11 @@ def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None,

 def put_stream(up_token, key, input_stream, file_name, data_size, hostscache_dir=None, params=None,
                mime_type=None, progress_handler=None,
-               upload_progress_recorder=None, modify_time=None, keep_last_modified=False):
+               upload_progress_recorder=None, modify_time=None, keep_last_modified=False,
+               part_size=None, version=None, bucket_name=None):
     task = _Resume(up_token, key, input_stream, file_name, data_size, hostscache_dir, params, mime_type,
-                   progress_handler, upload_progress_recorder, modify_time, keep_last_modified)
+                   progress_handler, upload_progress_recorder, modify_time, keep_last_modified,
+                   part_size, version, bucket_name)
     return task.upload()


@@ -153,10 +161,14 @@ class _Resume(object):
        upload_progress_recorder: records upload progress, used for resumable upload
        modify_time: last-modified time of the file being uploaded
        hostscache_dir: directory where cached host query results are stored
+       version: multipart upload version, currently 'v1' or 'v2'; defaults to 'v1'
+       part_size: part size for multipart upload v2; valid range 1 MB - 1 GB
+       bucket_name: required for multipart upload v2; name of the target bucket
    """

    def __init__(self, up_token, key, input_stream, file_name, data_size, hostscache_dir, params, mime_type,
-                 progress_handler, upload_progress_recorder, modify_time, keep_last_modified):
+                 progress_handler, upload_progress_recorder, modify_time, keep_last_modified, part_size=None,
+                 version=None, bucket_name=None):
        """Initialize the resumable upload"""
        self.up_token = up_token
        self.key = key
@@ -170,46 +182,87 @@ def __init__(self, up_token, key, input_stream, file_name, data_size, hostscache
         self.upload_progress_recorder = upload_progress_recorder or UploadProgressRecorder()
         self.modify_time = modify_time or time.time()
         self.keep_last_modified = keep_last_modified
-        # print(self.modify_time)
-        # print(modify_time)
+        self.version = version or 'v1'
+        self.part_size = part_size or config._BLOCK_SIZE
+        self.bucket_name = bucket_name

     def record_upload_progress(self, offset):
         record_data = {
             'size': self.size,
             'offset': offset,
-            'contexts': [block['ctx'] for block in self.blockStatus]
         }
+        if self.version == 'v1':
+            record_data['contexts'] = [block['ctx'] for block in self.blockStatus]
+        elif self.version == 'v2':
+            record_data['etags'] = self.blockStatus
+            record_data['expired_at'] = self.expiredAt
+            record_data['upload_id'] = self.uploadId
         if self.modify_time:
             record_data['modify_time'] = self.modify_time
-        # print(record_data)
         self.upload_progress_recorder.set_upload_record(self.file_name, self.key, record_data)

     def recovery_from_record(self):
         record = self.upload_progress_recorder.get_upload_record(self.file_name, self.key)
         if not record:
-            return 0
-
+            if self.version == 'v1':
+                return 0
+            elif self.version == 'v2':
+                return 0, None, None
         try:
             if not record['modify_time'] or record['size'] != self.size or \
-                    record['modify_time'] != self.modify_time:
-                return 0
+                    record['modify_time'] != self.modify_time:
+                if self.version == 'v1':
+                    return 0
+                elif self.version == 'v2':
+                    return 0, None, None
         except KeyError:
-            return 0
-        self.blockStatus = [{'ctx': ctx} for ctx in record['contexts']]
-        return record['offset']
+            if self.version == 'v1':
+                return 0
+            elif self.version == 'v2':
+                return 0, None, None
+        if self.version == 'v1':
+            if not record.__contains__('contexts') or len(record['contexts']) == 0:
+                return 0
+            self.blockStatus = [{'ctx': ctx} for ctx in record['contexts']]
+            return record['offset']
+        elif self.version == 'v2':
+            if not record.__contains__('etags') or len(record['etags']) == 0 or \
+                    not record.__contains__('expired_at') or float(record['expired_at']) < time.time() or \
+                    not record.__contains__('upload_id'):
+                return 0, None, None
+            self.blockStatus = record['etags']
+            return record['offset'], record['upload_id'], record['expired_at']

     def upload(self):
         """Perform the upload"""
         self.blockStatus = []
-        if config.get_default('default_zone').up_host:
-            host = config.get_default('default_zone').up_host
+        self.recovery_index = 1
+        self.expiredAt = None
+        self.uploadId = None
+        host = self.get_up_host()
+        if self.version == 'v1':
+            offset = self.recovery_from_record()
+            self.part_size = config._BLOCK_SIZE
+        elif self.version == 'v2':
+            offset, self.uploadId, self.expiredAt = self.recovery_from_record()
+            if offset > 0 and self.blockStatus != [] and self.uploadId is not None \
+                    and self.expiredAt is not None:
+                self.recovery_index = self.blockStatus[-1]['partNumber'] + 1
+            else:
+                self.recovery_index = 1
+                init_url = self.block_url_v2(host, self.bucket_name)
+                self.uploadId, self.expiredAt = self.init_upload_task(init_url)
         else:
-            host = config.get_default('default_zone').get_up_host_by_token(self.up_token, self.hostscache_dir)
-        offset = self.recovery_from_record()
-        for block in _file_iter(self.input_stream, config._BLOCK_SIZE, offset):
+            raise ValueError("version must choose v1 or v2 !")
+        for index, block in enumerate(_file_iter(self.input_stream, self.part_size, offset)):
             length = len(block)
-            crc = crc32(block)
-            ret, info = self.make_block(block, length, host)
+            if self.version == 'v1':
+                crc = crc32(block)
+                ret, info = self.make_block(block, length, host)
+            elif self.version == 'v2':
+                index_ = index + self.recovery_index
+                url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index_)
+                ret, info = self.make_block_v2(block, url)
             if ret is None and not info.need_retry():
                 return ret, info
             if info.connect_failed():
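
Note (illustrative, not part of the diff): the resume record written by `record_upload_progress()` now differs by version; the sketch below invents values but uses exactly the keys that `recovery_from_record()` reads back. The 'etag' key inside each v2 entry is assumed to come from the part-upload response (its 'md5' sibling is deleted before recording):

    # v1 record: one 'ctx' per uploaded block
    v1_record = {
        'size': 10485760, 'offset': 8388608, 'modify_time': 1600000000,
        'contexts': ['<ctx-1>', '<ctx-2>'],
    }
    # v2 record: per-part etags plus the multipart session identifiers
    v2_record = {
        'size': 10485760, 'offset': 8388608, 'modify_time': 1600000000,
        'etags': [{'etag': '<etag-1>', 'partNumber': 1},
                  {'etag': '<etag-2>', 'partNumber': 2}],
        'upload_id': '<uploadId>', 'expired_at': 1600086400,
    }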
@@ -218,28 +271,77 @@ def upload(self):
                 else:
                     host = config.get_default('default_zone').get_up_host_backup_by_token(self.up_token,
                                                                                           self.hostscache_dir)
-            if info.need_retry() or crc != ret['crc32']:
-                ret, info = self.make_block(block, length, host)
-                if ret is None or crc != ret['crc32']:
-                    return ret, info
+            if self.version == 'v1':
+                if info.need_retry() or crc != ret['crc32']:
+                    ret, info = self.make_block(block, length, host)
+                    if ret is None or crc != ret['crc32']:
+                        return ret, info
+            elif self.version == 'v2':
+                if info.need_retry():
+                    url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index + 1)
+                    ret, info = self.make_block_v2(block, url)
+                    if ret is None:
+                        return ret, info
+                del ret['md5']
+                ret['partNumber'] = index_
             self.blockStatus.append(ret)
             offset += length
             self.record_upload_progress(offset)
             if (callable(self.progress_handler)):
-                self.progress_handler(((len(self.blockStatus) - 1) * config._BLOCK_SIZE) + length, self.size)
-        return self.make_file(host)
+                self.progress_handler(((len(self.blockStatus) - 1) * self.part_size) + len(block), self.size)
+        if self.version == 'v1':
+            return self.make_file(host)
+        elif self.version == 'v2':
+            make_file_url = self.block_url_v2(host, self.bucket_name) + '/%s' % self.uploadId
+            return self.make_file_v2(self.blockStatus, make_file_url, self.file_name,
+                                     self.mime_type, self.params)
+
+    def make_file_v2(self, block_status, url, file_name=None, mime_type=None, customVars=None):
+        """completeMultipartUpload"""
+        parts = self.get_parts(block_status)
+        headers = {
+            'Content-Type': 'application/json',
+        }
+        data = {
+            'parts': parts,
+            'fname': file_name,
+            'mimeType': mime_type,
+            'customVars': customVars
+        }
+        ret, info = self.post_with_headers(url, json.dumps(data), headers=headers)
+        if ret is not None and ret != {}:
+            if ret['hash'] and ret['key']:
+                self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
+        return ret, info
+
+    def get_up_host(self):
+        if config.get_default('default_zone').up_host:
+            host = config.get_default('default_zone').up_host
+        else:
+            host = config.get_default('default_zone').get_up_host_by_token(self.up_token, self.hostscache_dir)
+        return host

     def make_block(self, block, block_size, host):
         """Create a block"""
         url = self.block_url(host, block_size)
         return self.post(url, block)

+    def make_block_v2(self, block, url):
+        headers = {
+            'Content-Type': 'application/octet-stream',
+            'Content-MD5': hashlib.md5(block).hexdigest(),
+        }
+        return self.put(url, block, headers)
+
     def block_url(self, host, size):
         return '{0}/mkblk/{1}'.format(host, size)

+    def block_url_v2(self, host, bucket_name):
+        encoded_object_name = urlsafe_base64_encode(self.key) if self.key is not None else '~'
+        return '{0}/buckets/{1}/objects/{2}/uploads'.format(host, bucket_name, encoded_object_name)
+
     def file_url(self, host):
         url = ['{0}/mkfile/{1}'.format(host, self.size)]
-
         if self.mime_type:
             url.append('mimeType/{0}'.format(urlsafe_base64_encode(self.mime_type)))

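Note: `block_url_v2()`, `make_block_v2()` and `make_file_v2()` above (together with `init_upload_task()` added further below) map onto the three calls of the v2 multipart protocol. A hedged sketch of the URLs involved, with placeholder host/bucket/key values:

    from qiniu.utils import urlsafe_base64_encode

    host = 'https://upload.example.com'                  # resolved by get_up_host()
    bucket_name = 'my-bucket'
    encoded_key = urlsafe_base64_encode('my-key.bin')    # '~' is used when key is None

    base = '{0}/buckets/{1}/objects/{2}/uploads'.format(host, bucket_name, encoded_key)
    init_url = base                                      # POST -> uploadId, expireAt
    part_url = base + '/%s/%d' % ('<uploadId>', 1)       # PUT  -> etag (Content-MD5 header set)
    complete_url = base + '/%s' % '<uploadId>'           # POST -> hash, key (JSON body of parts)
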
@@ -259,7 +361,6 @@ def file_url(self, host):
                 "x-qn-meta-!Last-Modified/{0}".format(urlsafe_base64_encode(rfc_from_timestamp(self.modify_time))))

         url = '/'.join(url)
-        # print url
         return url

     def make_file(self, host):
@@ -269,5 +370,21 @@ def make_file(self, host):
         self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
         return self.post(url, body)

+    def init_upload_task(self, url):
+        body, resp = self.post(url, '')
+        if body is not None:
+            return body['uploadId'], body['expireAt']
+        else:
+            return None, None
+
     def post(self, url, data):
         return http._post_with_token(url, data, self.up_token)
+
+    def post_with_headers(self, url, data, headers):
+        return http._post_with_token_and_headers(url, data, self.up_token, headers)
+
+    def put(self, url, data, headers):
+        return http._put_with_token_and_headers(url, data, self.up_token, headers)
+
+    def get_parts(self, block_status):
+        return sorted(block_status, key=lambda i: i['partNumber'])
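
Note: an end-to-end sketch of opting into the new flow through `put_file`; the credentials, bucket, key and file path are placeholders, and only `version`, `part_size` and `bucket_name` are new relative to the old signature:

    from qiniu import Auth, put_file

    q = Auth('<access_key>', '<secret_key>')
    token = q.upload_token('my-bucket', 'backups/big-file.bin')

    ret, info = put_file(
        token, 'backups/big-file.bin', '/tmp/big-file.bin',
        version='v2',                # new multipart protocol; default is 'v1'
        part_size=1024 * 1024 * 8,   # 8 MB parts (valid range 1 MB - 1 GB)
        bucket_name='my-bucket',     # required when version='v2'
    )
    # ret should resemble {"hash": "<Hash string>", "key": "<Key string>"}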
