Skip to content

Commit 6700ed3

Browse files
committed
Add Action Cable implementation, FixityCheckChannel, and tests
1 parent 7472f38 commit 6700ed3

22 files changed

+469
-23
lines changed

.rubocop.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ Lint/MissingCopEnableDirective:
2222

2323
Metrics/MethodLength:
2424
Exclude:
25-
- lib/check_please/aws/object_fixity_verifier.rb
25+
- lib/check_please/aws/object_fixity_checker.rb
2626

2727
RSpec/VerifiedDoubles:
2828
Exclude:
29-
- spec/check_please/aws/object_fixity_verifier_spec.rb
29+
- spec/check_please/aws/object_fixity_checker_spec.rb

Gemfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ gem 'best_type', '~> 1.0'
1616
gem 'bootsnap', require: false
1717
# Add CRC32C support to the Ruby Digest module
1818
gem 'digest-crc', '~> 0.6.5'
19+
# Client library for connecting to a websocket endpoint
20+
gem 'faye-websocket', '~> 0.11.3'
1921
# Google Cloud Storage SDK
2022
gem 'google-cloud-storage', '~> 1.49'
2123
# Use JavaScript with ESM import maps [https://github.com/rails/importmap-rails]

Gemfile.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ GEM
164164
drb (2.2.1)
165165
ed25519 (1.3.0)
166166
erubi (1.12.0)
167+
eventmachine (1.2.7)
167168
factory_bot (6.4.6)
168169
activesupport (>= 5.0.0)
169170
factory_bot_rails (6.4.3)
@@ -173,6 +174,9 @@ GEM
173174
faraday-net_http (>= 2.0, < 3.2)
174175
faraday-net_http (3.1.0)
175176
net-http
177+
faye-websocket (0.11.3)
178+
eventmachine (>= 0.12.0)
179+
websocket-driver (>= 0.5.1)
176180
ffi (1.16.3)
177181
globalid (1.2.1)
178182
activesupport (>= 6.1)
@@ -518,6 +522,7 @@ DEPENDENCIES
518522
devise
519523
digest-crc (~> 0.6.5)
520524
factory_bot_rails
525+
faye-websocket (~> 0.11.3)
521526
google-cloud-storage (~> 1.49)
522527
importmap-rails
523528
jbuilder

app/channels/application_cable/connection.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,19 @@
22

33
module ApplicationCable
44
class Connection < ActionCable::Connection::Base
5+
identified_by :uuid
6+
7+
def connect
8+
authenticate! # reject connections that do not successfully authenticate
9+
self.uuid = SecureRandom.uuid # assign a random uuid value when a user connects
10+
end
11+
12+
private
13+
14+
def authenticate!
15+
return if request.authorization&.split(' ')&.at(1) == CHECK_PLEASE['remote_request_api_key']
16+
17+
reject_unauthorized_connection
18+
end
519
end
620
end

app/channels/fixity_check_channel.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# frozen_string_literal: true
2+
3+
class FixityCheckChannel < ApplicationCable::Channel
4+
FIXITY_CHECK_STREAM_PREFIX = "#{CHECK_PLEASE['action_cable_stream_prefix']}fixity_check:".freeze
5+
6+
# A websocket client subscribes by sending this message:
7+
# {
8+
# "command" => "subscribe",
9+
# "identifier" => { "channel" => "FixityCheckChannel", "job_identifier" => "cool-job-id1" }.to_json
10+
# }
11+
def subscribed
12+
return if params[:job_identifier].blank?
13+
14+
stream_name = "#{FIXITY_CHECK_STREAM_PREFIX}#{params[:job_identifier]}"
15+
Rails.logger.debug "A client has started streaming from: #{stream_name}"
16+
stream_from stream_name
17+
end
18+
19+
def unsubscribed
20+
# Any cleanup needed when channel is unsubscribed
21+
return if params[:job_identifier].blank?
22+
23+
stream_name = "#{FIXITY_CHECK_STREAM_PREFIX}#{params[:job_identifier]}"
24+
Rails.logger.debug "A client has stopped streaming from: #{stream_name}"
25+
stop_stream_from stream_name
26+
end
27+
28+
# A websocket client runs this command by sending this message:
29+
# {
30+
# "command" => "run_fixity_check_for_s3_object",
31+
# "identifier" => { "channel" => "FixityCheckChannel", "job_identifier" => "cool-job-id1" }.to_json,
32+
# "data" => {
33+
# "action" => "run_fixity_check_for_s3_object", "bucket_name" => "some-bucket",
34+
# "object_path" => "path/to/object.png", "checksum_algorithm_name" => "sha256"
35+
# }.to_json
36+
# }
37+
def run_fixity_check_for_s3_object(data)
38+
Rails.logger.debug("run_fixity_check_for_s3_object action received with job_identifier: #{params[:job_identifier]}")
39+
job_identifier = params[:job_identifier]
40+
bucket_name = data['bucket_name']
41+
object_path = data['object_path']
42+
checksum_algorithm_name = data['checksum_algorithm_name']
43+
44+
AwsCheckFixityJob.perform_later(job_identifier, bucket_name, object_path, checksum_algorithm_name)
45+
end
46+
end

