Skip to content

Commit

Permalink
Use proper threading to encourage work completion of AMQP subscribers.
Browse files Browse the repository at this point in the history
  • Loading branch information
TreyE committed Sep 13, 2023
1 parent 47b1942 commit 0016215
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
10 changes: 10 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require 'event_source/version'
require 'event_source/ruby_versions'
require 'event_source/error'
require 'event_source/threaded'
require 'event_source/inflector'
require 'event_source/logging'
require 'event_source/uris/uri'
Expand Down Expand Up @@ -63,6 +64,7 @@ def configure
end

def initialize!
boot_threading!
load_protocols
create_connections
load_async_api_resources
Expand All @@ -84,6 +86,14 @@ def build_async_api_resource(resource)
.call(resource)
.success
end

def boot_threading!
@threaded = EventSource::Threaded.new
end

def threaded
@threaded
end
end

class EventSourceLogger
Expand Down
3 changes: 2 additions & 1 deletion lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,12 @@ def on_receive_message(
payload,
&executable
)

EventSource.threaded.amqp_consumer_lock.mon_enter
subscription_handler.run
rescue Bunny::Exception => e
logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}"
ensure
EventSource.threaded.amqp_consumer_lock.mon_exit
subscriber = nil
end

Expand Down
14 changes: 14 additions & 0 deletions lib/event_source/threaded.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module EventSource
# Manages concurrent resource access in a threaded environment.
class Threaded

attr_reader :amqp_consumer_lock, :worker_lock

def initialize
@amqp_consumer_lock = ::Monitor.new
@worker_lock = ::Monitor.new
end
end
end

0 comments on commit 0016215

Please sign in to comment.