Skip to content

Commit

Permalink
Add Action Cable implementation, FixityCheckChannel, and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elohanlon committed Jun 3, 2024
1 parent 7472f38 commit 8fee850
Show file tree
Hide file tree
Showing 22 changed files with 470 additions and 23 deletions.
4 changes: 2 additions & 2 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ Lint/MissingCopEnableDirective:

Metrics/MethodLength:
Exclude:
- lib/check_please/aws/object_fixity_verifier.rb
- lib/check_please/aws/object_fixity_checker.rb

RSpec/VerifiedDoubles:
Exclude:
- spec/check_please/aws/object_fixity_verifier_spec.rb
- spec/check_please/aws/object_fixity_checker_spec.rb
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ gem 'best_type', '~> 1.0'
gem 'bootsnap', require: false
# Add CRC32C support to the Ruby Digest module
gem 'digest-crc', '~> 0.6.5'
# Client library for connecting to a websocket endpoint
gem 'faye-websocket', '~> 0.11.3'
# Google Cloud Storage SDK
gem 'google-cloud-storage', '~> 1.49'
# Use JavaScript with ESM import maps [https://github.com/rails/importmap-rails]
Expand Down
5 changes: 5 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ GEM
drb (2.2.1)
ed25519 (1.3.0)
erubi (1.12.0)
eventmachine (1.2.7)
factory_bot (6.4.6)
activesupport (>= 5.0.0)
factory_bot_rails (6.4.3)
Expand All @@ -173,6 +174,9 @@ GEM
faraday-net_http (>= 2.0, < 3.2)
faraday-net_http (3.1.0)
net-http
faye-websocket (0.11.3)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
ffi (1.16.3)
globalid (1.2.1)
activesupport (>= 6.1)
Expand Down Expand Up @@ -518,6 +522,7 @@ DEPENDENCIES
devise
digest-crc (~> 0.6.5)
factory_bot_rails
faye-websocket (~> 0.11.3)
google-cloud-storage (~> 1.49)
importmap-rails
jbuilder
Expand Down
14 changes: 14 additions & 0 deletions app/channels/application_cable/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,19 @@

module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :uuid

def connect
authenticate! # reject connections that do not successfully authenticate
self.uuid = SecureRandom.uuid # assign a random uuid value when a user connects
end

private

def authenticate!
return if request.authorization&.split(' ')&.at(1) == CHECK_PLEASE['remote_request_api_key']

reject_unauthorized_connection
end
end
end
46 changes: 46 additions & 0 deletions app/channels/fixity_check_channel.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true

class FixityCheckChannel < ApplicationCable::Channel
FIXITY_CHECK_STREAM_PREFIX = "#{CHECK_PLEASE['action_cable_stream_prefix']}fixity_check:".freeze

# A websocket client subscribes by sending this message:
# {
# "command" => "subscribe",
# "identifier" => { "channel" => "FixityCheckChannel", "job_identifier" => "cool-job-id1" }.to_json
# }
def subscribed
return if params[:job_identifier].blank?

stream_name = "#{FIXITY_CHECK_STREAM_PREFIX}#{params[:job_identifier]}"
Rails.logger.debug "A client has started streaming from: #{stream_name}"
stream_from stream_name
end

def unsubscribed
# Any cleanup needed when channel is unsubscribed
return if params[:job_identifier].blank?

stream_name = "#{FIXITY_CHECK_STREAM_PREFIX}#{params[:job_identifier]}"
Rails.logger.debug "A client has stopped streaming from: #{stream_name}"
stop_stream_from stream_name
end

# A websocket client runs this command by sending this message:
# {
# "command" => "run_fixity_check_for_s3_object",
# "identifier" => { "channel" => "FixityCheckChannel", "job_identifier" => "cool-job-id1" }.to_json,
# "data" => {
# "action" => "run_fixity_check_for_s3_object", "bucket_name" => "some-bucket",
# "object_path" => "path/to/object.png", "checksum_algorithm_name" => "sha256"
# }.to_json
# }
def run_fixity_check_for_s3_object(data)
Rails.logger.debug("run_fixity_check_for_s3_object action received with job_identifier: #{params[:job_identifier]}")
job_identifier = params[:job_identifier]
bucket_name = data['bucket_name']
object_path = data['object_path']
checksum_algorithm_name = data['checksum_algorithm_name']

AwsCheckFixityJob.perform_later(job_identifier, bucket_name, object_path, checksum_algorithm_name)
end
end
66 changes: 66 additions & 0 deletions app/jobs/aws_check_fixity_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# frozen_string_literal: true

# rubocop:disable Metrics/MethodLength

class AwsCheckFixityJob < ApplicationJob
queue_as CheckPlease::Queues::CHECK_FIXITY

