diff --git a/app/jobs/aws_check_fixity_job.rb b/app/jobs/aws_check_fixity_job.rb index ea9d798..5b31887 100644 --- a/app/jobs/aws_check_fixity_job.rb +++ b/app/jobs/aws_check_fixity_job.rb @@ -1,28 +1,16 @@ # frozen_string_literal: true -# rubocop:disable Metrics/MethodLength - class AwsCheckFixityJob < ApplicationJob queue_as CheckPlease::Queues::CHECK_FIXITY def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name) response_stream_name = "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{job_identifier}" - progress_report_lambda = lambda { |_chunk, _bytes_read, chunk_counter| - return unless (chunk_counter % 100).zero? - - # TODO: Broadcast a message to indicate that the processing is still happening. - # This way, clients will know if a job has stalled and will not wait indefinitely for results. - ActionCable.server.broadcast( - response_stream_name, - { type: 'fixity_check_in_progress' }.to_json - ) - } checksum_hexdigest, object_size = CheckPlease::Aws::ObjectFixityChecker.check( bucket_name, object_path, checksum_algorithm_name, - on_chunk: progress_report_lambda + on_chunk: progress_report_lambda(response_stream_name) ) # Broadcast message when job is complete @@ -63,4 +51,17 @@ def broadcast_fixity_check_error( }.to_json ) end + + def progress_report_lambda(response_stream_name) + lambda do |_chunk, _bytes_read, chunk_counter| + return unless (chunk_counter % 100).zero? + + # We periodically broadcast a message to indicate that the processing is still happening. + # This is so that a client can check whether a job has stalled. + ActionCable.server.broadcast( + response_stream_name, + { type: 'fixity_check_in_progress' }.to_json + ) + end + end end diff --git a/spec/aws_check_fixity_job_spec.rb b/spec/aws_check_fixity_job_spec.rb new file mode 100644 index 0000000..621c0c9 --- /dev/null +++ b/spec/aws_check_fixity_job_spec.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +# rubocop:disable RSpec/ExampleLength + +require 'rails_helper' + +describe AwsCheckFixityJob do + let(:aws_check_fixity_job) { described_class.new } + let(:job_identifier) { 'great-job' } + let(:bucket_name) { 'example-bucket' } + let(:object_path) { 'path/to/object.png' } + let(:checksum_algorithm_name) { 'sha256' } + let(:example_content) { 'example' } + let(:checksum_hexdigest) { Digest::SHA256.hexdigest(example_content) } + let(:object_size) { example_content.bytesize } + let(:stream_name) { "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{job_identifier}" } + let(:error_message) { 'oh no!' } + + describe '#perform' do + it 'works as expected' do + allow(CheckPlease::Aws::ObjectFixityChecker).to receive(:check).with( + bucket_name, + object_path, + checksum_algorithm_name, + on_chunk: Proc + ).and_return([checksum_hexdigest, object_size]) + expect(aws_check_fixity_job).to receive(:broadcast_fixity_check_complete).with( + stream_name, + bucket_name, + object_path, + checksum_algorithm_name, + checksum_hexdigest, + object_size + ) + aws_check_fixity_job.perform(job_identifier, bucket_name, object_path, checksum_algorithm_name) + end + + it 'broadcasts a fixity check error message when an error occurs during processing' do + allow(CheckPlease::Aws::ObjectFixityChecker).to receive(:check).and_raise(StandardError, error_message) + + expect(aws_check_fixity_job).to receive(:broadcast_fixity_check_error).with( + stream_name, + error_message, + bucket_name, + object_path, + checksum_algorithm_name + ) + aws_check_fixity_job.perform(job_identifier, bucket_name, object_path, checksum_algorithm_name) + end + end + + describe '#progress_report_lambda' do + let(:chunk) { 'a chunk of content' } + let(:bytes_read) { 12_345 } + + it 'broadcasts an Action Cable message at the expected interval' do + progress_report_lambda = aws_check_fixity_job.progress_report_lambda(stream_name) + expect(ActionCable.server).to receive(:broadcast).exactly(10).times + (1..1000).each do |i| + progress_report_lambda.call(chunk, bytes_read, i) + end + end + end + + describe '#broadcast_fixity_check_complete' do + it 'results in the expected broadcast' do + expect(ActionCable.server).to receive(:broadcast).with( + stream_name, + { + type: 'fixity_check_complete', + data: { + bucket_name: bucket_name, object_path: object_path, + checksum_algorithm_name: checksum_algorithm_name, + checksum_hexdigest: checksum_hexdigest, object_size: object_size + } + }.to_json + ) + aws_check_fixity_job.broadcast_fixity_check_complete( + stream_name, bucket_name, object_path, checksum_algorithm_name, checksum_hexdigest, object_size + ) + end + end + + describe '#broadcast_fixity_check_error' do + it 'results in the expected broadcast' do + expect(ActionCable.server).to receive(:broadcast).with( + stream_name, + { + type: 'fixity_check_error', + data: { + error_message: error_message, bucket_name: bucket_name, + object_path: object_path, checksum_algorithm_name: checksum_algorithm_name + } + }.to_json + ) + aws_check_fixity_job.broadcast_fixity_check_error( + stream_name, error_message, bucket_name, object_path, checksum_algorithm_name + ) + end + end +end diff --git a/spec/channels/fixity_check_channel_spec.rb b/spec/channels/fixity_check_channel_spec.rb index c4c4eef..fcd4619 100644 --- a/spec/channels/fixity_check_channel_spec.rb +++ b/spec/channels/fixity_check_channel_spec.rb @@ -47,7 +47,8 @@ ).and_return([checksum_hexdigest, object_size]) end - it 'initiates a checksum calculation, which queues a background job' do + it 'initiates a checksum calculation, which queues a background job and '\ + 'responds with a fixity_check_complete broadcast' do expect(AwsCheckFixityJob).to receive(:perform_later).with( job_identifier, bucket_name, object_path, checksum_algorithm_name ).and_call_original