diff --git a/lib/fog/aws/models/storage/file.rb b/lib/fog/aws/models/storage/file.rb
index 9ce53a0c2..35b5a9224 100644
--- a/lib/fog/aws/models/storage/file.rb
+++ b/lib/fog/aws/models/storage/file.rb
@@ -328,18 +328,34 @@ def multipart_save(options)
         # Store ETags of upload parts
         part_tags = []
 
+        # Calculate the total body size and the resulting number of parts
+        total_size = Fog::Storage.get_body_size(body)
+        parts_count = (total_size.to_f / multipart_chunk_size).ceil
+
+        # AWS S3 has a hard limit of 10,000 parts; grow the chunk size so large objects stay within it
+        if parts_count > 10000
+          self.multipart_chunk_size = (total_size.to_f / 10000).ceil
+          parts_count = 10000
+        end
+
         # Upload each part
-        # TODO: optionally upload chunks in parallel using threads
-        # (may cause network performance problems with many small chunks)
-        # TODO: Support large chunk sizes without reading the chunk into memory
         if body.respond_to?(:rewind)
-          body.rewind rescue nil
-        end
-        while (chunk = body.read(multipart_chunk_size)) do
-          part_upload = service.upload_part(directory.key, key, upload_id, part_tags.size + 1, chunk, part_headers(chunk))
-          part_tags << part_upload.headers["ETag"]
+          body.rewind rescue nil
         end
+        pending = PartList.new(
+          (1..parts_count).map do |part_number|
+            UploadPartData.new(part_number, {}, nil)
+          end
+        )
+        thread_count = self.concurrency
+        completed = PartList.new
+        errors = upload_parts_in_threads(directory.key, key, upload_id, pending, completed, thread_count)
+
+        raise errors.first if errors.any?
+
+        part_tags = completed.to_a.sort_by { |part| part.part_number }.map(&:etag)
+
         if part_tags.empty? #it is an error to have a multipart upload with no parts
           part_upload = service.upload_part(directory.key, key, upload_id, 1, '', part_headers(''))
           part_tags << part_upload.headers["ETag"]
@@ -460,6 +476,49 @@ def upload_in_threads(target_directory_key, target_file_key, upload_id, pending,
 
         threads.map(&:value).compact
       end
+
+      def upload_parts_in_threads(directory_key, target_file_key, upload_id, pending, completed, thread_count)
+        mutex = Mutex.new
+        threads = []
+
+        thread_count.times do
+          thread = Thread.new do
+            begin
+              while (part = pending.shift)
+                # Determine the byte offset of this part within the body
+                start_pos = (part.part_number - 1) * multipart_chunk_size
+
+                # Serialize reads of the shared body: seek to this part's offset, then read one chunk
+                chunk = nil
+                mutex.synchronize do
+                  if body.respond_to?(:seek)
+                    body.seek(start_pos) rescue nil
+                  end
+                  chunk = body.read(multipart_chunk_size)
+                end
+
+                # Upload the chunk and record its ETag
+                if chunk
+                  part_upload = service.upload_part(directory_key, target_file_key, upload_id, part.part_number, chunk, part_headers(chunk))
+                  part.etag = part_upload.headers["ETag"]
+                  completed.push(part)
+                  # Drop the reference so the chunk can be garbage collected
+                  chunk = nil
+                end
+              end
+              nil
+            rescue => error
+              pending.clear!
+              error
+            end
+          end
+
+          thread.abort_on_exception = true
+          threads << thread
+        end
+
+        threads.map(&:value).compact
+      end
     end
   end
 end
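
To make the part-count cap in the first hunk concrete, here is a minimal sketch of the same arithmetic with example inputs; the 120 GiB body and 5 MiB starting chunk size are illustrative values, not taken from the patch.

    MAX_PARTS  = 10_000
    total_size = 120 * 1024**3                         # 120 GiB body (example value)
    chunk_size = 5 * 1024**2                           # 5 MiB multipart_chunk_size (example value)
    parts      = (total_size.to_f / chunk_size).ceil   # => 24576, over the S3 limit
    if parts > MAX_PARTS
      chunk_size = (total_size.to_f / MAX_PARTS).ceil  # => 12884902 bytes, ~12.3 MiB
      parts      = MAX_PARTS                           # the upload now fits in exactly 10,000 parts
    end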
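
And a usage sketch for the new threaded path, assuming the File model exposes the concurrency and multipart_chunk_size attributes referenced in the patch; the credentials, bucket, and file names are placeholders.

    require "fog/aws"

    storage = Fog::Storage.new(
      :provider              => "AWS",
      :aws_access_key_id     => "AKIA...",  # placeholder
      :aws_secret_access_key => "..."       # placeholder
    )
    directory = storage.directories.get("example-bucket")  # hypothetical bucket

    file = directory.files.new(
      :key  => "backups/large.tar",
      :body => ::File.open("large.tar")
    )
    file.multipart_chunk_size = 10 * 1024 * 1024  # bodies larger than this take the multipart path
    file.concurrency = 8                          # becomes thread_count in upload_parts_in_threads
    file.save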