[DO NOT MERGE] TransferManager: DirectoryUploader & DirectoryDownloader #3288
base: version-3
@@ -0,0 +1,55 @@
# frozen_string_literal: true

module Aws
  module S3
    # @api private
    class DefaultExecutor
      def initialize(options = {})
        @queue = Queue.new
        @max_threads = options[:max_threads] || 10
        @pool = []
        @running = true
        @mutex = Mutex.new
      end

      def post(*args, &block)
        raise 'Executor is not running' unless @running

        @queue << [args, block]
        ensure_worker_available
      end

      def shutdown
        @running = false
> **Review comment:** Should we track state of running -> shutting down -> shutdown? There is a gap in between calling shutdown and being fully shutdown (where tasks cannot be submitted but tasks are still completing). Additionally, we should probably offer a "kill" method that will immediately kill all threads. And finally, shutdown methods usually offer the ability to specify a timeout after which remaining tasks are killed (I think you can use the limit value in …)
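A minimal sketch of what this comment describes, using a hypothetical `StatefulExecutor` (not part of this PR): explicit `:running` / `:shutting_down` / `:shutdown` states, a `kill` method, and a shutdown timeout built on `Thread#join(limit)`, which returns `nil` if the thread is still alive when the limit expires.

```ruby
# Hypothetical variant (not part of this PR) of the reviewer's suggestion:
# explicit state transitions, a kill method, and a shutdown timeout.
class StatefulExecutor
  def initialize(max_threads: 10)
    @queue = Queue.new
    @max_threads = max_threads
    @pool = []
    @mutex = Mutex.new
    @state = :running
  end

  def state
    @mutex.synchronize { @state }
  end

  def post(*args, &block)
    raise 'Executor is not running' unless state == :running

    @queue << [args, block]
    ensure_worker_available
  end

  # Stops accepting work, waits up to `timeout` seconds per thread for
  # in-flight tasks, then kills any threads still alive.
  def shutdown(timeout = nil)
    @mutex.synchronize { @state = :shutting_down }
    @max_threads.times { @queue << :shutdown }
    @pool.each { |t| t.join(timeout) } # join(limit) returns nil on timeout
    @pool.each { |t| t.kill if t.alive? }
    @pool.clear
    @mutex.synchronize { @state = :shutdown }
    true
  end

  # Immediately kills all workers without draining the queue.
  def kill
    @mutex.synchronize do
      @state = :shutdown
      @pool.each(&:kill)
      @pool.clear
      @queue.clear
    end
    true
  end

  private

  def ensure_worker_available
    @mutex.synchronize do
      @pool.select!(&:alive?)
      @pool << spawn_worker if @pool.size < @max_threads
    end
  end

  def spawn_worker
    Thread.new do
      while (job = @queue.shift)
        break if job == :shutdown

        args, block = job
        block.call(*args)
      end
    end
  end
end
```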
        @max_threads.times { @queue << :shutdown }
        @pool.each(&:join)
        @pool.clear
        true
      end

      def running?
        @running
      end

      private

      def ensure_worker_available
        @mutex.synchronize do
          @pool.select!(&:alive?)
          @pool << spawn_worker if @pool.size < @max_threads
        end
      end

      def spawn_worker
        Thread.new do
          while (job = @queue.shift)
            break if job == :shutdown

            args, block = job
            block.call(*args)
          end
        end
      end
    end
  end
end
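To make the behavior above easy to poke at in isolation, here is a self-contained usage sketch; the class body from this diff is reproduced under a placeholder name (`DemoExecutor`) so the snippet runs outside the SDK tree. `shutdown` blocks until the queue drains because the sentinel values are enqueued behind all pending jobs.

```ruby
# Copy of the DefaultExecutor from this diff, under a placeholder name
# so the demo runs standalone.
class DemoExecutor
  def initialize(options = {})
    @queue = Queue.new
    @max_threads = options[:max_threads] || 10
    @pool = []
    @running = true
    @mutex = Mutex.new
  end

  def post(*args, &block)
    raise 'Executor is not running' unless @running

    @queue << [args, block]
    ensure_worker_available
  end

  def shutdown
    @running = false
    @max_threads.times { @queue << :shutdown }
    @pool.each(&:join)
    @pool.clear
    true
  end

  def running?
    @running
  end

  private

  def ensure_worker_available
    @mutex.synchronize do
      @pool.select!(&:alive?)
      @pool << spawn_worker if @pool.size < @max_threads
    end
  end

  def spawn_worker
    Thread.new do
      while (job = @queue.shift)
        break if job == :shutdown

        args, block = job
        block.call(*args)
      end
    end
  end
end

# Usage: post tasks, then shutdown blocks until all queued work finishes.
executor = DemoExecutor.new(max_threads: 3)
results = Queue.new
5.times { |i| executor.post(i) { |n| results << n * n } }
executor.shutdown
squares = []
squares << results.pop until results.empty?
```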
@@ -0,0 +1,155 @@
# frozen_string_literal: true

module Aws
  module S3
    # Raised when DirectoryDownloader fails to download objects from S3 bucket
    class DirectoryDownloadError < StandardError
> **Review comment:** By convention we were putting these in separate files, right? If you want to promote the other two (multipart errors) to the files where they are used, that's fine too, but let's stay consistent.
      def initialize(message, errors = [])
        @errors = errors
        super(message)
      end

      # @return [Array<StandardError>] The list of errors encountered when downloading objects
      attr_reader :errors
    end

    # @api private
    class DirectoryDownloader
      def initialize(options = {})
        @client = options[:client] || Client.new
        @executor = options[:executor] || DefaultExecutor.new
        @options = options
        @abort_download = false
      end

      attr_reader :client, :abort_download

      # TODO: need to add progress tracker
      def download(destination, bucket:, **options)
        if File.exist?(destination)
          raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination)
        else
          FileUtils.mkdir_p(destination)
        end

        download_opts = options.dup
        @bucket = bucket
> **Review comment:** It's odd to set instance state like bucket in a method. Wouldn't you pass bucket, errors, configuration, etc., down to relevant methods and return back errors?

> **Reply:** Yeah, I'd agree. I'm not against this being a "one-shot" class (i.e., for each directory download you must create this object again), in which case bucket/destination would be set on initialization instead. Either way though, setting it as a member variable here is a little weird.
        @ignore_failure = download_opts.delete(:ignore_failure) || false
        @errors = []

        downloader = FileDownloader.new(client: client, executor: @executor)
        producer = ObjectProducer.new(destination, build_producer_opts(download_opts))
        producer.run
> **Review comment:** It seems a little weird to need to call …
        downloads = process_download_queue(producer, downloader, download_opts)
        build_result(downloads)
      ensure
        @executor.shutdown unless @options[:executor]
> **Review comment:** We should always assume an executor, I think.

> **Reply:** Since this is an internal/private API, I think I'd agree that it's reasonable to always require an executor to be provided, and then we don't ever shut it down.
      end

      def build_producer_opts(opts)
        {
          directory_downloader: self,
          client: @client,
          bucket: @bucket,
          s3_prefix: opts.delete(:s3_prefix),
          ignore_failure: @ignore_failure,
          filter_callback: opts.delete(:filter_callback),
          errors: @errors
        }
      end
      def build_result(download_count)
        downloads = [download_count - @errors.count, 0].max

        if @abort_download
          msg = "failed to download directory: downloaded #{downloads} files " \
                "and failed to download #{@errors.count} files."
          raise DirectoryDownloadError.new(msg, @errors)
        else
          result = { completed_downloads: downloads, failed_downloads: @errors.count }
          result[:errors] = @errors if @errors.any?
          result
        end
      end

      def process_download_queue(producer, downloader, opts)
        download_attempts = 0
        completion_queue = Queue.new
        queue_executor = DefaultExecutor.new
> **Review comment:** Do we need an executor here? I know this needs to be done async/in a thread, but would it work to just spawn a single thread here?

> **Edit:** NVM, I understand why we're doing this: to limit the max concurrent downloads at one time. Maybe add comments or have a parameter/constant for that?
        while (object = producer.object_queue.shift) != :done
> **Review comment** (suggestion on the Producer interface): I think the object_queue should be an internal detail here. I'd lean towards having this implement Enumerable, so here you would just do: … calling …
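The Enumerable-based interface suggested here could look like this hypothetical sketch (`EnumerableProducer` is not part of this PR; the real producer would stream keys from `list_objects_v2` rather than from an in-memory array):

```ruby
# Hypothetical sketch of the reviewer's suggestion: the producer
# implements Enumerable and hides its queue, so consumers iterate
# with producer.each instead of popping object_queue directly.
class EnumerableProducer
  include Enumerable

  def initialize(items)
    @queue = Queue.new # internal detail, no longer exposed
    @producer_thread = Thread.new do
      items.each { |item| @queue << item }
      @queue << :done # sentinel marks end of stream
    end
  end

  # Yields each produced object; stops when the sentinel arrives,
  # so the call site never sees :done.
  def each
    return enum_for(:each) unless block_given?

    while (object = @queue.shift) != :done
      yield object
    end
    @producer_thread.join
  end
end
```

With this shape, the consumer loop in `process_download_queue` would become `producer.each do |object| ... end`, with no sentinel handling at the call site.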
          break if @abort_download

          download_attempts += 1
          queue_executor.post(object) do |o|
            dir_path = File.dirname(o[:path])
            FileUtils.mkdir_p(dir_path) unless dir_path == @destination || Dir.exist?(dir_path)

            downloader.download(o[:path], opts.merge(bucket: @bucket, key: o[:key]))
          rescue StandardError => e
            @errors << e
            @abort_download = true unless @ignore_failure
          ensure
            completion_queue << :done
          end
        end
        download_attempts.times { completion_queue.pop }
> **Review comment:** I think I'm probably missing something here: what is download_attempts? What is it tracking and how is it used?

> **Reply:** If we're using this to wait for all tasks to complete, could we use the shutdown method of the executor instead?
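One way to realize the reply's suggestion, sketched with a hypothetical `OneShotExecutor` (not part of this PR): create a throwaway executor per download call and let its blocking `shutdown` replace the `completion_queue` as the wait mechanism. Note the attempt count would still need to be returned for `build_result`.

```ruby
# Hypothetical alternative to the counter + completion_queue pattern:
# a one-shot executor whose shutdown blocks until every queued task
# has run, because the sentinels are enqueued behind all pending work.
class OneShotExecutor
  def initialize(max_threads: 4)
    @max_threads = max_threads
    @queue = Queue.new
    @pool = Array.new(max_threads) do
      Thread.new do
        while (job = @queue.shift)
          break if job == :shutdown

          job.call
        end
      end
    end
  end

  def post(&block)
    @queue << block
  end

  # Blocks until all queued tasks finish, then reaps the workers.
  def shutdown
    @max_threads.times { @queue << :shutdown }
    @pool.each(&:join)
    true
  end
end
```

`process_download_queue` would then post each download and simply call `queue_executor.shutdown` at the end, with no `download_attempts.times { completion_queue.pop }` loop.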
        download_attempts
      ensure
        queue_executor.shutdown
      end

      # @api private
      class ObjectProducer
        def initialize(destination_dir, options = {})
          @destination_dir = destination_dir
          @client = options[:client]
          @bucket = options[:bucket]
          @s3_prefix = options[:s3_prefix]
          @ignore_failure = options[:ignore_failure]
          @filter_callback = options[:filter_callback]
          @errors = options[:errors]
          @directory_downloader = options[:directory_downloader]
          @object_queue = SizedQueue.new(100)
> **Review comment:** make this a constant?
        end

        attr_reader :object_queue

        def run
          Thread.new do
            stream_objects
            @object_queue << :done
          end
        end

        private

        def build_object_entry(key)
          { path: File.join(@destination_dir, normalize_key(key)), key: key }
        end

        # TODO: need to add filter callback; double-check handling of objects whose keys end with '/'
        def stream_objects(continuation_token: nil)
          resp = @client.list_objects_v2(bucket: @bucket, continuation_token: continuation_token)
          resp.contents.each do |o|
            break if @directory_downloader.abort_download
            next if o.key.end_with?('/')

            @object_queue << build_object_entry(o.key)
          rescue StandardError => e
            @errors << e
            @abort_download = true unless @ignore_failure
          end
          stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token
        end

        def normalize_key(key)
          key = key.delete_prefix(@s3_prefix) if @s3_prefix
          return key if File::SEPARATOR == '/'

          key.tr('/', File::SEPARATOR)
        end
      end
    end
  end
end
> **Review comment:** Does running state default to true in the initializer for other implementations of this in ruby-concurrency?