add concurrency to multipart_save method #739
base: master
@@ -328,18 +328,34 @@ def multipart_save(options)
        # Store ETags of upload parts
        part_tags = []

        # Calculate total size and ensure we don't exceed part limit
        total_size = Fog::Storage.get_body_size(body)
        parts_count = (total_size.to_f / multipart_chunk_size).ceil

        # AWS S3 has a hard limit of 10,000 parts, make sure we are below this limit for large objects
        if parts_count > 10000
          self.multipart_chunk_size = (total_size.to_f / 10000).ceil
          parts_count = 10000
        end

        # Upload each part
        # TODO: optionally upload chunks in parallel using threads
        # (may cause network performance problems with many small chunks)
        # TODO: Support large chunk sizes without reading the chunk into memory
        if body.respond_to?(:rewind)
          body.rewind rescue nil
        end
        while (chunk = body.read(multipart_chunk_size)) do
          part_upload = service.upload_part(directory.key, key, upload_id, part_tags.size + 1, chunk, part_headers(chunk))
          part_tags << part_upload.headers["ETag"]
          body.rewind rescue nil
        end

        pending = PartList.new(
          (1..parts_count).map do |part_number|
            UploadPartData.new(part_number, {}, nil)
          end
        )
        thread_count = self.concurrency
        completed = PartList.new
        errors = upload_parts_in_threads(directory.key, key, upload_id, pending, completed, thread_count)

        raise errors.first if errors.any?

        part_tags = completed.to_a.sort_by { |part| part.part_number }.map(&:etag)

        if part_tags.empty? #it is an error to have a multipart upload with no parts
          part_upload = service.upload_part(directory.key, key, upload_id, 1, '', part_headers(''))
          part_tags << part_upload.headers["ETag"]
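For context, the pending/completed queues and the part records above are the same kind of thread-safe helpers the file model already uses for multipart_copy. The sketch below shows roughly what they need to support, inferred from how this diff calls them (shift, push, clear!, to_a); it is a minimal illustration, not verbatim code from the gem.

# Rough sketch of the helpers this diff relies on (inferred from usage, not verbatim).
UploadPartData = Struct.new(:part_number, :upload_options, :etag)

class PartList
  def initialize(parts = [])
    @parts = parts
    @mutex = Mutex.new
  end

  def push(part)
    @mutex.synchronize { @parts.push(part) }  # worker threads report completed parts
  end

  def shift
    @mutex.synchronize { @parts.shift }       # worker threads pull the next pending part
  end

  def clear!
    @mutex.synchronize { @parts.clear }       # a failing thread drains the queue so the others stop
  end

  def to_a
    @mutex.synchronize { @parts.dup }
  end
end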
@@ -460,6 +476,49 @@ def upload_in_threads(target_directory_key, target_file_key, upload_id, pending,
        threads.map(&:value).compact
      end

      def upload_parts_in_threads(directory_key, target_file_key, upload_id, pending, completed, thread_count)
        mutex = Mutex.new
        threads = []

        thread_count.times do
          thread = Thread.new do
            begin
              while part = pending.shift
                # Determine byte range for this part
                start_pos = (part.part_number - 1) * multipart_chunk_size

                # Safely read the chunk from body
                chunk = nil
                mutex.synchronize do
                  if body.respond_to?(:seek)
                    body.seek(start_pos) rescue nil
Inline review thread on this line:

hello, it is me, a little bottleneck right here!

Yeah. This should prevent multiple threads from trying to read the file at the same time, I guess. They would take turns loading it into memory, but then actually do the upload in parallel (which is the slower part). I think mutexing the file read is probably necessary, if I recall properly. Is that what you were thinking? Happy to discuss further.

Yes, this will definitely be a bottleneck on the block storage side. Multiple threads trying to seek through blocks and run IO operations on a single file is the reason why the mutex is there.

Any ideas for alternatives or workarounds?

In this implementation it's implied that we could fine-tune this bottleneck using two existing properties, concurrency and multipart_chunk_size.

I wouldn't try this without the mutex.

Using some sort of buffering in addition to threads wouldn't make it any better.
                  end
                  chunk = body.read(multipart_chunk_size)
                end

                # Upload the chunk
                if chunk
                  part_upload = service.upload_part(directory_key, target_file_key, upload_id, part.part_number, chunk, part_headers(chunk))
                  part.etag = part_upload.headers["ETag"]
                  completed.push(part)
                  # Release memory
                  chunk = nil
                end
              end
              nil
            rescue => error
              pending.clear!
              error
            end
          end

          thread.abort_on_exception = true
          threads << thread
        end

        threads.map(&:value).compact
      end
    end
  end
end
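On the inline discussion above about the mutex-guarded seek/read being a bottleneck: one possible workaround, sketched here purely as an assumption and not as part of this PR, is to give each worker thread its own read handle when the body is a plain file on disk, so reads no longer serialize on a single shared IO. The helper name read_chunk_without_lock is hypothetical.

# Hypothetical alternative to the shared-mutex read (not in this PR):
# each thread opens its own handle onto the underlying file, so seek/read
# calls from different threads do not contend on one IO object.
def read_chunk_without_lock(part_number)
  return nil unless body.is_a?(::File)  # only safe for real files on disk

  start_pos = (part_number - 1) * multipart_chunk_size
  io = (Thread.current[:fog_part_io] ||= ::File.open(body.path, 'rb'))
  io.seek(start_pos)
  io.read(multipart_chunk_size)
end

A real implementation would still need the mutex-guarded path as a fallback for non-file bodies, and would have to close the per-thread handles once all parts are uploaded.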
There is no similar logic in the multipart_copy() method 🤔

Probably an oversight, I guess?
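For completeness, a rough usage sketch of how a caller would exercise this change, assuming the existing concurrency and multipart_chunk_size attributes on the file model are the tuning knobs involved; the bucket name, object key, and local path below are made up.

require 'fog/aws'

storage   = Fog::Storage.new(:provider => 'AWS')  # credentials come from your usual fog/AWS config
directory = storage.directories.get('my-bucket')  # hypothetical bucket

directory.files.create(
  :key                  => 'backups/dump.tar.gz',            # hypothetical object key
  :body                 => ::File.open('/tmp/dump.tar.gz', 'rb'),
  :multipart_chunk_size => 10 * 1024 * 1024,                  # multipart_save kicks in for bodies larger than one chunk
  :concurrency          => 4                                   # number of upload_parts_in_threads workers
)

With a 60 GiB body and 10 MiB chunks this works out to around 6,100 parts, comfortably under the 10,000-part clamp added in the diff; raising concurrency mainly trades read contention on the shared body (see the mutex discussion above) for more upload parallelism.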