diff --git a/.github/workflows/docs_and_rspec.yml b/.github/workflows/rspec.yml similarity index 62% rename from .github/workflows/docs_and_rspec.yml rename to .github/workflows/rspec.yml index 23a74d1e..198903e4 100644 --- a/.github/workflows/docs_and_rspec.yml +++ b/.github/workflows/rspec.yml @@ -1,8 +1,12 @@ -name: Rubocop and Rspec +name: Rspec on: push jobs: rspec: + strategy: + fail-fast: false + matrix: + ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2'] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -14,14 +18,14 @@ jobs: sudo rabbitmqctl set_permissions -p event_source guest ".*" ".*" ".*" - uses: ruby/setup-ruby@v1 with: - ruby-version: 2.6 + ruby-version: ${{ matrix.ruby_version }} - name: Cache Gems uses: actions/cache@v1 with: path: vendor/bundle - key: ${{ runner.os }}-event_source-gems-${{ hashFiles('**/Gemfile.lock') }}-${{ hashFiles('**/Gemfile' ) }} + key: ${{ runner.os }}-${{matrix.ruby_version}}-event_source-gems-${{ hashFiles('**/Gemfile.lock') }}-${{ hashFiles('**/Gemfile' ) }} restore-keys: | - ${{ runner.os }}-event_source-gems-${{ hashFiles('**/Gemfile.lock') }}-${{ hashFiles('**/Gemfile' ) }} + ${{ runner.os }}-${{matrix.ruby_version}}-event_source-gems-${{ hashFiles('**/Gemfile.lock') }}-${{ hashFiles('**/Gemfile' ) }} - name: bundle install run: | bundle config path vendor/bundle diff --git a/.ruby-version b/.ruby-version index 338a5b5d..0444f320 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -2.6.6 +3.2.1 \ No newline at end of file diff --git a/Gemfile.lock b/Gemfile.lock index a3559182..4a70de83 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,16 +1,16 @@ PATH remote: . specs: - event_source (0.5.7) + event_source (0.5.8) addressable (>= 2.8.0) bunny (>= 2.14) deep_merge (~> 1.2.0) - dry-configurable (= 0.12.1) + dry-configurable (~> 0.12) dry-events (~> 0.3) dry-inflector (~> 0.2) dry-initializer (~> 3.0) dry-monads (~> 1.3) - dry-schema (= 1.6.2) + dry-schema (~> 1.6) dry-struct (~> 1.4) dry-types (~> 1.5) dry-validation (~> 1.6) @@ -18,10 +18,9 @@ PATH faraday_middleware (~> 1.0) logging (~> 2.3.0) mime-types - nokogiri (>= 1.12.5) + nokogiri (>= 1.13.0) oj (~> 3.11) ox (~> 2.14) - resque-bus (~> 0.7.0) typhoeus (~> 1.4.0) GEM @@ -154,8 +153,6 @@ GEM dry-initializer (~> 3.0) dry-schema (~> 1.5, >= 1.5.2) erubi (1.10.0) - et-orbi (1.2.6) - tzinfo ethon (0.15.0) ffi (>= 1.15.0) faker (2.18.0) @@ -176,9 +173,6 @@ GEM faraday_middleware (1.2.0) faraday (~> 1.0) ffi (1.15.4) - fugit (1.5.2) - et-orbi (~> 1.1, >= 1.1.8) - raabro (~> 1.4) globalid (0.5.2) activesupport (>= 5.0) hashdiff (1.0.1) @@ -200,22 +194,18 @@ GEM mime-types-data (~> 3.2015) mime-types-data (3.2021.0901) mini_mime (1.1.2) - mini_portile2 (2.6.1) + mini_portile2 (2.8.1) minitest (5.14.4) mongo (2.14.0) bson (>= 4.8.2, < 5.0.0) mongoid (7.3.0) activemodel (>= 5.1, < 6.2) mongo (>= 2.10.5, < 3.0.0) - mono_logger (1.1.1) multi_json (1.15.0) multipart-post (2.1.1) mustermann (1.1.1) ruby2_keywords (~> 0.0.1) nio4r (2.5.8) - nokogiri (1.12.5) - mini_portile2 (~> 2.6.1) - racc (~> 1.4) oj (3.13.9) ox (2.14.5) parallel (1.20.1) @@ -228,11 +218,7 @@ GEM byebug (~> 11.0) pry (~> 0.10) public_suffix (4.0.6) - queue-bus (0.12.0) - multi_json - redis - raabro (1.4.0) - racc (1.6.0) + racc (1.6.2) rack (2.2.3) rack-protection (2.1.0) rack @@ -266,30 +252,8 @@ GEM thor (~> 1.0) rainbow (3.0.0) rake (13.0.6) - rbtree (0.4.4) - redis (4.5.1) - redis-namespace (1.8.1) - redis (>= 3.0.4) + rbtree (0.4.6) regexp_parser (2.1.1) - resque (1.27.4) - mono_logger (~> 1.0) - multi_json (~> 1.0) - redis-namespace (~> 1.3) - sinatra (>= 0.9.2) - vegas (~> 0.1.2) - resque-bus (0.7.0) - queue-bus (>= 0.7, < 1) - resque (>= 1.10.0, < 2.0) - resque-retry - resque-scheduler (>= 2.0.1) - resque-retry (1.7.6) - resque (>= 1.25, < 3.0) - resque-scheduler (~> 4.0) - resque-scheduler (4.5.0) - mono_logger (~> 1.0) - redis (>= 3.3) - resque (>= 1.27) - rufus-scheduler (~> 3.2, < 3.7) rexml (3.2.5) rspec-core (3.10.1) rspec-support (~> 3.10.0) @@ -321,8 +285,6 @@ GEM parser (>= 3.0.1.1) ruby-progressbar (1.11.0) ruby2_keywords (0.0.4) - rufus-scheduler (3.6.0) - fugit (~> 1.1, >= 1.1.6) set (1.0.2) sinatra (2.1.0) mustermann (~> 1.0) @@ -346,8 +308,6 @@ GEM tzinfo (2.0.4) concurrent-ruby (~> 1.0) unicode-display_width (2.0.0) - vegas (0.1.11) - rack (>= 1.0.0) webmock (3.13.0) addressable (>= 2.3.6) crack (>= 0.3.2) diff --git a/event_source.gemspec b/event_source.gemspec index a9d84ee8..f5bdc6a5 100644 --- a/event_source.gemspec +++ b/event_source.gemspec @@ -38,7 +38,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'addressable', '>= 2.8.0' spec.add_dependency 'bunny', '>= 2.14' spec.add_dependency 'deep_merge', '~> 1.2.0' - spec.add_dependency 'dry-configurable', '0.12.1' + spec.add_dependency 'dry-configurable', '~> 0.12' spec.add_dependency 'dry-events', '~> 0.3' spec.add_dependency 'dry-inflector', '~> 0.2' spec.add_dependency 'dry-initializer', '~> 3.0' @@ -46,15 +46,14 @@ Gem::Specification.new do |spec| spec.add_dependency 'dry-struct', '~> 1.4' spec.add_dependency 'dry-types', '~> 1.5' spec.add_dependency 'dry-validation', '~> 1.6' - spec.add_dependency 'dry-schema', '1.6.2' + spec.add_dependency 'dry-schema', '~> 1.6' spec.add_dependency 'faraday', '~> 1.4.1' spec.add_dependency 'faraday_middleware', '~> 1.0' spec.add_dependency 'logging', '~> 2.3.0' - spec.add_dependency 'nokogiri', '>= 1.12.5' + spec.add_dependency 'nokogiri', '>= 1.13.0' spec.add_dependency 'mime-types' spec.add_dependency 'oj', '~> 3.11' spec.add_dependency 'ox', '~> 2.14' - spec.add_dependency 'resque-bus', '~> 0.7.0' spec.add_dependency 'typhoeus', '~> 1.4.0' # TODO: Change to development dependency diff --git a/lib/event_source.rb b/lib/event_source.rb index 6a1bfbb3..0956d8c6 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -13,6 +13,7 @@ require 'active_support/all' # TODO: Remove ActiveSupport dependency require 'event_source/version' +require 'event_source/ruby_versions' require 'event_source/error' require 'event_source/inflector' require 'event_source/logging' @@ -30,11 +31,16 @@ require 'event_source/worker' require 'event_source/publish_operation' require 'event_source/subscribe_operation' +require 'event_source/message' require 'event_source/command' require 'event_source/publisher' require 'event_source/event' require 'event_source/subscriber' require 'event_source/operations/codec64' +require 'event_source/operations/create_message' +require 'event_source/operations/fetch_session' +require 'event_source/operations/build_message_options' +require 'event_source/operations/build_message' # Event source provides ability to compose, publish and subscribe to events module EventSource diff --git a/lib/event_source/async_api/message.rb b/lib/event_source/async_api/message.rb index dc28c383..ab3ffb8a 100644 --- a/lib/event_source/async_api/message.rb +++ b/lib/event_source/async_api/message.rb @@ -20,30 +20,14 @@ class Message < Dry::Struct # @!attribute [r] headers # Schema definition of the application headers. Schema must be of type "object". # It must not define the protocol headers. - # @return [Schema] - attribute :headers, Schema.meta(omittable: true) + # @return [Types::Any] + attribute :headers, Types::Any.meta(omittable: true) # @!attribute [r] payload # Definition of the message payload. It can be of any type but defaults to Schema object # @return [Types::Any] attribute :payload, Types::Any.meta(omittable: true) - # @!attribute [r] correlation_id - # Definition of the correlation ID used for message tracing or matching - # @return [String] - attribute :correlation_id do - # @!attribute [r] description - # An optional description of the identifier. - # CommonMark syntax can be used for rich text representation - # @return [String] - attribute :description, Types::String.meta(omittable: true) - - # @!attribute [r] location - # Required. A runtime expression that specifies the location of the correlation ID - # @return [String] - attribute :location, Types::String.meta(omittable: true) - end.meta(omittable: true) - # @!attribute [r] schema_format # A string containing the name of the schema format used to define the message payload. # If omitted, implementations should parse the payload as a Schema object. Check out the diff --git a/lib/event_source/async_api/operations/async_api_conf/load_path.rb b/lib/event_source/async_api/operations/async_api_conf/load_path.rb index b3a9955f..1a8e6d42 100644 --- a/lib/event_source/async_api/operations/async_api_conf/load_path.rb +++ b/lib/event_source/async_api/operations/async_api_conf/load_path.rb @@ -8,7 +8,7 @@ module Operations module AsyncApiConf # load channel params from given file path class LoadPath - send(:include, Dry::Monads[:result, :do, :try]) + include Dry::Monads[:result, :do, :try] def call(path:) file_io = yield read(path) @@ -27,7 +27,7 @@ def read(path) def deserialize(file_io) Try do - YAML.safe_load(file_io, [Symbol]) + YAML.load(file_io) end.to_result end diff --git a/lib/event_source/async_api/publish_bindings.rb b/lib/event_source/async_api/publish_bindings.rb index 2d84b450..3ae45f3d 100644 --- a/lib/event_source/async_api/publish_bindings.rb +++ b/lib/event_source/async_api/publish_bindings.rb @@ -12,6 +12,7 @@ class PublishBindings < Dry::Struct transform_keys(&:to_sym) attribute :http, ::EventSource::Protocols::Http::PublishBindings.meta(omittable: true) attribute :amqp, Types::Hash.meta(omittable: true) + attribute :x_amqp_exchange_to_exchanges, Types::Hash.meta(omittable: true) end end end diff --git a/lib/event_source/channel.rb b/lib/event_source/channel.rb index 0b4f36aa..fc6d951a 100644 --- a/lib/event_source/channel.rb +++ b/lib/event_source/channel.rb @@ -76,6 +76,7 @@ def add_publish_operation(async_api_channel_item) @channel_proxy.add_publish_operation(async_api_channel_item) return false unless publish_proxy + @channel_proxy.create_exchange_to_exchange_bindings(publish_proxy) if @connection.protocol == :amqp operation_id = async_api_channel_item.publish.operationId logger.info "Adding Publish Operation: #{operation_id}" @@ -95,6 +96,7 @@ def add_publish_operation(async_api_channel_item) # @return [EventSource::SubscribeOperation] def add_subscribe_operation(async_api_channel_item) return false unless async_api_channel_item.subscribe + subscribe_proxy = @channel_proxy.add_subscribe_operation(async_api_channel_item) diff --git a/lib/event_source/configure/config.rb b/lib/event_source/configure/config.rb index 33ffe46d..5c4423ce 100644 --- a/lib/event_source/configure/config.rb +++ b/lib/event_source/configure/config.rb @@ -8,8 +8,13 @@ module Configure class Config include EventSource::Logging + def initialize + @log_level = :warn + end + # TODO: add default for pub_sub_root attr_writer :pub_sub_root, :protocols, :server_configurations + attr_accessor :app_name, :log_level def load_protocols @protocols.each do |protocol| @@ -164,8 +169,6 @@ def delimiter(protocol) '.' end end - - attr_accessor :app_name end end end diff --git a/lib/event_source/error.rb b/lib/event_source/error.rb index 01f75efa..ade7918a 100644 --- a/lib/event_source/error.rb +++ b/lib/event_source/error.rb @@ -31,5 +31,6 @@ class Error < StandardError ConnectionNotFound = Class.new(Error) ServerConfigurationNotFound = Class.new(Error) ServerConfigurationInvalid = Class.new(Error) + MessageBuildError = Class.new(Error) end end diff --git a/lib/event_source/event.rb b/lib/event_source/event.rb index 02ae58bb..fdc77ae4 100644 --- a/lib/event_source/event.rb +++ b/lib/event_source/event.rb @@ -8,7 +8,7 @@ class Event # @attr_reader [Array] attribute_keys optional list of attributes that must be included in { Payload } # @attr_reader [String] publisher_path namespaced key indicating the class that registers event for publishing # @attr_reader [String] payload attribute/value pairs for the message that accompanies the event - attr_reader :attribute_keys, :publisher_path, :payload, :headers, :metadata + attr_reader :attribute_keys, :publisher_path, :payload, :headers, :metadata, :message HeaderDefaults = { version: '3.0', @@ -26,12 +26,22 @@ def initialize(options = {}) send(:headers=, options[:headers] || {}) @publisher_path = klass_var_for(:publisher_path) || nil + build_message(options) if headers.delete(:build_message) + if @publisher_path.eql?(nil) raise EventSource::Error::PublisherKeyMissing, "add 'publisher_path' to #{self.class.name}" end end + def build_message(options) + @message = EventSource::Message.new( + headers: options[:headers], + payload: options[:attributes], + event_name: name + ) + end + # Set payload # @overload payload=(payload) # @param [Hash] payload New payload @@ -127,6 +137,7 @@ def set_instance_variable_for(element, value) def validate_attribute_presence return unless attribute_keys.present? + gapped_keys = attribute_keys - payload.keys @event_errors = [] event_errors.push("missing required keys: #{gapped_keys}") unless gapped_keys.empty? diff --git a/lib/event_source/logging.rb b/lib/event_source/logging.rb index 5a0b38b7..0fdb0db1 100644 --- a/lib/event_source/logging.rb +++ b/lib/event_source/logging.rb @@ -24,7 +24,7 @@ def logger ::Logging.appenders.rolling_file( 'log/event_source.log', age: 'daily', - level: :debug, + level: EventSource.config.log_level, keep: 7, layout: ::Logging.layouts.json ) diff --git a/lib/event_source/message.rb b/lib/event_source/message.rb new file mode 100644 index 00000000..4666befd --- /dev/null +++ b/lib/event_source/message.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require "forwardable" + +module EventSource + # Construct async api message object + class Message + extend Forwardable + + def initialize(options = {}) + message_options = build_message(options) + @message = create_message(message_options) + end + + def_delegators :@message, :payload, :headers + + private + + def build_message(options) + result = EventSource::Operations::BuildMessageOptions.new.call(options) + unless result.success? + raise EventSource::Error::MessageBuildError, + "unable to build message options due to #{result.failure}" + end + result.success + end + + def create_message(options) + result = EventSource::Operations::CreateMessage.new.call(options) + unless result.success? + raise EventSource::Error::MessageBuildError, + "unable to create message due to #{result.failure}" + end + result.success + end + end +end diff --git a/lib/event_source/operations/build_message.rb b/lib/event_source/operations/build_message.rb new file mode 100644 index 00000000..1a9f1cc7 --- /dev/null +++ b/lib/event_source/operations/build_message.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # create message + class BuildMessage + include Dry::Monads[:result, :do] + + def call(params) + values = yield build_options(params) + message = yield create_message(values) + + Success(message) + end + + private + + def build_options(params) + result = BuildMessageOptions.new.call(params) + result.success? ? result : Failure(result.errors.to_h) + end + + def create_message(values) + CreateMessage.new.call(values) + end + end + end +end diff --git a/lib/event_source/operations/build_message_options.rb b/lib/event_source/operations/build_message_options.rb new file mode 100644 index 00000000..4456ae9c --- /dev/null +++ b/lib/event_source/operations/build_message_options.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" +require "securerandom" + +module EventSource + module Operations + # extract message options + class BuildMessageOptions + include Dry::Monads[:result, :do] + + def call(params) + headers = yield build_headers(params) + payload = yield build_payload(params) + headers = yield append_account_details(headers) + + Success(headers: headers, payload: payload) + end + + private + + def build_headers(params) + headers = params[:headers]&.symbolize_keys || {} + headers[:correlation_id] ||= SecureRandom.uuid + headers[:message_id] ||= SecureRandom.uuid + headers[:event_name] ||= params[:event_name] + headers[:event_time] = headers[:event_time]&.utc + + Success(headers) + end + + def build_payload(params) + payload = params[:payload]&.symbolize_keys || {} + + Success(payload) + end + + def append_account_details(headers) + output = FetchSession.new.call + return output unless output.success? + + session, current_user, system_account = output.value! + account = {} + + if session.present? && current_user.present? + account[:session] = session&.symbolize_keys + account[:id] = current_user&.id&.to_s + else + account[:id] = system_account&.id&.to_s + end + headers[:account] = account + + Success(headers) + end + end + end +end diff --git a/lib/event_source/operations/create_message.rb b/lib/event_source/operations/create_message.rb new file mode 100644 index 00000000..f0d0b410 --- /dev/null +++ b/lib/event_source/operations/create_message.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # create message + class CreateMessage + include Dry::Monads[:result, :do] + + def call(params) + values = yield build(params) + message = yield create(values) + + Success(message) + end + + private + + def build(params) + result = + ::EventSource::AsyncApi::Contracts::MessageContract.new.call(params) + + result.success? ? Success(result.to_h) : Failure(result.errors.to_h) + end + + def create(values) + Success(EventSource::AsyncApi::Message.new(values)) + end + end + end +end diff --git a/lib/event_source/operations/fetch_session.rb b/lib/event_source/operations/fetch_session.rb new file mode 100644 index 00000000..ce2ce83e --- /dev/null +++ b/lib/event_source/operations/fetch_session.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # fetch session + class FetchSession + include Dry::Monads[:result, :do] + + def call + helper = yield include_session_helper + session = yield fetch_session + current_user = yield fetch_current_user + system_account = yield fetch_system_account + + Success([session, current_user, system_account]) + end + + private + + def include_session_helper + self.class.include(::SessionConcern) + + Success(::SessionConcern) + rescue NameError => e + Failure(e.to_s) + end + + def fetch_session + if respond_to?(:session) + Success(session) + else + Failure("session is not defined") + end + end + + def fetch_current_user + if respond_to?(:current_user) + Success(current_user) + else + Failure("current_user is not defined") + end + end + + def fetch_system_account + if respond_to?(:system_account) + Success(system_account) + else + Failure("system_account is not defined") + end + end + end + end +end diff --git a/lib/event_source/protocols/amqp/bunny_channel_proxy.rb b/lib/event_source/protocols/amqp/bunny_channel_proxy.rb index 4c6179db..b7c517b2 100644 --- a/lib/event_source/protocols/amqp/bunny_channel_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_channel_proxy.rb @@ -79,6 +79,40 @@ def add_subscribe_operation(_async_api_subscribe_operation) add_queue end + def create_exchange_to_exchange_bindings(exchange_proxy) + exchange_to_exchange_bindings&.each do |exchange_name, options| + options.deep_symbolize_keys! + source_exchange = exchange_by_name(exchange_name) + source_exchange ||= exchange_proxy.bunny_exchange_for(options[:bindings][:amqp][:exchange]) + + bind_exchange_to_exchange(source_exchange.name, exchange_proxy.name, options) + end + end + + def publish_bindings + async_api_channel_item&.publish&.bindings + end + + def exchange_to_exchange_bindings + publish_bindings&.x_amqp_exchange_to_exchanges + end + + def bind_exchange_to_exchange(source_name, destination_name, options) + bind_exchange( + source_name, + destination_name, + { routing_key: options[:routing_key] } + ) + logger.info "Exchange #{destination_name} bound to exchange #{source_name}" + rescue Bunny::NotFound => e + raise EventSource::Protocols::Amqp::Error::ExchangeNotFoundError, + "exchange #{source_name} not found. got exception #{e}" + end + + def exchange_name_from_tag(subscribe_operation_item) + subscribe_operation_item.tags&.detect {|tag| tag.description == "exchange name"}&.name + end + # @return [String] a human-readable summary for this channel def to_s subject.to_s @@ -109,6 +143,10 @@ def bind_queue(name, exchange, options = {}) subject.queue_bind(name, exchange, options) end + def bind_exchange(name, exchange, options = {}) + subject.exchange_bind(name, exchange, options) + end + def respond_to_missing?(name, include_private); end def method_missing(name, *args) diff --git a/lib/event_source/protocols/amqp/bunny_connection_proxy.rb b/lib/event_source/protocols/amqp/bunny_connection_proxy.rb index 8a40484c..c549266e 100644 --- a/lib/event_source/protocols/amqp/bunny_connection_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_connection_proxy.rb @@ -180,7 +180,7 @@ def connection_credentials_from_server(server) if URI(url) amqp_url = URI.parse(url) return {} unless amqp_url.userinfo - { username: URI.unescape(amqp_url.user), password: URI.unescape(amqp_url.password) } + { username: CGI.unescape(amqp_url.user), password: CGI.unescape(amqp_url.password) } else {} end diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 1bf62634..28e735c1 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -45,6 +45,7 @@ def bunny_exchange_for(bindings) def publish(payload:, publish_bindings:, headers: {}) bunny_publish_bindings = sanitize_bindings((publish_bindings || {}).to_h) bunny_publish_bindings[:correlation_id] = headers.delete(:correlation_id) if headers[:correlation_id] + bunny_publish_bindings[:message_id] = headers.delete(:message_id) if headers[:message_id] bunny_publish_bindings[:headers] = headers unless headers.empty? logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 2b431e55..89c07b58 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -117,41 +117,6 @@ def convert_subscriber_prefetch(options) def resolve_subscriber_routing_keys(channel, operation); end - # def register_subscription(subscriber_klass, bindings) - # consumer_proxy = consumer_proxy_for(bindings) - - # consumer_proxy.on_delivery do |delivery_info, metadata, payload| - # on_receive_message( - # subscriber_klass, - # delivery_info, - # metadata, - # payload - # ) - # end - - # subscribe_consumer(consumer_proxy) - # end - - # def subscribe_consumer(consumer_proxy) - # @subject.subscribe_with(consumer_proxy) - # @consumers.push(consumer_proxy) - # end - - # def consumer_proxy_for(bindings) - # operation_bindings = convert_to_consumer_options(bindings[:amqp]) - - # logger.debug 'consumer proxy options:' - # logger.debug operation_bindings.inspect - - # BunnyConsumerProxy.new( - # @subject.channel, - # @subject, - # '', - # operation_bindings[:no_ack], - # operation_bindings[:exclusive] - # ) - # end - def on_receive_message( subscriber_klass, delivery_info, @@ -164,17 +129,7 @@ def on_receive_message( logger.debug metadata.inspect logger.debug payload.inspect - if delivery_info.routing_key - routing_key = [app_name, delivery_info.routing_key].join(delimiter) - executable = subscriber_klass.executable_for(routing_key) - end - - unless executable - routing_key = [app_name, exchange_name].join(delimiter) - executable = subscriber_klass.executable_for(routing_key) - end - - logger.debug "routing_key: #{routing_key}" + executable = find_executable(subscriber_klass, delivery_info) return unless executable subscriber = subscriber_klass.new @@ -196,6 +151,13 @@ def on_receive_message( subscriber = nil end + def find_executable(subscriber_klass, delivery_info) + subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass) + + find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) || + find_default_executable(subscriber_klass, subscriber_suffix) + end + def respond_to_missing?(name, include_private); end # Forward all missing method calls to the Bunny::Queue instance @@ -205,6 +167,22 @@ def method_missing(name, *args) private + def subscriber_klass_name_to_suffix(subscriber_klass) + subscriber_klass.name.downcase.gsub("::", '_') + end + + def find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) + return unless delivery_info.routing_key + + routing_key = [app_name, delivery_info.routing_key].join(delimiter) + subscriber_klass.executable_for(routing_key + "_#{subscriber_suffix}") + end + + def find_default_executable(subscriber_klass, subscriber_suffix) + default_routing_key = [app_name, exchange_name].join(delimiter) + subscriber_klass.executable_for(default_routing_key + "_#{subscriber_suffix}") + end + def delimiter EventSource.delimiter(:amqp) end diff --git a/lib/event_source/protocols/http/soap/payload_header_middleware.rb b/lib/event_source/protocols/http/soap/payload_header_middleware.rb index 89b8d75f..37f75c64 100644 --- a/lib/event_source/protocols/http/soap/payload_header_middleware.rb +++ b/lib/event_source/protocols/http/soap/payload_header_middleware.rb @@ -3,10 +3,13 @@ module EventSource module Protocols module Http + # soap module module Soap # Add SOAP security headers and body around the payload. class PayloadHeaderMiddleware < Faraday::Middleware def on_request(env) + return env if env.request_headers["Authorization"] + sec_config = SecurityHeaderConfiguration.new(options[:soap_settings]) body_to_encode = env.body || "" decorate_result = Operations::DecoratePayloadUsingConfiguration.new.call( diff --git a/lib/event_source/publisher.rb b/lib/event_source/publisher.rb index 5032a0b1..d5aa9b3c 100644 --- a/lib/event_source/publisher.rb +++ b/lib/event_source/publisher.rb @@ -66,7 +66,10 @@ def publish(event) logger.debug "Publisher#publish publish_operation_name: #{publish_operation_name}" publish_operation = find_publish_operation_for(publish_operation_name) - publish_operation.call(event.payload, {headers: event.headers}) + payload = event.message&.payload || event.payload + headers = event.message&.headers || event.headers + + publish_operation.call(payload, {headers: headers}) end def channel_name diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index 5994b706..ea971274 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -3,9 +3,6 @@ module EventSource # :nodoc: class Railtie < Rails::Railtie - - rake_tasks do - load 'event_source/tasks/event_source.rake' - end + end end \ No newline at end of file diff --git a/lib/event_source/ruby_versions.rb b/lib/event_source/ruby_versions.rb new file mode 100644 index 00000000..80df5eea --- /dev/null +++ b/lib/event_source/ruby_versions.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module EventSource + # Helper class to tell us under which ruby version we are operating. + class RubyVersions + CURRENT_VERSION = Gem::Version.new(RUBY_VERSION) + + VERSION_THREE = Gem::Version.new("3.0.0") + VERSION_THREE_ONE = Gem::Version.new("3.1.0") + + LESS_THAN_THREE = CURRENT_VERSION < VERSION_THREE + LESS_THAN_THREE_ONE = CURRENT_VERSION < VERSION_THREE_ONE + end +end \ No newline at end of file diff --git a/lib/event_source/subscriber.rb b/lib/event_source/subscriber.rb index d3b1cd09..d62257a6 100644 --- a/lib/event_source/subscriber.rb +++ b/lib/event_source/subscriber.rb @@ -100,11 +100,15 @@ def subscribe(subscription_name, &block) unless formatted_publisher_key.gsub(delimiter, '_') == identifier unique_key_elements.push(identifier) end - logger.debug "Subscriber#susbcribe Unique_key #{unique_key_elements.join(delimiter)}" + return unless block_given? + subscriber_suffix = self.name.downcase.gsub('::', '_') + subscriber_unique_key = unique_key_elements.join(delimiter) + "_#{subscriber_suffix}" + logger.debug "Subscriber#susbcribe Unique key #{subscriber_unique_key}" + EventSource::Subscriber.executable_container[ - unique_key_elements.join(delimiter) + subscriber_unique_key ] = block end diff --git a/lib/event_source/tasks/event_source.rake b/lib/event_source/tasks/event_source.rake deleted file mode 100644 index 750c0a6a..00000000 --- a/lib/event_source/tasks/event_source.rake +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -# require 'resque_bus/tasks' -# will give you these tasks - -require "queue_bus/tasks" -require "resque_bus/tasks" -require "resque/tasks" - -namespace :event_source do - desc "Subscribes this application to Event Source events" - task :subscribe => ["queuebus:preload", "queuebus:subscribe"] - - desc "Start a Event Source worker for subscription queues" - task :work => ["queuebus:preload", "queuebus:setup", "resque:work"] - - desc "Start a Event Source worker for incoming driver queue" - task :driver => ["queuebus:preload", "queuebus:driver", "resque:work"] -end \ No newline at end of file diff --git a/lib/event_source/uris/amqp_uri.rb b/lib/event_source/uris/amqp_uri.rb index 3775a78e..315a145b 100644 --- a/lib/event_source/uris/amqp_uri.rb +++ b/lib/event_source/uris/amqp_uri.rb @@ -7,5 +7,10 @@ module URI class AMQP < Generic DEFAULT_PORT = 5672 end - @@schemes['AMQP'] = AMQP + + if EventSource::RubyVersions::LESS_THAN_THREE_ONE + @@schemes['AMQP'] = AMQP + else + register_scheme 'AMQP', AMQP + end end diff --git a/lib/event_source/version.rb b/lib/event_source/version.rb index a2989776..4bdf4d94 100644 --- a/lib/event_source/version.rb +++ b/lib/event_source/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module EventSource - VERSION = "0.5.7" + VERSION = "0.5.8" end diff --git a/spec/config_helper.rb b/spec/config_helper.rb index ff3ea25a..d3d11654 100644 --- a/spec/config_helper.rb +++ b/spec/config_helper.rb @@ -2,6 +2,7 @@ EventSource.configure do |config| config.protocols = %w[amqp http] + config.log_level = :warn end EventSource.initialize! diff --git a/spec/event_source/channel_spec.rb b/spec/event_source/channel_spec.rb new file mode 100644 index 00000000..3f1020cc --- /dev/null +++ b/spec/event_source/channel_spec.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'config_helper' + +RSpec.describe EventSource::Channel do + let(:protocol) { :amqp } + let(:url) { 'amqp://localhost:5672/' } + let(:protocol_version) { '0.9.1' } + let(:description) { 'Development RabbitMQ Server' } + + let(:my_server) do + { + url: url, + protocol: protocol, + protocol_version: protocol_version, + description: description + } + end + + let(:client) do + EventSource::Protocols::Amqp::BunnyConnectionProxy.new(my_server) + end + + let(:connection) { EventSource::Connection.new(client) } + + let(:publish_resource_path) do + Pathname.pwd.join('spec', 'support', 'asyncapi', 'amqp_audit_log_publish.yml') + end + + let(:subscribe_resource_path) do + Pathname.pwd.join('spec', 'support', 'asyncapi', 'amqp_audit_log_subscribe.yml') + end + + let(:publish_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: publish_resource_path) + .success + end + + let(:subscribe_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: subscribe_resource_path) + .success + end + + let(:connection_proxy) { connection.connection_proxy } + let(:channel_proxy) { connection_proxy.add_channel(channel_item_key, publish_resource.channels.first) } + let(:subscribe_channel_proxy) { connection_proxy.add_channel(subscribe_channel_item_key, subscribe_resource.channels.first) } + + subject do + described_class.new(connection, channel_proxy, publish_resource.channels.first) + described_class.new(connection, subscribe_channel_proxy, subscribe_resource.channels.first) + end + + context 'When exchange to exchange bindings present' do + + let(:channel_item_key) { 'enroll.audit_log.events.created' } + let(:subscribe_channel_item_key) { 'on_enroll.enroll.audit_log.events' } + + before { connection.start unless connection.active? } + after { connection.disconnect if connection.active? } + + let(:exchange_names) { ['enroll.audit_log.events', 'enroll.enterprise.events', 'enroll.individual.enrollments'] } + let(:audit_log_queue) { subscribe_channel_proxy.queues["on_enroll.enroll.audit_log.events"] } + + it 'should create exchanges' do + exchange_names.each {|key| expect(channel_proxy.exchanges).not_to include(key) } + subject + exchange_names.each {|key| expect(channel_proxy.exchanges).to include(key) } + end + + it 'should route messsages to audit log through exchange to exchange bindings' do + subject + + audit_log_queue.purge + sleep 1 + + channel_proxy.exchanges['enroll.enterprise.events'].publish( + 'test message from enterprise events!!', + routing_key: 'enroll.enterprise.date_advanced' + ) + channel_proxy.exchanges['enroll.individual.enrollments'].publish( + 'test message from enrollment events!!', + routing_key: 'enroll.individual.enrollments.coverage_selected' + ) + + sleep 1 + expect(audit_log_queue.message_count).to eq 2 + expect(audit_log_queue.pop.last).to eq 'test message from enterprise events!!' + expect(audit_log_queue.pop.last).to eq 'test message from enrollment events!!' + end + end +end diff --git a/spec/event_source/event_spec.rb b/spec/event_source/event_spec.rb index c67b0d82..b0d5058f 100644 --- a/spec/event_source/event_spec.rb +++ b/spec/event_source/event_spec.rb @@ -16,26 +16,26 @@ module EventSource class MyValidEvent < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" end class MyEvent < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" attribute_keys :hbx_id, :fein, :entity_kind end class MyEventTwo < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" end class MyEventThree < EventSource::Event - publisher_path 'parties.organization_publisher' + publisher_path "parties.organization_publisher" attribute_keys :hbx_id, :entity_kind, :fein, :legal_name end end RSpec.describe EventSource::Event do - context 'A new Event class' do + context "A new Event class" do # context "and a required publisher_path isn't provided" do # let(:empty_event_class) do # class MyEmptyEvent < EventSource::Event @@ -67,7 +67,7 @@ class MyEventThree < EventSource::Event # end # end - context 'and the required publisher_path provided is valid' do + context "and the required publisher_path provided is valid" do let(:valid_event_class) { EventSource::MyValidEvent } subject { valid_event_class.new } @@ -76,22 +76,22 @@ class MyEventThree < EventSource::Event # expect(subject.publisher_class).to be_a(Parties::OrganizationPublisher) # end - it 'should have an event_key' do - expect(subject.name).to eq 'event_source.my_valid_event' + it "should have an event_key" do + expect(subject.name).to eq "event_source.my_valid_event" end end - context 'with a defined contract_class' do + context "with a defined contract_class" do context "and the contract_class isn't defined" do - it 'should raise an EventSource::Errors::ContractNotDefined' + it "should raise an EventSource::Errors::ContractNotDefined" end end end - context 'An initialized Event class with defined attribute_keys' do + context "An initialized Event class with defined attribute_keys" do let(:event_class) { EventSource::MyEvent } - it 'keys should be initialized for each attribute' do + it "keys should be initialized for each attribute" do expect(event_class.new.attribute_keys).to eq %i[hbx_id fein entity_kind] end @@ -105,12 +105,12 @@ class MyEventThree < EventSource::Event # end # end - context 'and all attribute values are present' do + context "and all attribute values are present" do let(:attributes) do - { hbx_id: '553234', entity_kind: 'c_corp', fein: '546232323' } + { hbx_id: "553234", entity_kind: "c_corp", fein: "546232323" } end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end end @@ -137,82 +137,82 @@ class MyEventThree < EventSource::Event # end end - context 'An initialized Event class with no attribute_keys' do + context "An initialized Event class with no attribute_keys" do let(:event_class) { EventSource::MyEventTwo } subject { event_class.new } - it 'attribute_keys should be empty' do + it "attribute_keys should be empty" do expect(subject.attribute_keys).to be_empty end - context 'with no attributes passed' do - it '#event_errors should be empty' do + context "with no attributes passed" do + it "#event_errors should be empty" do expect(subject.event_errors).to be_empty end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end - it 'attributes should be an empty hash' do + it "attributes should be an empty hash" do expect(subject.payload).to be_empty end end - context 'and with attributes passed' do + context "and with attributes passed" do let(:attributes) do { - hbx_id: '553234', - entity_kind: 'c_corp', - fein: '546232323', - legal_name: 'Test Corp LLC' + hbx_id: "553234", + entity_kind: "c_corp", + fein: "546232323", + legal_name: "Test Corp LLC" } end subject { event_class.new(attributes: attributes) } - it '#event_errors should be empty' do + it "#event_errors should be empty" do expect(subject.event_errors).to be_empty end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end - it 'should have all attributes' do + it "should have all attributes" do expect(subject.payload).to eq attributes end end end - context 'An initialized Event class with attribute_keys' do + context "An initialized Event class with attribute_keys" do let(:event_class) { EventSource::MyEventThree } - context 'with attributes passed' do + context "with attributes passed" do let(:attributes) do { - hbx_id: '553234', - entity_kind: 'c_corp', - fein: '546232323', - legal_name: 'Test Corp LLC' + hbx_id: "553234", + entity_kind: "c_corp", + fein: "546232323", + legal_name: "Test Corp LLC" } end subject { event_class.new(attributes: attributes) } - it 'attribute_keys should be present' do + it "attribute_keys should be present" do expect(subject.attribute_keys).to eq %i[ - hbx_id - entity_kind - fein - legal_name - ] + hbx_id + entity_kind + fein + legal_name + ] end - it '#event_errors should be empty' do + it "#event_errors should be empty" do expect(subject.event_errors).to be_empty end - it '#valid? should return true' do + it "#valid? should return true" do expect(subject.valid?).to be_truthy end - it 'should have all attributes' do + it "should have all attributes" do expect(subject.payload).to eq attributes end end @@ -236,17 +236,61 @@ class MyEventThree < EventSource::Event # end # end - context 'with attribute getter' do - let(:attributes) { { hbx_id: '553234', fein: '546232323' } } + context "with attribute getter" do + let(:attributes) { { hbx_id: "553234", fein: "546232323" } } subject { event_class.new(attributes: attributes) } - context 'when attribute name is passed' do - it 'should return the value' do + context "when attribute name is passed" do + it "should return the value" do expect(subject[:fein]).to eq attributes[:fein] expect(subject[:hbx_id]).to eq attributes[:hbx_id] end end end end + + describe "message composite event" do + module EventSource + class MyCustomEvent < EventSource::Event + + publisher_path "parties.organization_publisher" + end + end + + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" } + end + end + + context "when event is message composite" do + let(:options) do + { + payload: { + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + category: "hc4cc eligibility", + event_time: DateTime.now + }, + headers: { + correlation_id: "edf0e41b-891a-42b1-a4b6-2dbd97d085e4", + build_message: true + } + } + end + + subject { EventSource::MyCustomEvent.new(options) } + + it "should build message" do + expect(subject.message).to be_present + expect(subject.message.headers[:account][:session]).to include( + :session_id + ) + end + end + end end diff --git a/spec/event_source/logging_spec.rb b/spec/event_source/logging_spec.rb index ee947e1e..e2fc0aaf 100644 --- a/spec/event_source/logging_spec.rb +++ b/spec/event_source/logging_spec.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'config_helper' require 'event_source/logging' class LogService diff --git a/spec/event_source/message_spec.rb b/spec/event_source/message_spec.rb new file mode 100644 index 00000000..b4c4396d --- /dev/null +++ b/spec/event_source/message_spec.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Message do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + context "input params passed" do + let(:input_params) do + { + payload: { + record: double + }, + headers: { + event_name: "events.hc4cc.eligibilities.created", + event_category: "hc4cc_eligibility", + event_time: DateTime.now, + event_outcome: "eligibility created", + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + resource_id: "gid://enroll/Eligibility/53e693d7eb899ad9ca01e734", + market_kind: "individual" + }, + event_name: "enroll.events.person.hc4cc_eligibility.created" + } + end + + context "when params passed" do + it "should create message entity" do + message = described_class.new(input_params) + + expect(message).to be_a(EventSource::Message) + end + + it "should have headers on the message" do + message = described_class.new(input_params) + + expect(message.headers).to be_a(Hash) + expect(message.headers.keys).to match_array( + %i[ + correlation_id + subject_id + resource_id + event_category + message_id + event_name + event_time + event_outcome + market_kind + account + ] + ) + end + + it "should have payload on the message" do + message = described_class.new(input_params) + + expect(message.payload).to be_a(Hash) + expect(message.payload.keys).to match_array(%i[record]) + end + + it "should have payload with session on the message" do + message = described_class.new(input_params) + + expect(message.headers[:account][:session]).to be_a(Hash) + expect(message.headers[:account][:session].keys).to match_array( + %i[session_id portal login_session_id] + ) + end + end + end +end diff --git a/spec/event_source/operations/build_message_options_spec.rb b/spec/event_source/operations/build_message_options_spec.rb new file mode 100644 index 00000000..b012bfb7 --- /dev/null +++ b/spec/event_source/operations/build_message_options_spec.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Operations::BuildMessageOptions do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + context "input params passed" do + let(:input_params) do + { + payload: { + record: double + }, + headers: { + event_name: "events.hc4cc.eligibilities.created", + event_category: "hc4cc_eligibility", + event_time: DateTime.now, + event_outcome: "eligibility created", + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + resource_id: "gid://enroll/Eligibility/53e693d7eb899ad9ca01e734", + market_kind: "individual" + }, + event_name: "enroll.events.person.hc4cc_eligibility.created" + } + end + + context "when params passed" do + it "should return success" do + result = subject.call(input_params) + + expect(result.success?).to be_truthy + end + + it "should build headers" do + result = subject.call(input_params) + message_options = result.value! + + expect(message_options[:headers]).to be_a(Hash) + expect(message_options[:headers].keys).to match_array( + %i[ + correlation_id + subject_id + resource_id + event_category + message_id + event_name + event_time + event_outcome + market_kind + account + ] + ) + end + + it "should build payload" do + result = subject.call(input_params) + message_options = result.value! + + expect(message_options[:payload]).to be_a(Hash) + expect(message_options[:payload].keys).to match_array(%i[record]) + end + + it "should build session options" do + result = subject.call(input_params) + message_options = result.value! + + expect(message_options[:headers][:account][:session]).to be_a(Hash) + expect(message_options[:headers][:account][:session].keys).to match_array( + %i[session_id portal login_session_id] + ) + end + end + end +end diff --git a/spec/event_source/operations/build_message_spec.rb b/spec/event_source/operations/build_message_spec.rb new file mode 100644 index 00000000..fc41cbfa --- /dev/null +++ b/spec/event_source/operations/build_message_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Operations::BuildMessage do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + end + + context "input params passed" do + let(:input_params) do + { + payload: { + record: double + }, + headers: { + event_name: "events.hc4cc.eligibilities.created", + event_category: "hc4cc_eligibility", + event_time: DateTime.now, + event_outcome: "eligibility created", + subject_id: "gid://enroll/Person/53e693d7eb899ad9ca01e734", + resource_id: "gid://enroll/Eligibility/53e693d7eb899ad9ca01e734", + market_kind: "individual" + }, + event_name: "enroll.events.person.hc4cc_eligibility.created" + } + end + + context "when session available" do + it "should message options session options" do + result = subject.call(input_params) + + expect(result.success?).to be_truthy + expect(result.success).to be_a(EventSource::AsyncApi::Message) + end + end + end +end diff --git a/spec/event_source/operations/fetch_session_spec.rb b/spec/event_source/operations/fetch_session_spec.rb new file mode 100644 index 00000000..29ce65a1 --- /dev/null +++ b/spec/event_source/operations/fetch_session_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe EventSource::Operations::FetchSession do + let(:fetch_session) { described_class.new } + + describe "when session helper not defined" do + before do + allow(fetch_session).to receive(:respond_to?).with(:session).and_return( + respond_to_session + ) + allow(fetch_session).to receive(:respond_to?).with( + :current_user + ).and_return(respond_to_current_user) + end + + let(:respond_to_session) { true } + let(:respond_to_current_user) { true } + + context "when current user not defined" do + let(:respond_to_current_user) { false } + + it "should fail" do + result = fetch_session.call + + expect(result.success?).to be_falsey + expect(result.failure).to eq "current_user is not defined" + end + end + + context "when session not defined" do + let(:respond_to_session) { false } + + it "should fail" do + result = fetch_session.call + + expect(result.success?).to be_falsey + expect(result.failure).to eq "session is not defined" + end + end + end + + describe "when session helper defined" do + context "when operation called" do + module SessionConcern + def current_user + OpenStruct.new(id: 1) + end + + def session + { + "session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb", + "portal" => "enroll/families/home", + "login_session_id" => "ad465b7f-1d9e-44b1-ba72-b97e166f3acb" + } + end + + def system_account + OpenStruct.new(id: 2) + end + end + + let(:session_concern) { Class.new.extend(SessionConcern) } + + it "should return session and current user" do + result = fetch_session.call + + expect(result.success?).to be_truthy + expect(result.value!).to eq( + [ + session_concern.session, + session_concern.current_user, + session_concern.system_account + ] + ) + end + end + end +end diff --git a/spec/event_source/protocols/amqp/bunny_connection_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_connection_proxy_spec.rb index 06168528..64cc1051 100644 --- a/spec/event_source/protocols/amqp/bunny_connection_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_connection_proxy_spec.rb @@ -187,7 +187,7 @@ # rubocop:disable Lint/UriEscapeUnescape let(:connection_url) do - "amqp://#{URI.escape(username)}:#{URI.escape(password)}@localhost:5672" + "amqp://#{CGI.escape(username)}:#{CGI.escape(password)}@localhost:5672" end # rubocop:enable Lint/UriEscapeUnescape diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index 7f4aeabb..a1758b38 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -168,4 +168,157 @@ # end # end end + + context "executable lookup with subscriber suffix" do + let(:connection_manager) { EventSource::ConnectionManager.instance } + let!(:connection) { connection_manager.add_connection(my_server) } + + let(:event_log_subscriber) do + Pathname.pwd.join( + "spec", + "rails_app", + "app", + "event_source", + "subscribers", + "event_log_subscriber.rb" + ) + end + + let(:enterprise_subscriber) do + Pathname.pwd.join( + "spec", + "rails_app", + "app", + "event_source", + "subscribers", + "enterprise_subscriber.rb" + ) + end + + let(:publish_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_audit_log_publish.yml" + ) + ) + .success + end + + let(:subscribe_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_audit_log_subscribe.yml" + ) + ) + .success + end + + let(:subscribe_two_resource) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call( + path: + Pathname.pwd.join( + "spec", + "support", + "asyncapi", + "amqp_enterprise_subscribe.yml" + ) + ) + .success + end + + let(:publish_channel) do + connection.add_channel( + "enroll.audit_log.events.created", + publish_resource.channels.first + ) + end + let(:subscribe_channel) do + connection.add_channel( + "on_enroll.enroll.audit_log.events", + subscribe_resource.channels.first + ) + end + let(:subscribe_two_channel) do + connection.add_channel( + "on_enroll.enroll.enterprise.events", + subscribe_two_resource.channels.first + ) + end + + let(:load_subscribers) do + [event_log_subscriber, enterprise_subscriber].each do |file| + require file.to_s + end + end + + before do + allow(EventSource).to receive(:app_name).and_return("enroll") + connection.start unless connection.active? + publish_channel + subscribe_channel + subscribe_two_channel + load_subscribers + allow(subject).to receive(:exchange_name) { exchange_name } + end + + let(:audit_log_proc) do + EventSource::Subscriber.executable_container[ + "enroll.enroll.audit_log.events_subscribers_eventlogsubscriber" + ] + end + + let(:enterprise_advance_day_proc) do + EventSource::Subscriber.executable_container[ + "enroll.enroll.enterprise.events.date_advanced_subscribers_enterprisesubscriber" + ] + end + + context "when routing key based executable is not found" do + let(:delivery_info) do + double(routing_key: "enroll.enterprise.events.date_advanced") + end + + let(:exchange_name) { "enroll.audit_log.events" } + + it "should return default audit log proc" do + executable = + subject.find_executable( + Subscribers::EventLogSubscriber, + delivery_info + ) + expect(executable).to match(audit_log_proc) + end + end + + context "when routing key based executable is found" do + let(:delivery_info) do + double(routing_key: "enroll.enterprise.events.date_advanced") + end + + let(:exchange_name) { "enroll.enterprise.events" } + + it "should return executable for the routing key" do + executable = + subject.find_executable( + Subscribers::EnterpriseSubscriber, + delivery_info + ) + expect(executable).to match(enterprise_advance_day_proc) + end + end + end end diff --git a/spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb b/spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb deleted file mode 100644 index 23d1903d..00000000 --- a/spec/rails_app/app/event_source/adapters/parties/queue_bus_adapter.rb +++ /dev/null @@ -1,49 +0,0 @@ -# frozen_string_literal: true -# # frozen_string_literal: true - -# module Parties -# class QueueBusAdapter < EventSource::Adapter - -# def enabled! -# require 'resque-bus' - -# # event_source_root = Rails.root.join('app', 'event_source') - -# # publishers_dir = event_source_root.join('publishers') -# # Dir[publishers_dir.join('parties', '*.rb')].each {|file| require file } -# # EventSource::Publisher.register_publishers(publishers_dir) - -# # require 'active_support/notifications' -# # called the first time we know we are using this adapter -# # it would be a good spot to require the libraries you're using -# # and modify EventSource::Worker as needed -# # raise NotImplementedError -# end - -# def enqueue(event) -# QueueBus.publish(event.event_key, event.payload) -# end - -# def dequeue(queue, event_name, subscriber, block) -# method_name = "on_#{event_name.gsub('.', '_')}" - -# EeventSource.channel('faa').send(queue, event_name, block) -# # QueueBus::Subscriber.subscribe_queue(queue, method_name) - -# # publisher = publisher_by(queue) -# # puts "--------->>>> queue #{queue} event #{event_name} --- #{block.inspect}" - -# # if block.present? -# # publisher.subscribe(event_name) do |event| -# # block.call(event) -# # end -# # else -# # subscribe_listener(queue, subscriber) -# # end - -# # ActiveSupport::Notifications.subscribe(key) do |name, started, finished, unique_id, data| -# # block.call(data) -# # end -# end -# end -# end diff --git a/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb b/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb new file mode 100644 index 00000000..c36b6385 --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/enterprise_subscriber.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Subscribers + # Subscriber will receive Enterprise requests like date change + class EnterpriseSubscriber + include ::EventSource::Subscriber[amqp: "enroll.enterprise.events"] + + subscribe(:on_date_advanced) do |delivery_info, metadata, response| + logger.info "-" * 100 unless Rails.env.test? + logger.info "EnterpriseSubscriber#on_date_advanced, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + + subscribe( + :on_enroll_enterprise_events + ) do |delivery_info, _metadata, response| + logger.info "-" * 100 unless Rails.env.test? + logger.info "EnterpriseSubscriber#on_enroll_enterprise_events, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + end +end diff --git a/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb b/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb new file mode 100644 index 00000000..9dea8947 --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/event_log_subscriber.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Subscribers + # Subscriber will receive Audit Log events + class EventLogSubscriber + include EventSource::Logging + include ::EventSource::Subscriber[amqp: "enroll.audit_log.events"] + + subscribe( + :on_enroll_audit_log_events + ) do |delivery_info, metadata, response| + logger.info "-" * 100 unless Rails.env.test? + + subscriber_logger.info "EventLogEventsSubscriber#on_enroll_audit_log_events, response: #{response}" + + ack(delivery_info.delivery_tag) + rescue StandardError, SystemStackError => e + ack(delivery_info.delivery_tag) + end + + private + + def subscriber_logger + @subscriber_logger ||= + Logger.new("#{Rails.root}/log/on_enroll_audit_log_events.log") + end + end +end diff --git a/spec/rails_app/config/initializers/event_source.rb b/spec/rails_app/config/initializers/event_source.rb index 20e04400..346d942c 100644 --- a/spec/rails_app/config/initializers/event_source.rb +++ b/spec/rails_app/config/initializers/event_source.rb @@ -5,6 +5,7 @@ config.pub_sub_root = Pathname.pwd.join('spec', 'rails_app', 'app', 'event_source') config.server_key = Rails.env.to_sym config.app_name = :enroll + config.log_level = :warn config.servers do |server| # mitc @@ -111,13 +112,10 @@ # { # url: ENV['RABBITMQ_SERVER'] # }, - # resque_bus: { - # protocol: :resque_bus - # } # ] # config.asyncapi_resources = AcaEntities::AsyncApi::Mitc - # config.asyncapi_resources = AcaEntities.find_resources_for(:enroll, %w[amqp resque_bus]) # will give you resouces in array of hashes form + # config.asyncapi_resources = AcaEntities.find_resources_for(:enroll, %w[amqp]) # will give you resouces in array of hashes form # AcaEntities::Operations::AsyncApi::FindResource.new.call(self) end diff --git a/spec/support/asyncapi/amqp_audit_log_publish.yml b/spec/support/asyncapi/amqp_audit_log_publish.yml new file mode 100644 index 00000000..b51812bd --- /dev/null +++ b/spec/support/asyncapi/amqp_audit_log_publish.yml @@ -0,0 +1,87 @@ +--- +asyncapi: 2.0.0 +info: + title: Enroll App + version: 0.1.0 + description: AMQP Publish configuration for the Fdsh services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Production Server + development: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server + test: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server + +channels: + enroll.audit_log.events.created: + bindings: + amqp: + is: :routing_key + exchange: + name: enroll.audit_log.events + type: topic + content_type: application/json + durable: true + auto_delete: false + vhost: "/" + binding_version: "0.2.0" + publish: + operationId: enroll.audit_log.events.created + description: Events - system date advanced + bindings: + amqp: + app_id: enroll + type: enroll.audit_log.events + routing_key: enroll.audit_log.events.created + deliveryMode: 2 + mandatory: true + timestamp: true + content_type: application/json + bindingVersion: 0.2.0 + x_amqp_exchange_to_exchanges: + enroll.enterprise.events: + description: Forward Enterprise Events to Exchange enroll.audit_log.events + routing_key: enroll.enterprise.# + bindings: + amqp: + is: :routing_key + exchange: + name: enroll.enterprise.events + type: topic + content_type: application/json + durable: true + auto_delete: false + vhost: "/" + enroll.individual.enrollments: + description: Forward Individual Enrollment Events to Exchange enroll.audit_log.events + routing_key: enroll.individual.enrollments.# + bindings: + amqp: + is: :routing_key + exchange: + name: enroll.individual.enrollments + type: topic + content_type: application/json + durable: true + auto_delete: false + vhost: "/" +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file diff --git a/spec/support/asyncapi/amqp_audit_log_subscribe.yml b/spec/support/asyncapi/amqp_audit_log_subscribe.yml new file mode 100644 index 00000000..ebee9c12 --- /dev/null +++ b/spec/support/asyncapi/amqp_audit_log_subscribe.yml @@ -0,0 +1,55 @@ +--- +asyncapi: 2.0.0 +info: + title: Enroll App + version: 0.1.0 + description: AMQP Subsribe configuration for the Enroll App services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Production Server + development: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server + test: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server +channels: + on_enroll.enroll.audit_log.events: + bindings: + amqp: + is: queue + queue: + name: on_enroll.enroll.audit_log.events + durable: true + exclusive: false + auto_delete: false + vhost: "/" + subscribe: + bindings: + amqp: + ack: true + exclusive: false + routing_key: enroll.# + prefetch: 1 + bindingVersion: "0.2.0" + operationId: on_enroll.enroll.audit_log.events + description: Events - for system wide changes + +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file diff --git a/spec/support/asyncapi/amqp_enterprise_subscribe.yml b/spec/support/asyncapi/amqp_enterprise_subscribe.yml new file mode 100644 index 00000000..1842ed07 --- /dev/null +++ b/spec/support/asyncapi/amqp_enterprise_subscribe.yml @@ -0,0 +1,55 @@ +--- +asyncapi: 2.0.0 +info: + title: Enroll App + version: 0.1.0 + description: AMQP Subsribe configuration for the Enroll App services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Production Server + development: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server + test: + url: "amqp://rabbitmq:5672/event_source" + protocol: :amqp + protocolVersion: "0.9.2" + description: RabbitMQ Test Server +channels: + on_enroll.enroll.enterprise.events: + bindings: + amqp: + is: queue + queue: + name: on_enroll.enroll.enterprise.events + durable: true + exclusive: false + auto_delete: false + vhost: "/" + subscribe: + bindings: + amqp: + ack: true + exclusive: false + routing_key: enroll.# + prefetch: 1 + bindingVersion: "0.2.0" + operationId: on_enroll.enroll.enterprise.events + description: Events - for system wide changes + +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file