Skip to content

Commit

Permalink
Add http checksum endpoints to support polling
Browse files Browse the repository at this point in the history
  • Loading branch information
elohanlon committed Oct 6, 2024
1 parent 7bfa349 commit 7285aa2
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ AllCops:
Lint/MissingCopEnableDirective:
Enabled: false

Metrics/AbcSize:
Exclude:
- app/jobs/aws_check_fixity_job.rb

Metrics/MethodLength:
Exclude:
- lib/check_please/aws/object_fixity_checker.rb
- app/controllers/fixity_checks_controller.rb
- app/jobs/aws_check_fixity_job.rb

RSpec/VerifiedDoubles:
Exclude:
Expand Down
8 changes: 7 additions & 1 deletion app/channels/fixity_check_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def run_fixity_check_for_s3_object(data)
object_path = data['object_path']
checksum_algorithm_name = data['checksum_algorithm_name']

AwsCheckFixityJob.perform_later(job_identifier, bucket_name, object_path, checksum_algorithm_name)
fixity_check = FixityCheck.create!(
job_identifier: job_identifier,
bucket_name: bucket_name,
object_path: object_path,
checksum_algorithm_name: checksum_algorithm_name
)
AwsCheckFixityJob.perform_later(fixity_check.id)
end
end
33 changes: 29 additions & 4 deletions app/controllers/fixity_checks_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,40 @@ def run_fixity_check_for_s3_object
bucket_name, object_path, checksum_algorithm_name
)

render plain: {
render json: {
bucket_name: bucket_name, object_path: object_path, checksum_algorithm_name: checksum_algorithm_name,
checksum_hexdigest: checksum_hexdigest, object_size: object_size
}.to_json
}
rescue StandardError => e
render plain: {
render json: {
error_message: e.message,
bucket_name: bucket_name, object_path: object_path, checksum_algorithm_name: checksum_algorithm_name
}.to_json, status: :bad_request
}, status: :bad_request
end

def create
bucket_name = fixity_check_params['bucket_name']
object_path = fixity_check_params['object_path']
checksum_algorithm_name = fixity_check_params['checksum_algorithm_name']
fixity_check = FixityCheck.create!(
# User does not need to supply a job_identifier param when using the create endpoint.
# We'll just use a ranom UUID here.
job_identifier: SecureRandom.uuid,
bucket_name: bucket_name,
object_path: object_path,
checksum_algorithm_name: checksum_algorithm_name
)
AwsCheckFixityJob.perform_later(fixity_check.id)
render json: fixity_check
rescue StandardError => e
render json: {
error_message: e.message
}, status: :bad_request
end

# GET /fixity_checks/1
def show
render json: FixityCheck.find(params[:id])
end

private
Expand Down
41 changes: 32 additions & 9 deletions app/jobs/aws_check_fixity_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,41 @@
class AwsCheckFixityJob < ApplicationJob
queue_as CheckPlease::Queues::CHECK_FIXITY

def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name)
response_stream_name = "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{job_identifier}"
def perform(fixity_check_id)
fixity_check = FixityCheck.find(fixity_check_id)
response_stream_name = "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{fixity_check.job_identifier}"

# Begin calculating checksum and file size
fixity_check.in_progress!
checksum_hexdigest, object_size = CheckPlease::Aws::ObjectFixityChecker.check(
bucket_name,
object_path,
checksum_algorithm_name,
on_chunk: progress_report_lambda(response_stream_name)
fixity_check.bucket_name,
fixity_check.object_path,
fixity_check.checksum_algorithm_name,
on_chunk: progress_report_lambda(fixity_check, response_stream_name)
)

fixity_check.update!(
checksum_hexdigest: checksum_hexdigest,
object_size: object_size,
status: :success
)

# Broadcast message when job is complete
broadcast_fixity_check_complete(
response_stream_name, bucket_name, object_path, checksum_algorithm_name, checksum_hexdigest, object_size
response_stream_name, fixity_check.bucket_name, fixity_check.object_path,
fixity_check.checksum_algorithm_name, checksum_hexdigest, object_size
)
rescue StandardError => e
broadcast_fixity_check_error(response_stream_name, e.message, bucket_name, object_path, checksum_algorithm_name)
fixity_check.update!(
checksum_hexdigest: checksum_hexdigest,
object_size: object_size,
status: :failure,
error_message: "An unexpected error occurred: #{e.class.name} -> #{e.message}"
)
broadcast_fixity_check_error(
response_stream_name, e.message, fixity_check.bucket_name,
fixity_check.object_path, fixity_check.checksum_algorithm_name
)
end