def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name)
response_stream_name = "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{job_identifier}"
progress_report_lambda = lambda { |_chunk, _bytes_read, chunk_counter|
return unless (chunk_counter % 100).zero?

# TODO: Broadcast a message to indicate that the processing is still happening.
# This way, clients will know if a job has stalled and will not wait indefinitely for results.
ActionCable.server.broadcast(
response_stream_name,
{ type: 'fixity_check_in_progress' }.to_json
)
}

checksum_hexdigest, object_size = CheckPlease::Aws::ObjectFixityChecker.check(
bucket_name,
object_path,
checksum_algorithm_name,
on_chunk: progress_report_lambda
)

# Broadcast message when job is complete
broadcast_fixity_check_complete(
response_stream_name, bucket_name, object_path, checksum_algorithm_name, checksum_hexdigest, object_size
)
rescue StandardError => e
broadcast_fixity_check_error(response_stream_name, e.message, bucket_name, object_path, checksum_algorithm_name)
end

def broadcast_fixity_check_complete(
response_stream_name, bucket_name, object_path, checksum_algorithm_name, checksum_hexdigest, object_size
)
ActionCable.server.broadcast(
response_stream_name,
{
type: 'fixity_check_complete',
data: {
bucket_name: bucket_name, object_path: object_path,
checksum_algorithm_name: checksum_algorithm_name,
checksum_hexdigest: checksum_hexdigest, object_size: object_size
}
}.to_json
)
end

def broadcast_fixity_check_error(
response_stream_name, error_message, bucket_name, object_path, checksum_algorithm_name
)
ActionCable.server.broadcast(
response_stream_name,
{
type: 'fixity_check_error',
data: {
error_message: error_message, bucket_name: bucket_name,
object_path: object_path, checksum_algorithm_name: checksum_algorithm_name
}
}.to_json
)
end
end
1 change: 1 addition & 0 deletions config/cable.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
development:
# adapter: async
adapter: redis
url: redis://localhost:6379/1

Expand Down
2 changes: 2 additions & 0 deletions config/environments/deployed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
# config.action_cable.mount_path = nil
# config.action_cable.url = 'wss://example.com/cable'
# config.action_cable.allowed_request_origins = [ 'http://example.com', /http:\/\/example.*/ ]
# Allow Action Cable access from any origin.
config.action_cable.disable_request_forgery_protection = true

# Force all access to the app over SSL, use Strict-Transport-Security, and use secure cookies.
# config.force_ssl = true
Expand Down
2 changes: 1 addition & 1 deletion config/environments/development.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
# config.action_view.annotate_rendered_view_with_filenames = true

# Uncomment if you wish to allow Action Cable access from any origin.
# config.action_cable.disable_request_forgery_protection = true
config.action_cable.disable_request_forgery_protection = true

# Raise error when a before_action's only/except options reference missing actions
config.action_controller.raise_on_missing_callback_actions = true
Expand Down
4 changes: 4 additions & 0 deletions config/environments/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@

# Raise error when a before_action's only/except options reference missing actions
config.action_controller.raise_on_missing_callback_actions = true

# Allow Action Cable access from any origin (so that it works in Capybara tests)
config.action_cable.disable_request_forgery_protection = true
# config.action_cable.allowed_request_origins = ['https://rubyonrails.com', %r{http://ruby.*}]
end
3 changes: 3 additions & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@

# Defines the root path route ("/")
root 'pages#home'

# Mount ActionCable Websocket route
mount ActionCable.server => '/cable'
end
7 changes: 6 additions & 1 deletion config/templates/check_please.template.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
development:
run_queued_jobs_inline: true
remote_request_api_key: changethis
action_cable_stream_prefix: '<%= "#{Rails.application.class.module_parent_name}:#{Rails.env}:" %>'

test:
run_queued_jobs_inline: true
run_queued_jobs_inline: true
remote_request_api_key: changethis
action_cable_stream_prefix: '<%= "#{Rails.application.class.module_parent_name}:#{Rails.env}:" %>'
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

module CheckPlease::Aws::ObjectFixityVerifier
module CheckPlease::Aws::ObjectFixityChecker
def self.digester_for_checksum_algorithm!(checksum_algorithm_name)
case checksum_algorithm_name
when 'sha256'
Expand All @@ -16,17 +16,24 @@ def self.digester_for_checksum_algorithm!(checksum_algorithm_name)
end
end

def self.verify(bucket_name, object_path, checksum_algorithm_name, print_memory_stats: false)
# Checks the specified object and returns
# @param bucket_name [String] The name of the S3 bucket
# @param object_path [String] The object path in the S3 bucket
# @param checksum_algorithm_name [String] A checksum algorithm name.
# Allowed values include: sha256, sha512, md5, crc32c
# @param on_chunk [lambda] A lambda that is called once per data chunk read, during the fixity check.
# @return [Array] An with two elements, the first being a hex digest of the object's bytes and the second
# being the object size in bytes.
def self.check(bucket_name, object_path, checksum_algorithm_name, on_chunk: nil)
digester_for_checksum_algorithm = digester_for_checksum_algorithm!(checksum_algorithm_name)
bytes_read = 0
memory_monitoring_counter = 0
chunk_counter = 0

