Skip to content

Commit 1d127d9

Browse files
committed
Don't use a thread pool in S3#concatenate
tusd doesn't use a thread pool, so it's obviously good enough. This simplifies the implementation.
1 parent b32ff7c commit 1d127d9

File tree

2 files changed

+24
-54
lines changed

2 files changed

+24
-54
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## HEAD
2+
3+
* Deprecate `:concurrency` option in S3 storage (@janko)
4+
5+
* Don't use a thread pool in `S3#concatenate` (@janko)
6+
17
## 2.3.0 (2019-05-14)
28

39
* Allow uploading files larger than 50 GB to S3 storage by scaling the part size according to upload length (@janko)

lib/tus/storage/s3.rb

+18-54
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@ class S3
2121
MAX_MULTIPART_PARTS = 10_000
2222
MAX_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 * 1024
2323

24-
attr_reader :client, :bucket, :prefix, :upload_options, :limits, :concurrency
24+
attr_reader :client, :bucket, :prefix, :upload_options, :limits
2525

2626
# Initializes an aws-sdk-s3 client with the given credentials.
2727
def initialize(bucket:, prefix: nil, upload_options: {}, limits: {}, concurrency: {}, thread_count: nil, **client_options)
2828
fail ArgumentError, "the :bucket option was nil" unless bucket
2929

3030
if thread_count
31-
warn "[Tus-Ruby-Server] :thread_count is deprecated and will be removed in the next major version, use :concurrency instead, e.g `concurrency: { concatenation: 20 }`"
32-
concurrency[:concatenation] = thread_count
31+
warn "[Tus-Ruby-Server] :thread_count option is obsolete and will be removed in the next major version"
32+
elsif concurrency.any?
33+
warn "[Tus-Ruby-Server] :concurrency option is obsolete and will be removed in the next major version"
3334
end
3435

3536
resource = Aws::S3::Resource.new(**client_options)
@@ -39,7 +40,6 @@ def initialize(bucket:, prefix: nil, upload_options: {}, limits: {}, concurrency
3940
@prefix = prefix
4041
@upload_options = upload_options
4142
@limits = limits
42-
@concurrency = concurrency
4343
end
4444

4545
# Initiates multipart upload for the given upload, and stores its
@@ -262,64 +262,28 @@ def delete(objects)
262262
end
263263

264264
# Creates multipart parts for the specified multipart upload by copying
265-
# given objects into them. It uses a queue and a fixed-size thread pool
266-
# which consumes that queue.
265+
# given objects into them.
267266
def copy_parts(objects, multipart_upload)
268-
parts = compute_parts(objects, multipart_upload)
269-
input = Queue.new
270-
results = Queue.new
271-
272-
parts.each { |part| input << part }
273-
input.close
274-
275-
thread_count = concurrency[:concatenation] || 10
276-
threads = thread_count.times.map { copy_part_thread(input, results) }
277-
278-
errors = threads.map(&:value).compact
279-
fail errors.first if errors.any?
280-
281-
part_results = Array.new(results.size) { results.pop } # convert Queue into an Array
282-
part_results.sort_by { |part| part.fetch("part_number") }
283-
end
284-
285-
# Computes data required for copying objects into new multipart parts.
286-
def compute_parts(objects, multipart_upload)
287-
objects.map.with_index do |object, idx|
288-
{
289-
bucket: multipart_upload.bucket_name,
290-
key: multipart_upload.object_key,
291-
upload_id: multipart_upload.id,
292-
copy_source: [object.bucket_name, object.key].join("/"),
293-
part_number: idx + 1,
294-
}
267+
threads = objects.map.with_index do |object, idx|
268+
Thread.new { copy_part(object, idx + 1, multipart_upload) }
295269
end
296-
end
297270

298-
# Consumes the queue for new multipart part information and issues the
299-
# copy requests.
300-
def copy_part_thread(input, results)
301-
Thread.new do
302-
begin
303-
loop do
304-
part = input.pop or break
305-
part_result = copy_part(part)
306-
results << part_result
307-
end
308-
nil
309-
rescue => error
310-
input.clear # clear other work
311-
error
312-
end
313-
end
271+
threads.map(&:value)
314272
end
315273

316274
# Creates a new multipart part by copying the object specified in the
317275
# given data. Returns part number and ETag that will be required later
318276
# for completing the multipart upload.
319-
def copy_part(part)
320-
response = client.upload_part_copy(part)
321-
322-
{ "part_number" => part[:part_number], "etag" => response.copy_part_result.etag }
277+
def copy_part(object, part_number, multipart_upload)
278+
response = client.upload_part_copy(
279+
bucket: multipart_upload.bucket_name,
280+
key: multipart_upload.object_key,
281+
upload_id: multipart_upload.id,
282+
copy_source: [object.bucket_name, object.key].join("/"),
283+
part_number: part_number,
284+
)
285+
286+
{ "part_number" => part_number, "etag" => response.copy_part_result.etag }
323287
end
324288

325289
# Retuns an Aws::S3::Object with the prefix applied.

0 commit comments

Comments
 (0)