app/jobs/aws_check_fixity_job.rb

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# frozen_string_literal: true
2+
3+
# rubocop:disable Metrics/MethodLength
4+
5+
class AwsCheckFixityJob < ApplicationJob
6+
queue_as CheckPlease::Queues::CHECK_FIXITY
7+
8+
def perform(job_identifier, bucket_name, object_path, checksum_algorithm_name)
9+
response_stream_name = "#{FixityCheckChannel::FIXITY_CHECK_STREAM_PREFIX}#{job_identifier}"
10+
progress_report_lambda = lambda { |_chunk, _bytes_read, chunk_counter|
11+
return unless (chunk_counter % 100).zero?
12+
13+
# TODO: Broadcast a message to indicate that the processing is still happening.
14+
# This way, clients will know if a job has stalled and will not wait indefinitely for results.
15+
ActionCable.server.broadcast(
16+
response_stream_name,
17+
{ type: 'fixity_check_in_progress' }.to_json
18+
)
19+
}
20+
21+
checksum_hexdigest, object_size = CheckPlease::Aws::ObjectFixityChecker.check(
22+
bucket_name,
23+
object_path,
24+
checksum_algorithm_name,
25+
on_chunk: progress_report_lambda
26+
)
27+
28+
# Broadcast message when job is complete
29+
broadcast_fixity_check_complete(
30+
response_stream_name, bucket_name, object_path, checksum_algorithm_name, checksum_hexdigest, object_size
31+
)
32+
rescue StandardError => e
33+
broadcast_fixity_check_error(response_stream_name, e.message, bucket_name, object_path, checksum_algorithm_name)
34+
end
35+
36+
def broadcast_fixity_check_complete(
37+
response_stream_name, bucket_name, object_path, checksum_algorithm_name, checksum_hexdigest, object_size
38+
)
39+
ActionCable.server.broadcast(
40+
response_stream_name,
41+
{
42+
type: 'fixity_check_complete',
43+
data: {
44+
bucket_name: bucket_name, object_path: object_path,
45+
checksum_algorithm_name: checksum_algorithm_name,
46+
checksum_hexdigest: checksum_hexdigest, object_size: object_size
47+
}
48+
}.to_json
49+
)
50+
end
51+
52+
def broadcast_fixity_check_error(
53+
response_stream_name, error_message, bucket_name, object_path, checksum_algorithm_name
54+
)
55+
ActionCable.server.broadcast(
56+
response_stream_name,
57+
{
58+
type: 'fixity_check_error',
59+
data: {
60+
error_message: error_message, bucket_name: bucket_name,
61+
object_path: object_path, checksum_algorithm_name: checksum_algorithm_name
62+
}
63+
}.to_json
64+
)
65+
end
66+
end

config/cable.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
development:
2+
# adapter: async
23
adapter: redis
34
url: redis://localhost:6379/1
45