obj = S3_CLIENT.get_object({ bucket: bucket_name, key: object_path }) do |chunk, _headers|
digester_for_checksum_algorithm.update(chunk)
bytes_read += chunk.bytesize

memory_monitoring_counter += 1
collect_and_print_memory_stats(bytes_read) if print_memory_stats && (memory_monitoring_counter % 100).zero?
chunk_counter += 1
on_chunk&.call(chunk, bytes_read, chunk_counter)
end

# The bytes_read sum should equal the AWS-reported obj.content_length,
Expand All @@ -45,9 +52,4 @@ def self.verify_read_byte_count!(bytes_read, expected_total_byte_count)
raise CheckPlease::Exceptions::ReportedFileSizeMismatchError,
"S3 reported an object size of #{expected_total_byte_count} bytes, but we only received #{bytes_read} bytes"
end

def self.collect_and_print_memory_stats(bytes_read)
pid, size = `ps ax -o pid,rss | grep -E "^[[:space:]]*#{$PROCESS_ID}"`.strip.split.map(&:to_i)
puts "Read: #{bytes_read / 1.megabyte} MB. Memory usage for pid #{pid}: #{size.to_f / 1.kilobyte} MB." # rubocop:disable Rails/Output
end
end
1 change: 1 addition & 0 deletions lib/check_please/queues.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# frozen_string_literal: true

module CheckPlease::Queues
CHECK_FIXITY = 'check_fixity'
end
1 change: 1 addition & 0 deletions lib/tasks/check_please/rubocop.rake
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ if ['development', 'test'].include?(Rails.env)
'Layout/SpaceAroundKeyword',
'Layout/SpaceAroundOperators',
'Layout/SpaceBeforeBlockBraces',
'Layout/SpaceBeforeFirstArg',
'Layout/SpaceInsideArrayLiteralBrackets',
'Layout/SpaceInsideBlockBraces',
'Layout/SpaceInsideHashLiteralBraces',
Expand Down
12 changes: 10 additions & 2 deletions lib/tasks/check_please/verification.rake
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
BUFFER_SIZE = 5.megabytes

memory_stat_lambda = lambda { |_chunk, bytes_read, chunk_counter|
return unless (chunk_counter % 100).zero?
pid, size = `ps ax -o pid,rss | grep -E "^[[:space:]]*#{$PROCESS_ID}"`.strip.split.map(&:to_i)
puts "Read: #{bytes_read / 1.megabyte} MB. Memory usage for pid #{pid}: #{size.to_f / 1.kilobyte} MB." # rubocop:disable Rails/Output
}

namespace :check_please do
namespace :verification do
desc 'Verify the checksum for the file at the given bucket_name and object_path'
Expand All @@ -9,12 +15,14 @@ namespace :check_please do
checksum_algorithm_name = ENV['checksum_algorithm_name']
print_memory_stats = ENV['print_memory_stats'] == 'true'

checksum, object_size = CheckPlease::Aws::ObjectFixityVerifier.verify(
memory_monitoring_counter = 0
checksum, object_size = CheckPlease::Aws::ObjectFixityChecker.check(
bucket_name,
object_path,
checksum_algorithm_name,
print_memory_stats: print_memory_stats
on_chunk: print_memory_stats ? memory_stat_lambda : nil
)

puts "#{bucket_name}: #{object_path}"
puts "#{checksum_algorithm_name} checksum is: #{checksum}"
puts "object_size is: #{object_size}"
Expand Down
24 changes: 24 additions & 0 deletions spec/channels/application_cable/connection_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe ApplicationCable::Connection, type: :channel do
let(:uuid_regex) { /[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/ }
let(:invalid_authorization_header_value) { "Bearer: invalid-#{CHECK_PLEASE['remote_request_api_key']}" }
let(:valid_authorization_header_value) { "Bearer: #{CHECK_PLEASE['remote_request_api_key']}" }

it 'rejects a connection when no authorization header is given' do
expect { connect '/cable' }.to have_rejected_connection
end

it 'rejects a connection when an invalid authorization header value is given' do
expect {
connect '/cable', headers: { 'Authorization' => invalid_authorization_header_value }
}.to have_rejected_connection
end

it "successfully connects and assigns a uuid value to the connection's uuid field" do
connect '/cable', headers: { 'Authorization' => valid_authorization_header_value }
expect(connection.uuid).to match(uuid_regex)
end
end
Loading

0 comments on commit 8fee850

Please sign in to comment.