diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 2b431e5..89c07b5 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -117,41 +117,6 @@ def convert_subscriber_prefetch(options) def resolve_subscriber_routing_keys(channel, operation); end - # def register_subscription(subscriber_klass, bindings) - # consumer_proxy = consumer_proxy_for(bindings) - - # consumer_proxy.on_delivery do |delivery_info, metadata, payload| - # on_receive_message( - # subscriber_klass, - # delivery_info, - # metadata, - # payload - # ) - # end - - # subscribe_consumer(consumer_proxy) - # end - - # def subscribe_consumer(consumer_proxy) - # @subject.subscribe_with(consumer_proxy) - # @consumers.push(consumer_proxy) - # end - - # def consumer_proxy_for(bindings) - # operation_bindings = convert_to_consumer_options(bindings[:amqp]) - - # logger.debug 'consumer proxy options:' - # logger.debug operation_bindings.inspect - - # BunnyConsumerProxy.new( - # @subject.channel, - # @subject, - # '', - # operation_bindings[:no_ack], - # operation_bindings[:exclusive] - # ) - # end - def on_receive_message( subscriber_klass, delivery_info, @@ -164,17 +129,7 @@ def on_receive_message( logger.debug metadata.inspect logger.debug payload.inspect - if delivery_info.routing_key - routing_key = [app_name, delivery_info.routing_key].join(delimiter) - executable = subscriber_klass.executable_for(routing_key) - end - - unless executable - routing_key = [app_name, exchange_name].join(delimiter) - executable = subscriber_klass.executable_for(routing_key) - end - - logger.debug "routing_key: #{routing_key}" + executable = find_executable(subscriber_klass, delivery_info) return unless executable subscriber = subscriber_klass.new @@ -196,6 +151,13 @@ def on_receive_message( subscriber = nil end + def find_executable(subscriber_klass, delivery_info) + subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass) + + find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) || + find_default_executable(subscriber_klass, subscriber_suffix) + end + def respond_to_missing?(name, include_private); end # Forward all missing method calls to the Bunny::Queue instance @@ -205,6 +167,22 @@ def method_missing(name, *args) private + def subscriber_klass_name_to_suffix(subscriber_klass) + subscriber_klass.name.downcase.gsub("::", '_') + end + + def find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) + return unless delivery_info.routing_key + + routing_key = [app_name, delivery_info.routing_key].join(delimiter) + subscriber_klass.executable_for(routing_key + "_#{subscriber_suffix}") + end + + def find_default_executable(subscriber_klass, subscriber_suffix) + default_routing_key = [app_name, exchange_name].join(delimiter) + subscriber_klass.executable_for(default_routing_key + "_#{subscriber_suffix}") + end + def delimiter EventSource.delimiter(:amqp) end diff --git a/lib/event_source/subscriber.rb b/lib/event_source/subscriber.rb index d3b1cd0..d62257a 100644 --- a/lib/event_source/subscriber.rb +++ b/lib/event_source/subscriber.rb @@ -100,11 +100,15 @@ def subscribe(subscription_name, &block) unless formatted_publisher_key.gsub(delimiter, '_') == identifier unique_key_elements.push(identifier) end - logger.debug "Subscriber#susbcribe Unique_key #{unique_key_elements.join(delimiter)}" + return unless block_given? + subscriber_suffix = self.name.downcase.gsub('::', '_') + subscriber_unique_key = unique_key_elements.join(delimiter) + "_#{subscriber_suffix}" + logger.debug "Subscriber#susbcribe Unique key #{subscriber_unique_key}" + EventSource::Subscriber.executable_container[ - unique_key_elements.join(delimiter) + subscriber_unique_key ] = block end diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index 7f4aeab..a1758b3 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -168,4 +168,157 @@ # end # end end + + context "executable lookup with subscriber suffix" do + let(:connection_manager) { EventSource::ConnectionManager.instance } + let!(:connection) { connection_manager.add_connection(my_server) } + + let(:event_log_subscriber) do + Pathname.pwd.join( + "spec", + "rails_app", + "app", + "event_source", + "subscribers", + "event_log_subscriber.rb" + ) + end + + let(:enterprise_subscriber) do + Pathname.pwd.join( + "spec", + "rails_app", + "app", + "event_source", + "subscribers", + "enterprise_subscriber.rb" + ) + end + + let(:publish_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_audit_log_publish.yml" + ) + ) + .success + end + + let(:subscribe_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_audit_log_subscribe.yml" + ) + ) + .success + end + + let(:subscribe_two_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_enterprise_subscribe.yml" + ) + ) + .success + end + + let(:publish_channel) do + connection.add_channel( + "enroll.audit_log.events.created", + publish_resource.channels.first + ) + end + let(:subscribe_channel) do + connection.add_channel( + "on_enroll.enroll.audit_log.events", + subscribe_resource.channels.first + ) + end + let(:subscribe_two_channel) do + connection.add_channel( + "on_enroll.enroll.enterprise.events", + subscribe_two_resource.channels.first + ) + end + + let(:load_subscribers) do + [event_log_subscriber, enterprise_subscriber].each do |file| + require file.to_s + end + end + + before do + allow(EventSource).to receive(:app_name).and_return("enroll") + connection.start unless connection.active? + publish_channel + subscribe_channel + subscribe_two_channel + load_subscribers + allow(subject).to receive(:exchange_name) { exchange_name } + end + + let(:audit_log_proc) do + EventSource::Subscriber.executable_container[ + "enroll.enroll.audit_log.events_subscribers_eventlogsubscriber" + ] + end + + let(:enterprise_advance_day_proc) do + EventSource::Subscriber.executable_container[ + "enroll.enroll.enterprise.events.date_advanced_subscribers_enterprisesubscriber" + ] + end + + context "when routing key based executable is not found" do + let(:delivery_info) do + double(routing_key: "enroll.enterprise.events.date_advanced") + end + + let(:exchange_name) { "enroll.audit_log.events" } + + it "should return default audit log proc" do + executable = + subject.find_executable( + Subscribers::EventLogSubscriber, + delivery_info + ) + expect(executable).to match(audit_log_proc) + end + end + + context "when routing key based executable is found" do + let(:delivery_info) do + double(routing_key: "enroll.enterprise.events.date_advanced") + end + + let(:exchange_name) { "enroll.enterprise.events" } + + it "should return executable for the routing key" do + executable = + subject.find_executable( + Subscribers::EnterpriseSubscriber, + delivery_info + ) + expect(executable).to match(enterprise_advance_day_proc) + end + end + end end diff --git a/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb b/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb new file mode 100644 index 0000000..c36b638 --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Subscribers + # Subscriber will receive Enterprise requests like date change + class EnterpriseSubscriber + include ::EventSource::Subscriber[amqp: "enroll.enterprise.events"] + + subscribe(:on_date_advanced) do |delivery_info, metadata, response| + logger.info "-" * 100 unless Rails.env.test? + logger.info "EnterpriseSubscriber#on_date_advanced, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + + subscribe( + :on_enroll_enterprise_events + ) do |delivery_info, _metadata, response| + logger.info "-" * 100 unless Rails.env.test? + logger.info "EnterpriseSubscriber#on_enroll_enterprise_events, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + end +end diff --git a/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb b/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb new file mode 100644 index 0000000..9dea894 --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Subscribers + # Subscriber will receive Audit Log events + class EventLogSubscriber + include EventSource::Logging + include ::EventSource::Subscriber[amqp: "enroll.audit_log.events"] + + subscribe( + :on_enroll_audit_log_events + ) do |delivery_info, metadata, response| + logger.info "-" * 100 unless Rails.env.test? + + subscriber_logger.info "EventLogEventsSubscriber#on_enroll_audit_log_events, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + + private + + def subscriber_logger + @subscriber_logger ||= + Logger.new("#{Rails.root}/log/on_enroll_audit_log_events.log") + end + end +end diff --git a/spec/support/asyncapi/amqp_enterprise_subscribe.yml b/spec/support/asyncapi/amqp_enterprise_subscribe.yml new file mode 100644 index 0000000..1842ed0 --- /dev/null +++ b/spec/support/asyncapi/amqp_enterprise_subscribe.yml @@ -0,0 +1,55 @@ +--- +asyncapi: 2.0.0 +info: + title: Enroll App + version: 0.1.0 + description: AMQP Subsribe configuration for the Enroll App services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Production Server + development: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server + test: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server +channels: + on_enroll.enroll.enterprise.events: + bindings: + amqp: + is: queue + queue: + name: on_enroll.enroll.enterprise.events + durable: true + exclusive: false + auto_delete: false + vhost: "/" + subscribe: + bindings: + amqp: + ack: true + exclusive: false + routing_key: enroll.# + prefetch: 1 + bindingVersion: "0.2.0" + operationId: on_enroll.enroll.enterprise.events + description: Events - for system wide changes + +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file