config/environments/deployed.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
# config.action_cable.mount_path = nil
4545
# config.action_cable.url = 'wss://example.com/cable'
4646
# config.action_cable.allowed_request_origins = [ 'http://example.com', /http:\/\/example.*/ ]
47+
# Allow Action Cable access from any origin.
48+
config.action_cable.disable_request_forgery_protection = true
4749

4850
# Force all access to the app over SSL, use Strict-Transport-Security, and use secure cookies.
4951
# config.force_ssl = true

config/environments/development.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
# config.action_view.annotate_rendered_view_with_filenames = true
7272

7373
# Uncomment if you wish to allow Action Cable access from any origin.
74-
# config.action_cable.disable_request_forgery_protection = true
74+
config.action_cable.disable_request_forgery_protection = true
7575

7676
# Raise error when a before_action's only/except options reference missing actions
7777
config.action_controller.raise_on_missing_callback_actions = true

config/environments/test.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@
6363

6464
# Raise error when a before_action's only/except options reference missing actions
6565
config.action_controller.raise_on_missing_callback_actions = true
66+
67+
# Allow Action Cable access from any origin (so that it works in Capybara tests)
68+
config.action_cable.disable_request_forgery_protection = true
69+
# config.action_cable.allowed_request_origins = ['https://rubyonrails.com', %r{http://ruby.*}]
6670
end

config/routes.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@
2525

2626
# Defines the root path route ("/")
2727
root 'pages#home'
28+
29+
# Mount ActionCable Websocket route
30+
mount ActionCable.server => '/cable'
2831
end
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
development:
22
run_queued_jobs_inline: true
3+
remote_request_api_key: changethis
4+
action_cable_stream_prefix: '<%= "#{Rails.application.class.module_parent_name}:#{Rails.env}:" %>'
5+
36
test:
4-
run_queued_jobs_inline: true
7+
run_queued_jobs_inline: true
8+
remote_request_api_key: changethis
9+
action_cable_stream_prefix: '<%= "#{Rails.application.class.module_parent_name}:#{Rails.env}:" %>'

lib/check_please/aws/object_fixity_verifier.rb renamed to lib/check_please/aws/object_fixity_checker.rb

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# frozen_string_literal: true
22

3-
module CheckPlease::Aws::ObjectFixityVerifier
3+
module CheckPlease::Aws::ObjectFixityChecker
44
def self.digester_for_checksum_algorithm!(checksum_algorithm_name)
55
case checksum_algorithm_name
66
when 'sha256'
@@ -16,17 +16,24 @@ def self.digester_for_checksum_algorithm!(checksum_algorithm_name)
1616
end
1717
end
1818

19-
def self.verify(bucket_name, object_path, checksum_algorithm_name, print_memory_stats: false)
19+
# Checks the specified object and returns
20+
# @param bucket_name [String] The name of the S3 bucket
21+
# @param object_path [String] The object path in the S3 bucket
22+
# @param checksum_algorithm_name [String] A checksum algorithm name.
23+
# Allowed values include: sha256, sha512, md5, crc32c
24+
# @param on_chunk [lambda] A lambda that is called once per data chunk read, during the fixity check.
25+
# @return [Array] An with two elements, the first being a hex digest of the object's bytes and the second
26+
# being the object size in bytes.
27+
def self.check(bucket_name, object_path, checksum_algorithm_name, on_chunk: nil)
2028
digester_for_checksum_algorithm = digester_for_checksum_algorithm!(checksum_algorithm_name)
2129
bytes_read = 0
22-
memory_monitoring_counter = 0
30+
chunk_counter = 0
2331

2432
obj = S3_CLIENT.get_object({ bucket: bucket_name, key: object_path }) do |chunk, _headers|
2533
digester_for_checksum_algorithm.update(chunk)
2634
bytes_read += chunk.bytesize
27-
28-
memory_monitoring_counter += 1
29-
collect_and_print_memory_stats(bytes_read) if print_memory_stats && (memory_monitoring_counter % 100).zero?
35+
chunk_counter += 1
36+
on_chunk&.call(chunk, bytes_read, chunk_counter)
3037
end
3138

