From 6b4cf6b66356042eb32d8a799f19771b7c5eab9c Mon Sep 17 00:00:00 2001 From: Sai Kumar Kotagiri Date: Tue, 7 Nov 2023 11:40:05 -0500 Subject: [PATCH 1/8] adds ruby 2.6 series testing to the testing matrix (#97) * adds ruby 2.6 series testing to the testing matrix * Set nokogiri requirement to the Enroll version. * try to load different versions of nokogiri based on Ruby version * removes nokogiri from Gemfile.lock * reverts unwanted changes * relaxes dependency versions --------- Co-authored-by: Trey Evans --- .github/workflows/rspec.yml | 2 +- Gemfile.lock | 7 ++----- event_source.gemspec | 4 ++-- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index c8580a1..198903e 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -6,7 +6,7 @@ jobs: strategy: fail-fast: false matrix: - ruby_version: ["2.7.5", "3.0.5", "3.1.4", "3.2.2"] + ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2'] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/Gemfile.lock b/Gemfile.lock index 822a51c..e254695 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -5,12 +5,12 @@ PATH addressable (>= 2.8.0) bunny (>= 2.14) deep_merge (~> 1.2.0) - dry-configurable (= 0.12.1) + dry-configurable (~> 0.12) dry-events (~> 0.3) dry-inflector (~> 0.2) dry-initializer (~> 3.0) dry-monads (~> 1.3) - dry-schema (= 1.6.2) + dry-schema (~> 1.6) dry-struct (~> 1.4) dry-types (~> 1.5) dry-validation (~> 1.6) @@ -213,9 +213,6 @@ GEM mustermann (1.1.1) ruby2_keywords (~> 0.0.1) nio4r (2.5.8) - nokogiri (1.14.2) - mini_portile2 (~> 2.8.0) - racc (~> 1.4) oj (3.13.9) ox (2.14.5) parallel (1.20.1) diff --git a/event_source.gemspec b/event_source.gemspec index 081a909..51b829e 100644 --- a/event_source.gemspec +++ b/event_source.gemspec @@ -38,7 +38,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'addressable', '>= 2.8.0' spec.add_dependency 'bunny', '>= 2.14' spec.add_dependency 'deep_merge', '~> 1.2.0' - spec.add_dependency 'dry-configurable', '0.12.1' + spec.add_dependency 'dry-configurable', '~> 0.12' spec.add_dependency 'dry-events', '~> 0.3' spec.add_dependency 'dry-inflector', '~> 0.2' spec.add_dependency 'dry-initializer', '~> 3.0' @@ -46,7 +46,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'dry-struct', '~> 1.4' spec.add_dependency 'dry-types', '~> 1.5' spec.add_dependency 'dry-validation', '~> 1.6' - spec.add_dependency 'dry-schema', '1.6.2' + spec.add_dependency 'dry-schema', '~> 1.6' spec.add_dependency 'faraday', '~> 1.4.1' spec.add_dependency 'faraday_middleware', '~> 1.0' spec.add_dependency 'logging', '~> 2.3.0' From e2cb69b284df7f73aeff05ab967549765a037d4c Mon Sep 17 00:00:00 2001 From: vishal kalletla Date: Wed, 27 Dec 2023 13:48:21 -0600 Subject: [PATCH 2/8] removes unused gem resque-bus (#103) --- Gemfile.lock | 37 -------------- event_source.gemspec | 1 - lib/event_source/tasks/event_source.rake | 19 ------- .../adapters/parties/queue_bus_adapter.rb | 49 ------------------- .../config/initializers/event_source.rb | 5 +- 5 files changed, 1 insertion(+), 110 deletions(-) delete mode 100644 lib/event_source/tasks/event_source.rake delete mode 100644 spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb diff --git a/Gemfile.lock b/Gemfile.lock index e254695..996e4e1 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -21,7 +21,6 @@ PATH nokogiri (>= 1.13.0) oj (~> 3.11) ox (~> 2.14) - resque-bus (~> 0.7.0) typhoeus (~> 1.4.0) GEM @@ -154,8 +153,6 @@ GEM dry-initializer (~> 3.0) dry-schema (~> 1.5, >= 1.5.2) erubi (1.10.0) - et-orbi (1.2.6) - tzinfo ethon (0.15.0) ffi (>= 1.15.0) faker (2.18.0) @@ -176,9 +173,6 @@ GEM faraday_middleware (1.2.0) faraday (~> 1.0) ffi (1.15.4) - fugit (1.5.2) - et-orbi (~> 1.1, >= 1.1.8) - raabro (~> 1.4) globalid (0.5.2) activesupport (>= 5.0) hashdiff (1.0.1) @@ -207,7 +201,6 @@ GEM mongoid (7.3.0) activemodel (>= 5.1, < 6.2) mongo (>= 2.10.5, < 3.0.0) - mono_logger (1.1.1) multi_json (1.15.0) multipart-post (2.1.1) mustermann (1.1.1) @@ -225,10 +218,6 @@ GEM byebug (~> 11.0) pry (~> 0.10) public_suffix (4.0.6) - queue-bus (0.12.0) - multi_json - redis - raabro (1.4.0) racc (1.6.2) rack (2.2.3) rack-protection (2.1.0) @@ -264,29 +253,7 @@ GEM rainbow (3.0.0) rake (13.0.6) rbtree (0.4.6) - redis (4.5.1) - redis-namespace (1.8.1) - redis (>= 3.0.4) regexp_parser (2.1.1) - resque (1.27.4) - mono_logger (~> 1.0) - multi_json (~> 1.0) - redis-namespace (~> 1.3) - sinatra (>= 0.9.2) - vegas (~> 0.1.2) - resque-bus (0.7.0) - queue-bus (>= 0.7, < 1) - resque (>= 1.10.0, < 2.0) - resque-retry - resque-scheduler (>= 2.0.1) - resque-retry (1.7.6) - resque (>= 1.25, < 3.0) - resque-scheduler (~> 4.0) - resque-scheduler (4.5.0) - mono_logger (~> 1.0) - redis (>= 3.3) - resque (>= 1.27) - rufus-scheduler (~> 3.2, < 3.7) rexml (3.2.5) rspec-core (3.10.1) rspec-support (~> 3.10.0) @@ -318,8 +285,6 @@ GEM parser (>= 3.0.1.1) ruby-progressbar (1.11.0) ruby2_keywords (0.0.4) - rufus-scheduler (3.6.0) - fugit (~> 1.1, >= 1.1.6) set (1.0.2) sinatra (2.1.0) mustermann (~> 1.0) @@ -343,8 +308,6 @@ GEM tzinfo (2.0.4) concurrent-ruby (~> 1.0) unicode-display_width (2.0.0) - vegas (0.1.11) - rack (>= 1.0.0) webmock (3.13.0) addressable (>= 2.3.6) crack (>= 0.3.2) diff --git a/event_source.gemspec b/event_source.gemspec index 51b829e..f5bdc6a 100644 --- a/event_source.gemspec +++ b/event_source.gemspec @@ -54,7 +54,6 @@ Gem::Specification.new do |spec| spec.add_dependency 'mime-types' spec.add_dependency 'oj', '~> 3.11' spec.add_dependency 'ox', '~> 2.14' - spec.add_dependency 'resque-bus', '~> 0.7.0' spec.add_dependency 'typhoeus', '~> 1.4.0' # TODO: Change to development dependency diff --git a/lib/event_source/tasks/event_source.rake b/lib/event_source/tasks/event_source.rake deleted file mode 100644 index 750c0a6..0000000 --- a/lib/event_source/tasks/event_source.rake +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -# require 'resque_bus/tasks' -# will give you these tasks - -require "queue_bus/tasks" -require "resque_bus/tasks" -require "resque/tasks" - -namespace :event_source do - desc "Subscribes this application to Event Source events" - task :subscribe => ["queuebus:preload", "queuebus:subscribe"] - - desc "Start a Event Source worker for subscription queues" - task :work => ["queuebus:preload", "queuebus:setup", "resque:work"] - - desc "Start a Event Source worker for incoming driver queue" - task :driver => ["queuebus:preload", "queuebus:driver", "resque:work"] -end \ No newline at end of file diff --git a/spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb b/spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb deleted file mode 100644 index 23d1903..0000000 --- a/spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true -# # frozen_string_literal: true - -# module Parties -# class QueueBusAdapter < EventSource::Adapter - -# def enabled! -# require 'resque-bus' - -# # event_source_root = Rails.root.join('app', 'event_source') - -# # publishers_dir = event_source_root.join('publishers') -# # Dir[publishers_dir.join('parties', '*.rb')].each {|file| require file } -# # EventSource::Publisher.register_publishers(publishers_dir) - -# # require 'active_support/notifications' -# # called the first time we know we are using this adapter -# # it would be a good spot to require the libraries you're using -# # and modify EventSource::Worker as needed -# # raise NotImplementedError -# end - -# def enqueue(event) -# QueueBus.publish(event.event_key, event.payload) -# end - -# def dequeue(queue, event_name, subscriber, block) -# method_name = "on_#{event_name.gsub('.', '_')}" - -# EeventSource.channel('faa').send(queue, event_name, block) -# # QueueBus::Subscriber.subscribe_queue(queue, method_name) - -# # publisher = publisher_by(queue) -# # puts "--------->>>> queue #{queue} event #{event_name} --- #{block.inspect}" - -# # if block.present? -# # publisher.subscribe(event_name) do |event| -# # block.call(event) -# # end -# # else -# # subscribe_listener(queue, subscriber) -# # end - -# # ActiveSupport::Notifications.subscribe(key) do |name, started, finished, unique_id, data| -# # block.call(data) -# # end -# end -# end -# end diff --git a/spec/rails_app/config/initializers/event_source.rb b/spec/rails_app/config/initializers/event_source.rb index 384187c..346d942 100644 --- a/spec/rails_app/config/initializers/event_source.rb +++ b/spec/rails_app/config/initializers/event_source.rb @@ -112,13 +112,10 @@ # { # url: ENV['RABBITMQ_SERVER'] # }, - # resque_bus: { - # protocol: :resque_bus - # } # ] # config.asyncapi_resources = AcaEntities::AsyncApi::Mitc - # config.asyncapi_resources = AcaEntities.find_resources_for(:enroll, %w[amqp resque_bus]) # will give you resouces in array of hashes form + # config.asyncapi_resources = AcaEntities.find_resources_for(:enroll, %w[amqp]) # will give you resouces in array of hashes form # AcaEntities::Operations::AsyncApi::FindResource.new.call(self) end From f71404eb41d256ff4fcb388afee9198569eedbb9 Mon Sep 17 00:00:00 2001 From: Sai Praveen Gudimetla Date: Wed, 27 Dec 2023 16:24:27 -0600 Subject: [PATCH 3/8] remove rake task reference in event_source railitie (#104) --- lib/event_source/railtie.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index 5994b70..ea97127 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -3,9 +3,6 @@ module EventSource # :nodoc: class Railtie < Rails::Railtie - - rake_tasks do - load 'event_source/tasks/event_source.rake' - end + end end \ No newline at end of file From e83d72ec5abecc1e6e9388782b65a9541d37d073 Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Wed, 3 Jan 2024 12:28:07 -0500 Subject: [PATCH 4/8] create message along with event (#100) * create message along with event * removed unused references * build message fixes * build async api message object with session * append event log attributes to headers * updates for message options and session include --- Gemfile.lock | 2 +- lib/event_source.rb | 5 + lib/event_source/async_api/message.rb | 20 +-- lib/event_source/error.rb | 1 + lib/event_source/event.rb | 13 +- lib/event_source/message.rb | 37 +++++ lib/event_source/operations/build_message.rb | 31 ++++ .../operations/build_message_options.rb | 55 +++++++ lib/event_source/operations/create_message.rb | 33 +++++ lib/event_source/operations/fetch_session.rb | 47 ++++++ .../protocols/amqp/bunny_exchange_proxy.rb | 1 + lib/event_source/publisher.rb | 5 +- lib/event_source/version.rb | 2 +- spec/event_source/event_spec.rb | 140 ++++++++++++------ spec/event_source/message_spec.rb | 84 +++++++++++ .../operations/build_message_options_spec.rb | 87 +++++++++++ .../operations/build_message_spec.rb | 48 ++++++ .../operations/fetch_session_spec.rb | 80 ++++++++++ 18 files changed, 621 insertions(+), 70 deletions(-) create mode 100644 lib/event_source/message.rb create mode 100644 lib/event_source/operations/build_message.rb create mode 100644 lib/event_source/operations/build_message_options.rb create mode 100644 lib/event_source/operations/create_message.rb create mode 100644 lib/event_source/operations/fetch_session.rb create mode 100644 spec/event_source/message_spec.rb create mode 100644 spec/event_source/operations/build_message_options_spec.rb create mode 100644 spec/event_source/operations/build_message_spec.rb create mode 100644 spec/event_source/operations/fetch_session_spec.rb diff --git a/Gemfile.lock b/Gemfile.lock index 996e4e1..4a70de8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - event_source (0.5.7) + event_source (0.5.8) addressable (>= 2.8.0) bunny (>= 2.14) deep_merge (~> 1.2.0) diff --git a/lib/event_source.rb b/lib/event_source.rb index 6761531..0956d8c 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -31,11 +31,16 @@ require 'event_source/worker' require 'event_source/publish_operation' require 'event_source/subscribe_operation' +require 'event_source/message' require 'event_source/command' require 'event_source/publisher' require 'event_source/event' require 'event_source/subscriber' require 'event_source/operations/codec64' +require 'event_source/operations/create_message' +require 'event_source/operations/fetch_session' +require 'event_source/operations/build_message_options' +require 'event_source/operations/build_message' # Event source provides ability to compose, publish and subscribe to events module EventSource diff --git a/lib/event_source/async_api/message.rb b/lib/event_source/async_api/message.rb index dc28c38..ab3ffb8 100644 --- a/lib/event_source/async_api/message.rb +++ b/lib/event_source/async_api/message.rb @@ -20,30 +20,14 @@ class Message < Dry::Struct # @!attribute [r] headers # Schema definition of the application headers. Schema must be of type "object". # It must not define the protocol headers. - # @return [Schema] - attribute :headers, Schema.meta(omittable: true) + # @return [Types::Any] + attribute :headers, Types::Any.meta(omittable: true) # @!attribute [r] payload # Definition of the message payload. It can be of any type but defaults to Schema object # @return [Types::Any] attribute :payload, Types::Any.meta(omittable: true) - # @!attribute [r] correlation_id - # Definition of the correlation ID used for message tracing or matching - # @return [String] - attribute :correlation_id do - # @!attribute [r] description - # An optional description of the identifier. - # CommonMark syntax can be used for rich text representation - # @return [String] - attribute :description, Types::String.meta(omittable: true) - - # @!attribute [r] location - # Required. A runtime expression that specifies the location of the correlation ID - # @return [String] - attribute :location, Types::String.meta(omittable: true) - end.meta(omittable: true) - # @!attribute [r] schema_format # A string containing the name of the schema format used to define the message payload. # If omitted, implementations should parse the payload as a Schema object. Check out the diff --git a/lib/event_source/error.rb b/lib/event_source/error.rb index 01f75ef..ade7918 100644 --- a/lib/event_source/error.rb +++ b/lib/event_source/error.rb @@ -31,5 +31,6 @@ class Error < StandardError ConnectionNotFound = Class.new(Error) ServerConfigurationNotFound = Class.new(Error) ServerConfigurationInvalid = Class.new(Error) + MessageBuildError = Class.new(Error) end end diff --git a/lib/event_source/event.rb b/lib/event_source/event.rb index 02ae58b..fdc77ae 100644 --- a/lib/event_source/event.rb +++ b/lib/event_source/event.rb @@ -8,7 +8,7 @@ class Event # @attr_reader [Array] attribute_keys optional list of attributes that must be included in { Payload } # @attr_reader [String] publisher_path namespaced key indicating the class that registers event for publishing # @attr_reader [String] payload attribute/value pairs for the message that accompanies the event - attr_reader :attribute_keys, :publisher_path, :payload, :headers, :metadata + attr_reader :attribute_keys, :publisher_path, :payload, :headers, :metadata, :message HeaderDefaults = { version: '3.0', @@ -26,12 +26,22 @@ def initialize(options = {}) send(:headers=, options[:headers] || {}) @publisher_path = klass_var_for(:publisher_path) || nil + build_message(options) if headers.delete(:build_message) + if @publisher_path.eql?(nil) raise EventSource::Error::PublisherKeyMissing, "add 'publisher_path' to #{self.class.name}" end end + def build_message(options) + @message = EventSource::Message.new( + headers: options[:headers], + payload: options[:attributes], + event_name: name + ) + end + # Set payload # @overload payload=(payload) # @param [Hash] payload New payload @@ -127,6 +137,7 @@ def set_instance_variable_for(element, value) def validate_attribute_presence return unless attribute_keys.present? + gapped_keys = attribute_keys - payload.keys @event_errors = [] event_errors.push("missing required keys: #{gapped_keys}") unless gapped_keys.empty? diff --git a/lib/event_source/message.rb b/lib/event_source/message.rb new file mode 100644 index 0000000..4666bef --- /dev/null +++ b/lib/event_source/message.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require "forwardable" + +module EventSource + # Construct async api message object + class Message + extend Forwardable + + def initialize(options = {}) + message_options = build_message(options) + @message = create_message(message_options) + end + + def_delegators :@message, :payload, :headers + + private + + def build_message(options) + result = EventSource::Operations::BuildMessageOptions.new.call(options) + unless result.success? + raise EventSource::Error::MessageBuildError, + "unable to build message options due to #{result.failure}" + end + result.success + end + + def create_message(options) + result = EventSource::Operations::CreateMessage.new.call(options) + unless result.success? + raise EventSource::Error::MessageBuildError, + "unable to create message due to #{result.failure}" + end + result.success + end + end +end diff --git a/lib/event_source/operations/build_message.rb b/lib/event_source/operations/build_message.rb new file mode 100644 index 0000000..1a9f1cc --- /dev/null +++ b/lib/event_source/operations/build_message.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # create message + class BuildMessage + include Dry::Monads[:result, :do] + + def call(params) + values = yield build_options(params) + message = yield create_message(values) + + Success(message) + end + + private + + def build_options(params) + result = BuildMessageOptions.new.call(params) + result.success? ? result : Failure(result.errors.to_h) + end + + def create_message(values) + CreateMessage.new.call(values) + end + end + end +end diff --git a/lib/event_source/operations/build_message_options.rb b/lib/event_source/operations/build_message_options.rb new file mode 100644 index 0000000..e641c54 --- /dev/null +++ b/lib/event_source/operations/build_message_options.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" +require "securerandom" + +module EventSource + module Operations + # extract message options + class BuildMessageOptions + include Dry::Monads[:result, :do] + + def call(params) + headers = yield build_headers(params) + payload = yield build_payload(params) + headers = yield append_session_details(headers) + + Success(headers: headers, payload: payload) + end + + private + + def build_headers(params) + headers = params[:headers]&.symbolize_keys || {} + headers[:correlation_id] ||= SecureRandom.uuid + headers[:message_id] ||= SecureRandom.uuid + headers[:event_name] ||= params[:event_name] + headers[:event_time] = headers[:event_time]&.utc + + Success(headers) + end + + def build_payload(params) + payload = params[:payload]&.symbolize_keys || {} + + Success(payload) + end + + def append_session_details(headers) + output = FetchSession.new.call + + if output.success? + session, current_user = output.value! + headers[:session] = session&.symbolize_keys + else + # Create system account user when session is not available + current_user = system_account if defined?(system_account) + end + headers[:account_id] = current_user&.id&.to_s + + Success(headers) + end + end + end +end diff --git a/lib/event_source/operations/create_message.rb b/lib/event_source/operations/create_message.rb new file mode 100644 index 0000000..f0d0b41 --- /dev/null +++ b/lib/event_source/operations/create_message.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # create message + class CreateMessage + include Dry::Monads[:result, :do] + + def call(params) + values = yield build(params) + message = yield create(values) + + Success(message) + end + + private + + def build(params) + result = + ::EventSource::AsyncApi::Contracts::MessageContract.new.call(params) + + result.success? ? Success(result.to_h) : Failure(result.errors.to_h) + end + + def create(values) + Success(EventSource::AsyncApi::Message.new(values)) + end + end + end +end diff --git a/lib/event_source/operations/fetch_session.rb b/lib/event_source/operations/fetch_session.rb new file mode 100644 index 0000000..9e72d28 --- /dev/null +++ b/lib/event_source/operations/fetch_session.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # fetch session + class FetchSession + include Dry::Monads[:result, :do] + + def call + helper = yield include_session_helper + session = yield fetch_session + current_user = yield fetch_current_user + + Success([session, current_user]) + end + + private + + def include_session_helper + self.class.include(::SessionConcern) + + Success(::SessionConcern) + rescue NameError => e + Failure(e.to_s) + end + + def fetch_session + if respond_to?(:session) + Success(session) + else + Failure("session is not defined") + end + end + + def fetch_current_user + if respond_to?(:current_user) + Success(current_user) + else + Failure("current_user is not defined") + end + end + end + end +end diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 1bf6263..28e735c 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -45,6 +45,7 @@ def bunny_exchange_for(bindings) def publish(payload:, publish_bindings:, headers: {}) bunny_publish_bindings = sanitize_bindings((publish_bindings || {}).to_h) bunny_publish_bindings[:correlation_id] = headers.delete(:correlation_id) if headers[:correlation_id] + bunny_publish_bindings[:message_id] = headers.delete(:message_id) if headers[:message_id] bunny_publish_bindings[:headers] = headers unless headers.empty? logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" diff --git a/lib/event_source/publisher.rb b/lib/event_source/publisher.rb index 5032a0b..d5aa9b3 100644 --- a/lib/event_source/publisher.rb +++ b/lib/event_source/publisher.rb @@ -66,7 +66,10 @@ def publish(event) logger.debug "Publisher#publish publish_operation_name: #{publish_operation_name}" publish_operation = find_publish_operation_for(publish_operation_name) - publish_operation.call(event.payload, {headers: event.headers}) + payload = event.message&.payload || event.payload + headers = event.message&.headers || event.headers + + publish_operation.call(payload, {headers: headers}) end def channel_name diff --git a/lib/event_source/version.rb b/lib/event_source/version.rb index a298977..4bdf4d9 100644 --- a/lib/event_source/version.rb +++ b/lib/event_source/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module EventSource - VERSION = "0.5.7" + VERSION = "0.5.8" end diff --git a/spec/event_source/event_spec.rb b/spec/event_source/event_spec.rb index c67b0d8..802a64b 100644 --- a/spec/event_source/event_spec.rb +++ b/spec/event_source/event_spec.rb @@ -16,26 +16,26 @@ module EventSource class MyValidEvent < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" end class MyEvent < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" attribute_keys :hbx_id, :fein, :entity_kind end class MyEventTwo < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" end class MyEventThree < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" attribute_keys :hbx_id, :entity_kind, :fein, :legal_name end end RSpec.describe EventSource::Event do - context 'A new Event class' do + context "A new Event class" do # context "and a required publisher_path isn't provided" do # let(:empty_event_class) do # class MyEmptyEvent < EventSource::Event @@ -67,7 +67,7 @@ class MyEventThree < EventSource::Event # end # end - context 'and the required publisher_path provided is valid' do + context "and the required publisher_path provided is valid" do let(:valid_event_class) { EventSource::MyValidEvent } subject { valid_event_class.new } @@ -76,22 +76,22 @@ class MyEventThree < EventSource::Event # expect(subject.publisher_class).to be_a(Parties::OrganizationPublisher) # end - it 'should have an event_key' do - expect(subject.name).to eq 'event_source.my_valid_event' + it "should have an event_key" do + expect(subject.name).to eq "event_source.my_valid_event" end end - context 'with a defined contract_class' do + context "with a defined contract_class" do context "and the contract_class isn't defined" do - it 'should raise an EventSource::Errors::ContractNotDefined' + it "should raise an EventSource::Errors::ContractNotDefined" end end end - context 'An initialized Event class with defined attribute_keys' do + context "An initialized Event class with defined attribute_keys" do let(:event_class) { EventSource::MyEvent } - it 'keys should be initialized for each attribute' do + it "keys should be initialized for each attribute" do expect(event_class.new.attribute_keys).to eq %i[hbx_id fein entity_kind] end @@ -105,12 +105,12 @@ class MyEventThree < EventSource::Event # end # end - context 'and all attribute values are present' do + context "and all attribute values are present" do let(:attributes) do - { hbx_id: '553234', entity_kind: 'c_corp', fein: '546232323' } + { hbx_id: "553234", entity_kind: "c_corp", fein: "546232323" } end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end end @@ -137,82 +137,82 @@ class MyEventThree < EventSource::Event # end end - context 'An initialized Event class with no attribute_keys' do + context "An initialized Event class with no attribute_keys" do let(:event_class) { EventSource::MyEventTwo } subject { event_class.new } - it 'attribute_keys should be empty' do + it "attribute_keys should be empty" do expect(subject.attribute_keys).to be_empty end - context 'with no attributes passed' do - it '#event_errors should be empty' do + context "with no attributes passed" do + it "#event_errors should be empty" do expect(subject.event_errors).to be_empty end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end - it 'attributes should be an empty hash' do + it "attributes should be an empty hash" do expect(subject.payload).to be_empty end end - context 'and with attributes passed' do + context "and with attributes passed" do let(:attributes) do { - hbx_id: '553234', - entity_kind: 'c_corp', - fein: '546232323', - legal_name: 'Test Corp LLC' + hbx_id: "553234", + entity_kind: "c_corp", + fein: "546232323", + legal_name: "Test Corp LLC" } end subject { event_class.new(attributes: attributes) } - it '#event_errors should be empty' do + it "#event_errors should be empty" do expect(subject.event_errors).to be_empty end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end - it 'should have all attributes' do + it "should have all attributes" do expect(subject.payload).to eq attributes end end end - context 'An initialized Event class with attribute_keys' do + context "An initialized Event class with attribute_keys" do let(:event_class) { EventSource::MyEventThree } - context 'with attributes passed' do + context "with attributes passed" do let(:attributes) do { - hbx_id: '553234', - entity_kind: 'c_corp', - fein: '546232323', - legal_name: 'Test Corp LLC' + hbx_id: "553234", + entity_kind: "c_corp", + fein: "546232323", + legal_name: "Test Corp LLC" } end subject { event_class.new(attributes: attributes) } - it 'attribute_keys should be present' do + it "attribute_keys should be present" do expect(subject.attribute_keys).to eq %i[ - hbx_id - entity_kind - fein - legal_name - ] + hbx_id + entity_kind + fein + legal_name + ] end - it '#event_errors should be empty' do + it "#event_errors should be empty" do expect(subject.event_errors).to be_empty end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end - it 'should have all attributes' do + it "should have all attributes" do expect(subject.payload).to eq attributes end end @@ -236,17 +236,61 @@ class MyEventThree < EventSource::Event # end # end - context 'with attribute getter' do - let(:attributes) { { hbx_id: '553234', fein: '546232323' } } + context "with attribute getter" do + let(:attributes) { { hbx_id: "553234", fein: "546232323" } } subject { event_class.new(attributes: attributes) } - context 'when attribute name is passed' do - it 'should return the value' do + context "when attribute name is passed" do + it "should return the value" do expect(subject[:fein]).to eq attributes[:fein] expect(subject[:hbx_id]).to eq attributes[:hbx_id] end end end end + + describe "message composite event" do + module EventSource + class MyCustomEvent < EventSource::Event + + publisher_path "parties.organization_publisher" + end + end + + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" } + end + end + + context "when event is message composite" do + let(:options) do + { + payload: { + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + category: "hc4cc eligibility", + event_time: DateTime.now + }, + headers: { + correlation_id: "edf0e41b-891a-42b1-a4b6-2dbd97d085e4", + build_message: true + } + } + end + + subject { EventSource::MyCustomEvent.new(options) } + + it "should build message" do + expect(subject.message).to be_present + expect(subject.message.headers[:session]).to include( + :session_id + ) + end + end + end end diff --git a/spec/event_source/message_spec.rb b/spec/event_source/message_spec.rb new file mode 100644 index 0000000..702a317 --- /dev/null +++ b/spec/event_source/message_spec.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Message do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + context "input params passed" do + let(:input_params) do + { + payload: { + record: double + }, + headers: { + event_name: "events.hc4cc.eligibilities.created", + event_category: "hc4cc_eligibility", + event_time: DateTime.now, + event_outcome: "eligibility created", + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + resource_id: "gid://enroll/Eligibility/53e693d7eb899ad9ca01e734", + market_kind: "individual" + }, + event_name: "enroll.events.person.hc4cc_eligibility.created" + } + end + + context "when params passed" do + it "should create message entity" do + message = described_class.new(input_params) + + expect(message).to be_a(EventSource::Message) + end + + it "should have headers on the message" do + message = described_class.new(input_params) + + expect(message.headers).to be_a(Hash) + expect(message.headers.keys).to match_array( + %i[ + correlation_id + subject_id + resource_id + event_category + message_id + event_name + event_time + event_outcome + market_kind + account_id + session + ] + ) + end + + it "should have payload on the message" do + message = described_class.new(input_params) + + expect(message.payload).to be_a(Hash) + expect(message.payload.keys).to match_array(%i[record]) + end + + it "should have payload with session on the message" do + message = described_class.new(input_params) + + expect(message.headers[:session]).to be_a(Hash) + expect(message.headers[:session].keys).to match_array( + %i[session_id portal login_session_id] + ) + end + end + end +end diff --git a/spec/event_source/operations/build_message_options_spec.rb b/spec/event_source/operations/build_message_options_spec.rb new file mode 100644 index 0000000..3cb8028 --- /dev/null +++ b/spec/event_source/operations/build_message_options_spec.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Operations::BuildMessageOptions do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + context "input params passed" do + let(:input_params) do + { + payload: { + record: double + }, + headers: { + event_name: "events.hc4cc.eligibilities.created", + event_category: "hc4cc_eligibility", + event_time: DateTime.now, + event_outcome: "eligibility created", + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + resource_id: "gid://enroll/Eligibility/53e693d7eb899ad9ca01e734", + market_kind: "individual" + }, + event_name: "enroll.events.person.hc4cc_eligibility.created" + } + end + + context "when params passed" do + it "should return success" do + result = subject.call(input_params) + + expect(result.success?).to be_truthy + end + + it "should build headers" do + result = subject.call(input_params) + message_options = result.value! + + expect(message_options[:headers]).to be_a(Hash) + expect(message_options[:headers].keys).to match_array( + %i[ + correlation_id + subject_id + resource_id + event_category + message_id + event_name + event_time + event_outcome + market_kind + account_id + session + ] + ) + end + + it "should build payload" do + result = subject.call(input_params) + message_options = result.value! + + expect(message_options[:payload]).to be_a(Hash) + expect(message_options[:payload].keys).to match_array(%i[record]) + end + + it "should build session options" do + result = subject.call(input_params) + message_options = result.value! + + expect(message_options[:headers][:session]).to be_a(Hash) + expect(message_options[:headers][:session].keys).to match_array( + %i[session_id portal login_session_id] + ) + end + end + end +end diff --git a/spec/event_source/operations/build_message_spec.rb b/spec/event_source/operations/build_message_spec.rb new file mode 100644 index 0000000..fc41cbf --- /dev/null +++ b/spec/event_source/operations/build_message_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Operations::BuildMessage do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + context "input params passed" do + let(:input_params) do + { + payload: { + record: double + }, + headers: { + event_name: "events.hc4cc.eligibilities.created", + event_category: "hc4cc_eligibility", + event_time: DateTime.now, + event_outcome: "eligibility created", + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + resource_id: "gid://enroll/Eligibility/53e693d7eb899ad9ca01e734", + market_kind: "individual" + }, + event_name: "enroll.events.person.hc4cc_eligibility.created" + } + end + + context "when session available" do + it "should message options session options" do + result = subject.call(input_params) + + expect(result.success?).to be_truthy + expect(result.success).to be_a(EventSource::AsyncApi::Message) + end + end + end +end diff --git a/spec/event_source/operations/fetch_session_spec.rb b/spec/event_source/operations/fetch_session_spec.rb new file mode 100644 index 0000000..568df59 --- /dev/null +++ b/spec/event_source/operations/fetch_session_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Operations::FetchSession do + let(:fetch_session) { described_class.new } + + describe "when session helper not defined" do + + context "when current user not defined" do + before do + allow(fetch_session).to receive(:respond_to?).with(:session).and_return( + true + ) + allow(fetch_session).to receive(:respond_to?).with( + :current_user + ).and_return(false) + end + + it "should fail" do + result = fetch_session.call + + expect(result.success?).to be_falsey + expect(result.failure).to eq "current_user is not defined" + end + end + + context "when session not defined" do + before do + allow(fetch_session).to receive(:respond_to?).with(:session).and_return( + false + ) + allow(fetch_session).to receive(:respond_to?).with( + :current_user + ).and_return(true) + end + + it "should fail" do + result = fetch_session.call + + expect(result.success?).to be_falsey + expect(result.failure).to eq "session is not defined" + end + end + end + + describe "when session helper defined" do + context "when operation called" do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + it "should return session and current user" do + result = fetch_session.call + + expect(result.success?).to be_truthy + expect(result.value!).to eq( + [ + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + }, + OpenStruct.new(id: 1) + ] + ) + end + end + end +end From e368d614f37c5cf4e4a17a81ae71c49376d96ff6 Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Wed, 3 Jan 2024 15:46:59 -0500 Subject: [PATCH 5/8] attach session details inside account (#106) * create message along with event * removed unused references * build message fixes * build async api message object with session * append event log attributes to headers * updates for message options and session include * attach session details inside account * spec fixes --------- Signed-off-by: Raghu Ram --- lib/event_source/operations/build_message_options.rb | 11 +++++++---- spec/event_source/event_spec.rb | 2 +- spec/event_source/message_spec.rb | 7 +++---- .../operations/build_message_options_spec.rb | 7 +++---- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/lib/event_source/operations/build_message_options.rb b/lib/event_source/operations/build_message_options.rb index e641c54..331371b 100644 --- a/lib/event_source/operations/build_message_options.rb +++ b/lib/event_source/operations/build_message_options.rb @@ -13,7 +13,7 @@ class BuildMessageOptions def call(params) headers = yield build_headers(params) payload = yield build_payload(params) - headers = yield append_session_details(headers) + headers = yield append_account_details(headers) Success(headers: headers, payload: payload) end @@ -36,17 +36,20 @@ def build_payload(params) Success(payload) end - def append_session_details(headers) + def append_account_details(headers) output = FetchSession.new.call + account = {} if output.success? session, current_user = output.value! - headers[:session] = session&.symbolize_keys + account[:session] = session&.symbolize_keys else # Create system account user when session is not available current_user = system_account if defined?(system_account) end - headers[:account_id] = current_user&.id&.to_s + + account[:id] = current_user&.id&.to_s + headers[:account] = account Success(headers) end diff --git a/spec/event_source/event_spec.rb b/spec/event_source/event_spec.rb index 802a64b..b0d5058 100644 --- a/spec/event_source/event_spec.rb +++ b/spec/event_source/event_spec.rb @@ -287,7 +287,7 @@ def session it "should build message" do expect(subject.message).to be_present - expect(subject.message.headers[:session]).to include( + expect(subject.message.headers[:account][:session]).to include( :session_id ) end diff --git a/spec/event_source/message_spec.rb b/spec/event_source/message_spec.rb index 702a317..b4c4396 100644 --- a/spec/event_source/message_spec.rb +++ b/spec/event_source/message_spec.rb @@ -58,8 +58,7 @@ def session event_time event_outcome market_kind - account_id - session + account ] ) end @@ -74,8 +73,8 @@ def session it "should have payload with session on the message" do message = described_class.new(input_params) - expect(message.headers[:session]).to be_a(Hash) - expect(message.headers[:session].keys).to match_array( + expect(message.headers[:account][:session]).to be_a(Hash) + expect(message.headers[:account][:session].keys).to match_array( %i[session_id portal login_session_id] ) end diff --git a/spec/event_source/operations/build_message_options_spec.rb b/spec/event_source/operations/build_message_options_spec.rb index 3cb8028..b012bfb 100644 --- a/spec/event_source/operations/build_message_options_spec.rb +++ b/spec/event_source/operations/build_message_options_spec.rb @@ -59,8 +59,7 @@ def session event_time event_outcome market_kind - account_id - session + account ] ) end @@ -77,8 +76,8 @@ def session result = subject.call(input_params) message_options = result.value! - expect(message_options[:headers][:session]).to be_a(Hash) - expect(message_options[:headers][:session].keys).to match_array( + expect(message_options[:headers][:account][:session]).to be_a(Hash) + expect(message_options[:headers][:account][:session].keys).to match_array( %i[session_id portal login_session_id] ) end From f4e83cbdf124c4d48e469e239f36261191ee61c9 Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Tue, 30 Jan 2024 16:34:40 -0500 Subject: [PATCH 6/8] fixes for system account (#107) * fixes for system account * updated fetch_session spec --- .../operations/build_message_options.rb | 12 ++--- lib/event_source/operations/fetch_session.rb | 11 ++++- .../operations/fetch_session_spec.rb | 44 +++++++++---------- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/lib/event_source/operations/build_message_options.rb b/lib/event_source/operations/build_message_options.rb index 331371b..4456ae9 100644 --- a/lib/event_source/operations/build_message_options.rb +++ b/lib/event_source/operations/build_message_options.rb @@ -38,17 +38,17 @@ def build_payload(params) def append_account_details(headers) output = FetchSession.new.call + return output unless output.success? + + session, current_user, system_account = output.value! account = {} - if output.success? - session, current_user = output.value! + if session.present? && current_user.present? account[:session] = session&.symbolize_keys + account[:id] = current_user&.id&.to_s else - # Create system account user when session is not available - current_user = system_account if defined?(system_account) + account[:id] = system_account&.id&.to_s end - - account[:id] = current_user&.id&.to_s headers[:account] = account Success(headers) diff --git a/lib/event_source/operations/fetch_session.rb b/lib/event_source/operations/fetch_session.rb index 9e72d28..ce2ce83 100644 --- a/lib/event_source/operations/fetch_session.rb +++ b/lib/event_source/operations/fetch_session.rb @@ -13,8 +13,9 @@ def call helper = yield include_session_helper session = yield fetch_session current_user = yield fetch_current_user + system_account = yield fetch_system_account - Success([session, current_user]) + Success([session, current_user, system_account]) end private @@ -42,6 +43,14 @@ def fetch_current_user Failure("current_user is not defined") end end + + def fetch_system_account + if respond_to?(:system_account) + Success(system_account) + else + Failure("system_account is not defined") + end + end end end end diff --git a/spec/event_source/operations/fetch_session_spec.rb b/spec/event_source/operations/fetch_session_spec.rb index 568df59..29ce65a 100644 --- a/spec/event_source/operations/fetch_session_spec.rb +++ b/spec/event_source/operations/fetch_session_spec.rb @@ -6,16 +6,20 @@ let(:fetch_session) { described_class.new } describe "when session helper not defined" do + before do + allow(fetch_session).to receive(:respond_to?).with(:session).and_return( + respond_to_session + ) + allow(fetch_session).to receive(:respond_to?).with( + :current_user + ).and_return(respond_to_current_user) + end + + let(:respond_to_session) { true } + let(:respond_to_current_user) { true } context "when current user not defined" do - before do - allow(fetch_session).to receive(:respond_to?).with(:session).and_return( - true - ) - allow(fetch_session).to receive(:respond_to?).with( - :current_user - ).and_return(false) - end + let(:respond_to_current_user) { false } it "should fail" do result = fetch_session.call @@ -26,14 +30,7 @@ end context "when session not defined" do - before do - allow(fetch_session).to receive(:respond_to?).with(:session).and_return( - false - ) - allow(fetch_session).to receive(:respond_to?).with( - :current_user - ).and_return(true) - end + let(:respond_to_session) { false } it "should fail" do result = fetch_session.call @@ -58,20 +55,23 @@ def session "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" } end + + def system_account + OpenStruct.new(id: 2) + end end + let(:session_concern) { Class.new.extend(SessionConcern) } + it "should return session and current user" do result = fetch_session.call expect(result.success?).to be_truthy expect(result.value!).to eq( [ - { - "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", - "portal" => "enroll/families/home", - "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" - }, - OpenStruct.new(id: 1) + session_concern.session, + session_concern.current_user, + session_concern.system_account ] ) end From ed23a949f6900cf2a756d4cee3889a3aa100716f Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Fri, 9 Feb 2024 11:40:29 -0500 Subject: [PATCH 7/8] Add subscriber name as suffix for executable container keys (#108) * Add subscriber name as suffix for executable container keys * added specs --- .../protocols/amqp/bunny_queue_proxy.rb | 70 +++----- lib/event_source/subscriber.rb | 8 +- .../protocols/amqp/bunny_queue_proxy_spec.rb | 153 ++++++++++++++++++ .../subscribers/enterprise_subscriber.rb | 28 ++++ .../subscribers/event_log_subscriber.rb | 28 ++++ .../asyncapi/amqp_enterprise_subscribe.yml | 55 +++++++ 6 files changed, 294 insertions(+), 48 deletions(-) create mode 100644 spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb create mode 100644 spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb create mode 100644 spec/support/asyncapi/amqp_enterprise_subscribe.yml diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 2b431e5..89c07b5 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -117,41 +117,6 @@ def convert_subscriber_prefetch(options) def resolve_subscriber_routing_keys(channel, operation); end - # def register_subscription(subscriber_klass, bindings) - # consumer_proxy = consumer_proxy_for(bindings) - - # consumer_proxy.on_delivery do |delivery_info, metadata, payload| - # on_receive_message( - # subscriber_klass, - # delivery_info, - # metadata, - # payload - # ) - # end - - # subscribe_consumer(consumer_proxy) - # end - - # def subscribe_consumer(consumer_proxy) - # @subject.subscribe_with(consumer_proxy) - # @consumers.push(consumer_proxy) - # end - - # def consumer_proxy_for(bindings) - # operation_bindings = convert_to_consumer_options(bindings[:amqp]) - - # logger.debug 'consumer proxy options:' - # logger.debug operation_bindings.inspect - - # BunnyConsumerProxy.new( - # @subject.channel, - # @subject, - # '', - # operation_bindings[:no_ack], - # operation_bindings[:exclusive] - # ) - # end - def on_receive_message( subscriber_klass, delivery_info, @@ -164,17 +129,7 @@ def on_receive_message( logger.debug metadata.inspect logger.debug payload.inspect - if delivery_info.routing_key - routing_key = [app_name, delivery_info.routing_key].join(delimiter) - executable = subscriber_klass.executable_for(routing_key) - end - - unless executable - routing_key = [app_name, exchange_name].join(delimiter) - executable = subscriber_klass.executable_for(routing_key) - end - - logger.debug "routing_key: #{routing_key}" + executable = find_executable(subscriber_klass, delivery_info) return unless executable subscriber = subscriber_klass.new @@ -196,6 +151,13 @@ def on_receive_message( subscriber = nil end + def find_executable(subscriber_klass, delivery_info) + subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass) + + find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) || + find_default_executable(subscriber_klass, subscriber_suffix) + end + def respond_to_missing?(name, include_private); end # Forward all missing method calls to the Bunny::Queue instance @@ -205,6 +167,22 @@ def method_missing(name, *args) private + def subscriber_klass_name_to_suffix(subscriber_klass) + subscriber_klass.name.downcase.gsub("::", '_') + end + + def find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) + return unless delivery_info.routing_key + + routing_key = [app_name, delivery_info.routing_key].join(delimiter) + subscriber_klass.executable_for(routing_key + "_#{subscriber_suffix}") + end + + def find_default_executable(subscriber_klass, subscriber_suffix) + default_routing_key = [app_name, exchange_name].join(delimiter) + subscriber_klass.executable_for(default_routing_key + "_#{subscriber_suffix}") + end + def delimiter EventSource.delimiter(:amqp) end diff --git a/lib/event_source/subscriber.rb b/lib/event_source/subscriber.rb index d3b1cd0..d62257a 100644 --- a/lib/event_source/subscriber.rb +++ b/lib/event_source/subscriber.rb @@ -100,11 +100,15 @@ def subscribe(subscription_name, &block) unless formatted_publisher_key.gsub(delimiter, '_') == identifier unique_key_elements.push(identifier) end - logger.debug "Subscriber#susbcribe Unique_key #{unique_key_elements.join(delimiter)}" + return unless block_given? + subscriber_suffix = self.name.downcase.gsub('::', '_') + subscriber_unique_key = unique_key_elements.join(delimiter) + "_#{subscriber_suffix}" + logger.debug "Subscriber#susbcribe Unique key #{subscriber_unique_key}" + EventSource::Subscriber.executable_container[ - unique_key_elements.join(delimiter) + subscriber_unique_key ] = block end diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index 7f4aeab..a1758b3 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -168,4 +168,157 @@ # end # end end + + context "executable lookup with subscriber suffix" do + let(:connection_manager) { EventSource::ConnectionManager.instance } + let!(:connection) { connection_manager.add_connection(my_server) } + + let(:event_log_subscriber) do + Pathname.pwd.join( + "spec", + "rails_app", + "app", + "event_source", + "subscribers", + "event_log_subscriber.rb" + ) + end + + let(:enterprise_subscriber) do + Pathname.pwd.join( + "spec", + "rails_app", + "app", + "event_source", + "subscribers", + "enterprise_subscriber.rb" + ) + end + + let(:publish_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_audit_log_publish.yml" + ) + ) + .success + end + + let(:subscribe_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_audit_log_subscribe.yml" + ) + ) + .success + end + + let(:subscribe_two_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_enterprise_subscribe.yml" + ) + ) + .success + end + + let(:publish_channel) do + connection.add_channel( + "enroll.audit_log.events.created", + publish_resource.channels.first + ) + end + let(:subscribe_channel) do + connection.add_channel( + "on_enroll.enroll.audit_log.events", + subscribe_resource.channels.first + ) + end + let(:subscribe_two_channel) do + connection.add_channel( + "on_enroll.enroll.enterprise.events", + subscribe_two_resource.channels.first + ) + end + + let(:load_subscribers) do + [event_log_subscriber, enterprise_subscriber].each do |file| + require file.to_s + end + end + + before do + allow(EventSource).to receive(:app_name).and_return("enroll") + connection.start unless connection.active? + publish_channel + subscribe_channel + subscribe_two_channel + load_subscribers + allow(subject).to receive(:exchange_name) { exchange_name } + end + + let(:audit_log_proc) do + EventSource::Subscriber.executable_container[ + "enroll.enroll.audit_log.events_subscribers_eventlogsubscriber" + ] + end + + let(:enterprise_advance_day_proc) do + EventSource::Subscriber.executable_container[ + "enroll.enroll.enterprise.events.date_advanced_subscribers_enterprisesubscriber" + ] + end + + context "when routing key based executable is not found" do + let(:delivery_info) do + double(routing_key: "enroll.enterprise.events.date_advanced") + end + + let(:exchange_name) { "enroll.audit_log.events" } + + it "should return default audit log proc" do + executable = + subject.find_executable( + Subscribers::EventLogSubscriber, + delivery_info + ) + expect(executable).to match(audit_log_proc) + end + end + + context "when routing key based executable is found" do + let(:delivery_info) do + double(routing_key: "enroll.enterprise.events.date_advanced") + end + + let(:exchange_name) { "enroll.enterprise.events" } + + it "should return executable for the routing key" do + executable = + subject.find_executable( + Subscribers::EnterpriseSubscriber, + delivery_info + ) + expect(executable).to match(enterprise_advance_day_proc) + end + end + end end diff --git a/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb b/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb new file mode 100644 index 0000000..c36b638 --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Subscribers + # Subscriber will receive Enterprise requests like date change + class EnterpriseSubscriber + include ::EventSource::Subscriber[amqp: "enroll.enterprise.events"] + + subscribe(:on_date_advanced) do |delivery_info, metadata, response| + logger.info "-" * 100 unless Rails.env.test? + logger.info "EnterpriseSubscriber#on_date_advanced, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + + subscribe( + :on_enroll_enterprise_events + ) do |delivery_info, _metadata, response| + logger.info "-" * 100 unless Rails.env.test? + logger.info "EnterpriseSubscriber#on_enroll_enterprise_events, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + end +end diff --git a/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb b/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb new file mode 100644 index 0000000..9dea894 --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Subscribers + # Subscriber will receive Audit Log events + class EventLogSubscriber + include EventSource::Logging + include ::EventSource::Subscriber[amqp: "enroll.audit_log.events"] + + subscribe( + :on_enroll_audit_log_events + ) do |delivery_info, metadata, response| + logger.info "-" * 100 unless Rails.env.test? + + subscriber_logger.info "EventLogEventsSubscriber#on_enroll_audit_log_events, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + + private + + def subscriber_logger + @subscriber_logger ||= + Logger.new("#{Rails.root}/log/on_enroll_audit_log_events.log") + end + end +end diff --git a/spec/support/asyncapi/amqp_enterprise_subscribe.yml b/spec/support/asyncapi/amqp_enterprise_subscribe.yml new file mode 100644 index 0000000..1842ed0 --- /dev/null +++ b/spec/support/asyncapi/amqp_enterprise_subscribe.yml @@ -0,0 +1,55 @@ +--- +asyncapi: 2.0.0 +info: + title: Enroll App + version: 0.1.0 + description: AMQP Subsribe configuration for the Enroll App services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Production Server + development: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server + test: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server +channels: + on_enroll.enroll.enterprise.events: + bindings: + amqp: + is: queue + queue: + name: on_enroll.enroll.enterprise.events + durable: true + exclusive: false + auto_delete: false + vhost: "/" + subscribe: + bindings: + amqp: + ack: true + exclusive: false + routing_key: enroll.# + prefetch: 1 + bindingVersion: "0.2.0" + operationId: on_enroll.enroll.enterprise.events + description: Events - for system wide changes + +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file From 7fa9b28a563c6e20a1203c201cfb814cd697ece5 Mon Sep 17 00:00:00 2001 From: Trey Date: Wed, 31 Jul 2024 16:16:20 -0400 Subject: [PATCH 8/8] adds downstream dependency checking (#98) --- .github/workflows/downstream.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/downstream.yml diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml new file mode 100644 index 0000000..4b352d1 --- /dev/null +++ b/.github/workflows/downstream.yml @@ -0,0 +1,24 @@ +name: Downstream + +on: + workflow_dispatch: + branches: + - trunk + push: + branches: + - trunk + +jobs: + check-dependencies: + runs-on: ubuntu-latest + steps: + - name: Bundle Dependency Checks + uses: convictional/trigger-workflow-and-wait@v1.6.1 + with: + owner: ideacrew + repo: ic_dependency_jamboree + github_token: ${{ secrets.GH_PAT }} + workflow_file_name: test_bundle.yml + ref: trunk + propagate_failure: false +