def broadcast_fixity_check_complete(
Expand Down Expand Up @@ -52,10 +71,14 @@ def broadcast_fixity_check_error(
)
end

def progress_report_lambda(response_stream_name)
def progress_report_lambda(fixity_check, response_stream_name)
lambda do |_chunk, _bytes_read, chunk_counter|
# Only provide an update once per 100 chunks processed
return unless (chunk_counter % 100).zero?

# Update the updated_at attribute for this FixityCheck record
fixity_check.touch # rubocop:disable Rails/SkipsModelValidations

# We periodically broadcast a message to indicate that the processing is still happening.
# This is so that a client can check whether a job has stalled.
ActionCable.server.broadcast(
Expand Down
5 changes: 5 additions & 0 deletions app/models/fixity_check.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# frozen_string_literal: true

class FixityCheck < ApplicationRecord
enum status: { pending: 0, in_progress: 1, success: 2, failure: 3 }
end
6 changes: 5 additions & 1 deletion config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
# Defines the root path route ("/")
root 'pages#home'

post '/fixity_checks/run_fixity_check_for_s3_object', to: 'fixity_checks#run_fixity_check_for_s3_object'
resources :fixity_checks, only: [:create, :show], defaults: { format: 'json' } do
collection do
post 'run_fixity_check_for_s3_object'
end
end

# Mount ActionCable Websocket route
mount ActionCable.server => '/cable'
Expand Down
17 changes: 17 additions & 0 deletions db/migrate/20241005213933_create_fixity_checks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
class CreateFixityChecks < ActiveRecord::Migration[7.1]
def change
create_table :fixity_checks do |t|
t.string :job_identifier, null: false, limit: 255
t.string :bucket_name, null: false
t.string :object_path, null: false
t.string :checksum_algorithm_name, null: false
t.string :checksum_hexdigest, null: true
t.bigint :object_size, null: true
t.integer :status, null: false, default: 0
t.text :error_message, null: true
t.timestamps
end
add_index :fixity_checks, :job_identifier, unique: true
add_index :fixity_checks, :updated_at
end
end
17 changes: 16 additions & 1 deletion db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 42 additions & 19 deletions spec/aws_check_fixity_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,46 @@
let(:object_size) { example_content.bytesize }
let(:stream_name) { "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{job_identifier}" }
let(:error_message) { 'oh no!' }
let(:fixity_check) do
FactoryBot.create(
:fixity_check,
job_identifier: job_identifier,
bucket_name: bucket_name,
object_path: object_path,
checksum_algorithm_name: checksum_algorithm_name
)
end

describe '#perform' do
it 'works as expected' do
allow(CheckPlease::Aws::ObjectFixityChecker).to receive(:check).with(
bucket_name,
object_path,
checksum_algorithm_name,
on_chunk: Proc
).and_return([checksum_hexdigest, object_size])
expect(aws_check_fixity_job).to receive(:broadcast_fixity_check_complete).with(
stream_name,
bucket_name,
object_path,
checksum_algorithm_name,
checksum_hexdigest,
object_size
)
aws_check_fixity_job.perform(job_identifier, bucket_name, object_path, checksum_algorithm_name)
context 'a successful run' do
before do
allow(CheckPlease::Aws::ObjectFixityChecker).to receive(:check).with(
bucket_name,
object_path,
checksum_algorithm_name,
on_chunk: Proc
).and_return([checksum_hexdigest, object_size])
end

it 'broadcasts a fixity check complete message' do
expect(aws_check_fixity_job).to receive(:broadcast_fixity_check_complete).with(
stream_name,
bucket_name,
object_path,
checksum_algorithm_name,
checksum_hexdigest,
object_size
)
aws_check_fixity_job.perform(fixity_check.id)
end

it 'saves the FixityCheck result in the database' do
aws_check_fixity_job.perform(fixity_check.id)
FixityCheck.first.tap do |fixity_check|
expect(fixity_check.checksum_hexdigest).to eq(checksum_hexdigest)
expect(fixity_check.object_size).to eq(object_size)
end
end
end

it 'broadcasts a fixity check error message when an error occurs during processing' do
Expand All @@ -45,16 +67,17 @@
object_path,
checksum_algorithm_name
)
aws_check_fixity_job.perform(job_identifier, bucket_name, object_path, checksum_algorithm_name)
aws_check_fixity_job.perform(fixity_check.id)
end
end

describe '#progress_report_lambda' do
let(:chunk) { 'a chunk of content' }
let(:bytes_read) { 12_345 }

it 'broadcasts an Action Cable message at the expected interval' do
progress_report_lambda = aws_check_fixity_job.progress_report_lambda(stream_name)
it 'broadcasts an Action Cable message at the expected interval, and touches the FixityCheck record' do
progress_report_lambda = aws_check_fixity_job.progress_report_lambda(fixity_check, stream_name)
expect(fixity_check).to receive(:touch).exactly(10).times
expect(ActionCable.server).to receive(:broadcast).exactly(10).times
(1..1000).each do |i|
progress_report_lambda.call(chunk, bytes_read, i)
Expand Down
11 changes: 10 additions & 1 deletion spec/channels/fixity_check_channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@
let(:file_content) { 'A' * 1024 }
let(:checksum_hexdigest) { Digest::SHA256.hexdigest(file_content) }
let(:object_size) { file_content.bytesize }
let(:fixity_check) do
FactoryBot.create(
:fixity_check,
job_identifier: job_identifier,
bucket_name: bucket_name,
object_path: object_path,
checksum_algorithm_name: checksum_algorithm_name
)
end

before do
allow(CheckPlease::Aws::ObjectFixityChecker).to receive(:check).with(
Expand All @@ -50,7 +59,7 @@
it 'initiates a checksum calculation, which queues a background job and '\
'responds with a fixity_check_complete broadcast' do
expect(AwsCheckFixityJob).to receive(:perform_later).with(
job_identifier, bucket_name, object_path, checksum_algorithm_name
FixityCheck.count + 1
).and_call_original

expect {
Expand Down
26 changes: 26 additions & 0 deletions spec/factories/fixity_checks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

FactoryBot.define do
factory :fixity_check do
job_identifier { SecureRandom.uuid }
bucket_name { 'sample_bucket' }
object_path { 'some/object/path' }
checksum_algorithm_name { 'sha256' }

trait :in_progress do
status { 'in_progress' }
end

trait :failure do
status { 'failure' }
error_message { 'An error occurred and this is the message associated with it.' }
end

trait :success do
status { 'success' }
example_content = 'example'
checksum_hexdigest { Digest::SHA256.hexdigest(example_content) }
object_size { example_content.bytesize }
end
end
end
Loading

0 comments on commit 7285aa2

Please sign in to comment.