3239
# The bytes_read sum should equal the AWS-reported obj.content_length,
@@ -45,9 +52,4 @@ def self.verify_read_byte_count!(bytes_read, expected_total_byte_count)
4552
raise CheckPlease::Exceptions::ReportedFileSizeMismatchError,
4653
"S3 reported an object size of #{expected_total_byte_count} bytes, but we only received #{bytes_read} bytes"
4754
end
48-
49-
def self.collect_and_print_memory_stats(bytes_read)
50-
pid, size = `ps ax -o pid,rss | grep -E "^[[:space:]]*#{$PROCESS_ID}"`.strip.split.map(&:to_i)
51-
puts "Read: #{bytes_read / 1.megabyte} MB. Memory usage for pid #{pid}: #{size.to_f / 1.kilobyte} MB." # rubocop:disable Rails/Output
52-
end
5355
end

lib/check_please/queues.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# frozen_string_literal: true
22

33
module CheckPlease::Queues
4+
CHECK_FIXITY = 'check_fixity'
45
end

lib/tasks/check_please/rubocop.rake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ if ['development', 'test'].include?(Rails.env)
1919
'Layout/SpaceAroundKeyword',
2020
'Layout/SpaceAroundOperators',
2121
'Layout/SpaceBeforeBlockBraces',
22+
'Layout/SpaceBeforeFirstArg',
2223
'Layout/SpaceInsideArrayLiteralBrackets',
2324
'Layout/SpaceInsideBlockBraces',
2425
'Layout/SpaceInsideHashLiteralBraces',

lib/tasks/check_please/verification.rake

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
BUFFER_SIZE = 5.megabytes
22

3+
memory_stat_lambda = lambda { |_chunk, bytes_read, chunk_counter|
4+
return unless (chunk_counter % 100).zero?
5+
pid, size = `ps ax -o pid,rss | grep -E "^[[:space:]]*#{$PROCESS_ID}"`.strip.split.map(&:to_i)
6+
puts "Read: #{bytes_read / 1.megabyte} MB. Memory usage for pid #{pid}: #{size.to_f / 1.kilobyte} MB." # rubocop:disable Rails/Output
7+
}
8+
39
namespace :check_please do
410
namespace :verification do
511
desc 'Verify the checksum for the file at the given bucket_name and object_path'
@@ -9,12 +15,14 @@ namespace :check_please do
915
checksum_algorithm_name = ENV['checksum_algorithm_name']
1016
print_memory_stats = ENV['print_memory_stats'] == 'true'
1117

12-
checksum, object_size = CheckPlease::Aws::ObjectFixityVerifier.verify(
18+
memory_monitoring_counter = 0
19+
checksum, object_size = CheckPlease::Aws::ObjectFixityChecker.check(
1320
bucket_name,
1421
object_path,
1522
checksum_algorithm_name,
16-
print_memory_stats: print_memory_stats
23+
on_chunk: print_memory_stats ? memory_stat_lambda : nil
1724
)
25+
1826
puts "#{bucket_name}: #{object_path}"
1927
puts "#{checksum_algorithm_name} checksum is: #{checksum}"
2028
puts "object_size is: #{object_size}"
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
require 'rails_helper'
4+
5+
RSpec.describe ApplicationCable::Connection, type: :channel do
6+
let(:uuid_regex) { /[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/ }
7+
let(:invalid_authorization_header_value) { "Bearer: invalid-#{CHECK_PLEASE['remote_request_api_key']}" }
8+
let(:valid_authorization_header_value) { "Bearer: #{CHECK_PLEASE['remote_request_api_key']}" }
9+
10+
it 'rejects a connection when no authorization header is given' do
11+
expect { connect '/cable' }.to have_rejected_connection
12+
end
13+
14+
it 'rejects a connection when an invalid authorization header value is given' do
15+
expect {
16+
connect '/cable', headers: { 'Authorization' => invalid_authorization_header_value }
17+
}.to have_rejected_connection
18+
end
19+
20+
it "successfully connects and assigns a uuid value to the connection's uuid field" do
21+
connect '/cable', headers: { 'Authorization' => valid_authorization_header_value }
22+
expect(connection.uuid).to match(uuid_regex)
23+
end
24+
end

0 commit comments

Comments
 (0)