diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..8589d1f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,51 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + ruby: + - '3.2' + name: Ruby ${{ matrix.ruby }} + services: + mysql: + image: mysql:5.7 + env: + MYSQL_ALLOW_EMPTY_PASSWORD: yes + MYSQL_DATABASE: mysql2_split_test + ports: + - 3306:3306 + options: >- + --health-cmd "mysqladmin ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + steps: + - uses: actions/checkout@v2 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: ${{ matrix.ruby }} + bundler-cache: true + - run: | + bundle exec rspec + env: + MYSQL_HOST: 127.0.0.1 + RAILS_ENV: test + RuboCop: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: '3.2' + bundler-cache: true + - run: | + bundle exec rubocop --parallel --color \ No newline at end of file diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..e58f3bd --- /dev/null +++ b/.rspec @@ -0,0 +1,3 @@ +--force-color +--format documentation +--require ./spec/spec_helper.rb \ No newline at end of file diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..ed58f2f --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,12 @@ +require: + - rubocop-rails + +Style/Documentation: + Enabled: false + +Rails/Delegate: + Enabled: false + +Metrics/BlockLength: + Exclude: + - spec/**/* \ No newline at end of file diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..810f08e --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +# frozen_string_literal: true + +source 'http://rubygems.org' +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..41969de --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,103 @@ +PATH + remote: . 
+ specs: + mysql2-split (0.1.0) + forwardable (~> 1) + +GEM + remote: http://rubygems.org/ + specs: + activemodel (7.1.3.2) + activesupport (= 7.1.3.2) + activerecord (7.1.3.2) + activemodel (= 7.1.3.2) + activesupport (= 7.1.3.2) + timeout (>= 0.4.0) + activesupport (7.1.3.2) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.0.2) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + minitest (>= 5.1) + mutex_m + tzinfo (~> 2.0) + ast (2.4.2) + base64 (0.2.0) + bigdecimal (3.1.7) + concurrent-ruby (1.2.3) + connection_pool (2.4.1) + diff-lcs (1.5.1) + drb (2.2.1) + forwardable (1.3.3) + i18n (1.14.4) + concurrent-ruby (~> 1.0) + json (2.7.1) + language_server-protocol (3.17.0.3) + minitest (5.22.3) + mutex_m (0.2.0) + mysql2 (0.5.6) + parallel (1.24.0) + parser (3.3.0.5) + ast (~> 2.4.1) + racc + racc (1.7.3) + rack (3.0.10) + rainbow (3.1.1) + rake (13.1.0) + regexp_parser (2.9.0) + rexml (3.2.6) + rspec (3.13.0) + rspec-core (~> 3.13.0) + rspec-expectations (~> 3.13.0) + rspec-mocks (~> 3.13.0) + rspec-core (3.13.0) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-mocks (3.13.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.1) + rubocop (1.62.1) + json (~> 2.3) + language_server-protocol (>= 3.17.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 1.8, < 3.0) + rexml (>= 3.2.5, < 4.0) + rubocop-ast (>= 1.31.1, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 3.0) + rubocop-ast (1.31.2) + parser (>= 3.3.0.4) + rubocop-rails (2.24.1) + activesupport (>= 4.2.0) + rack (>= 1.1) + rubocop (>= 1.33.0, < 2.0) + rubocop-ast (>= 1.31.1, < 2.0) + ruby-progressbar (1.13.0) + timeout (0.4.1) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (2.5.0) + +PLATFORMS + arm64-darwin-23 + x86_64-linux + +DEPENDENCIES + activerecord (>= 7.1.0) + activesupport (>= 7.1.0) + mysql2 + 
mysql2-split! + rake + rspec (~> 3) + rubocop (~> 1.62.0) + rubocop-rails (~> 2.24.0) + +BUNDLED WITH + 2.4.22 diff --git a/README.md b/README.md new file mode 100644 index 0000000..b5cbcd3 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ +# MySQL2 Split + +![Build Status](https://github.com/olioex/mysql2-split/workflows/ci/badge.svg) + +Mysql2Split is a generic primary/replica proxy for ActiveRecord 7.1+ and MySQL. It handles the switching of connections between primary and replica database servers. It comes with an ActiveRecord database adapter implementation. + +Mysql2Split is heavily inspired by [Makara](https://github.com/instacart/makara) from TaskRabbit and then Instacart. Unfortunately Makara is unmaintained and broke for us with Rails 7.1. This is an attempt to start afresh on the project. It is definitely not as fully featured as Makara at this stage. + +## Installation + +Use the current version of the gem from [rubygems](https://rubygems.org/gems/mysql2-split) in your `Gemfile`. + +```ruby +gem 'mysql2-split' +``` + +This project assumes that your read/write endpoints are handled by a separate system (e.g. DNS). + +## Usage + +After a write query on a thread, the adapter will continue using the `primary` server for that thread, unless the context is specifically released. + +### Configuration + +Update your **database.yml** as follows: + +```yml +development: + adapter: mysql2_split + mysql2_split: + primary: + <<: *default + database: database_name + host: primary-host.local + replica: + <<: *default + password: ithappenstobedifferent + host: replica-host.local +``` + +### Forcing connections + +A context is local to the current thread of execution. This will allow you to stick to the primary safely in a single thread +in systems such as sidekiq, for instance. 
+ +#### Releasing stuck connections (clearing context) + +If you need to clear the current context, releasing any stuck connections, all you have to do is: + +```ruby +Mysql2Split::Context.release_all +``` + +#### Forcing connection to primary server + +```ruby +Mysql2Split::Context.stick_to_primary +``` + +### Logging + +You can assign a logger instance to `::Mysql2Split::Logging::Logger.logger` and Mysql2Split will write its log messages to it. + +```ruby +Mysql2Split::Logging::Logger.logger = ::Logger.new(STDOUT) +``` + +### Which queries go where? + +In general: Any `SELECT` statements will execute against your replica(s), anything else will go to the primary. + +There are some edge cases: +* `SET` operations will be sent to all connections +* Specific methods such as `connect!`, `disconnect!`, and `clear_cache!` are invoked on all underlying connections +* Calls inside a transaction will always be sent to the primary (otherwise changes from within the transaction could not be read back on most transaction isolation levels) +* Locking reads (e.g. `SELECT ... 
FOR UPDATE`) will always be sent to the primary diff --git a/lib/active_record/connection_adapters/mysql2_split_adapter.rb b/lib/active_record/connection_adapters/mysql2_split_adapter.rb new file mode 100644 index 0000000..aa35d2a --- /dev/null +++ b/lib/active_record/connection_adapters/mysql2_split_adapter.rb @@ -0,0 +1,124 @@ +# frozen_string_literal: true + +require 'active_record/connection_adapters/abstract_adapter' +require 'active_record/connection_adapters/mysql2_adapter' +require_relative '../../mysql2_split' + +module ActiveRecord + module ConnectionHandling + def mysql2_split_connection(config) + ActiveRecord::ConnectionAdapters::Mysql2SplitAdapter.new(config) + end + end +end + +module ActiveRecord + module ConnectionAdapters + class Mysql2SplitAdapter < ActiveRecord::ConnectionAdapters::Mysql2Adapter + SQL_PRIMARY_MATCHERS = [ + /\A\s*select.+for update\Z/i, /select.+lock in share mode\Z/i, + /\A\s*select.+(nextval|currval|lastval|get_lock|release_lock|pg_advisory_lock|pg_advisory_unlock)\(/i, + /\A\s*show/i + ].freeze + SQL_REPLICA_MATCHERS = [/\A\s*(select|with.+\)\s*select)\s/i].freeze + SQL_ALL_MATCHERS = [/\A\s*set\s/i].freeze + SQL_SKIP_ALL_MATCHERS = [/\A\s*set\s+local\s/i].freeze + + def initialize(*args) + @replica_config = args[0][:mysql2_split]['replica'] + args[0] = args[0][:mysql2_split]['primary'] + + super(*args) + @connection_parameters ||= args[0] + update_config + end + + def execute(sql) + if should_send_to_all?(sql) + send_to_replica(sql, connection: :all, method: :execute) + return super(sql) + end + return send_to_replica(sql, connection: :replica, method: :execute) if can_go_to_replica?(sql) + + Mysql2Split::Context.stick_to_primary if write_query?(sql) + Mysql2Split::Context.used_connection(:primary) + + super(sql) + end + + def execute_and_free(sql, name = nil, async: false) # :nodoc:# + if should_send_to_all?(sql) + send_to_replica(sql, connection: :all) # send_to_replica takes sql as its only positional argument + return super(sql, name, async:) + end + return 
send_to_replica(sql, connection: :replica) if can_go_to_replica?(sql) + + Mysql2Split::Context.stick_to_primary if write_query?(sql) + Mysql2Split::Context.used_connection(:primary) + + super(sql, name, async:) + end + + def connect!(...) + replica_connection.connect!(...) + super + end + + def reconnect!(...) + replica_connection.reconnect!(...) + super + end + + def disconnect!(...) + replica_connection.disconnect!(...) + super + end + + def clear_cache!(...) + replica_connection.clear_cache!(...) + super + end + + private + + def should_send_to_all?(sql) + SQL_ALL_MATCHERS.any? { |matcher| sql =~ matcher } && SQL_SKIP_ALL_MATCHERS.none? { |matcher| sql =~ matcher } + end + + def can_go_to_replica?(sql) + return false if Mysql2Split::Context.use_primary? || + open_transactions.positive? || + SQL_PRIMARY_MATCHERS.any? { |matcher| sql =~ matcher } + + true + end + + def send_to_replica(sql, connection: nil, method: :exec_query) + Mysql2Split::Context.used_connection(connection) if connection + if method == :execute + replica_connection.execute(sql) + else + replica_connection.exec_query(sql) + end + end + + def write_query?(sql) + %w[INSERT UPDATE DELETE LOCK].include?(sql.split(' ').first) + end + + def replica_connection + @replica_connection ||= ActiveRecord::ConnectionAdapters::Mysql2Adapter.new(@replica_config) + end + + def update_config + @config[:flags] ||= 0 + + if @config[:flags].is_a? 
Array + @config[:flags].push 'FOUND_ROWS' + else + @config[:flags] |= ::Mysql2::Client::FOUND_ROWS + end + end + end + end +end diff --git a/lib/mysql2_split.rb b/lib/mysql2_split.rb new file mode 100644 index 0000000..fc80c5d --- /dev/null +++ b/lib/mysql2_split.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +require 'active_support' + +module Mysql2Split + autoload :Context, 'mysql2_split/context' + autoload :VERSION, 'mysql2_split/version' + + module Logging + autoload :Subscriber, 'mysql2_split/logging/subscriber' + autoload :Logger, 'mysql2_split/logging/logger' + end +end + +ActiveSupport.on_load(:active_record) do + ActiveRecord::LogSubscriber.log_subscribers.each do |subscriber| + subscriber.extend Mysql2Split::Logging::Subscriber + end +end diff --git a/lib/mysql2_split/context.rb b/lib/mysql2_split/context.rb new file mode 100644 index 0000000..d70aaa6 --- /dev/null +++ b/lib/mysql2_split/context.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +module Mysql2Split + class Context + THREAD_KEY = :mysql2_split_context + + # Builds a context holding the sticky-primary flag, an expiry value and the + # most recently used connection (initially :primary). The expiry is only + # stored here; nothing in this class reads it. + def initialize(primary: false, expiry: nil) + @primary = primary + @expiry = expiry + @last_used_connection = :primary + end + + def stick_to_primary + @primary = true + end + + def potential_write + stick_to_primary + end + + def release_all + @primary = false + @expiry = nil + @last_used_connection = nil + end + + def use_primary? + @primary + end + + def used_connection(connection) + @last_used_connection = connection + end + + attr_reader :last_used_connection + + class << self + def stick_to_primary + current.stick_to_primary + end + + def release_all + current.release_all + end + + def used_connection(connection) + current.used_connection(connection) + end + + def use_primary? + current.use_primary? 
+ end + + def last_used_connection + current.last_used_connection + end + + protected + + def current + fetch(THREAD_KEY) { new } + end + + def fetch(key) + get(key) || set(key, yield) + end + + def get(key) + Thread.current.thread_variable_get(key) + end + + def set(key, value) + Thread.current.thread_variable_set(key, value) + end + end + end +end diff --git a/lib/mysql2_split/logging/logger.rb b/lib/mysql2_split/logging/logger.rb new file mode 100644 index 0000000..7fedac2 --- /dev/null +++ b/lib/mysql2_split/logging/logger.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module Mysql2Split + module Logging + class Logger + class << self + def log(message, format = :info) + logger&.send(format, "[Mysql2Split] #{message}") + end + + attr_accessor :logger + end + end + end +end diff --git a/lib/mysql2_split/logging/subscriber.rb b/lib/mysql2_split/logging/subscriber.rb new file mode 100644 index 0000000..f6afb4f --- /dev/null +++ b/lib/mysql2_split/logging/subscriber.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Mysql2Split + module Logging + module Subscriber + IGNORE_PAYLOAD_NAMES = %w[SCHEMA EXPLAIN].freeze + + def sql(event) + name = event.payload[:name] + unless IGNORE_PAYLOAD_NAMES.include?(name) + name = [current_wrapper_name(event), name].compact.join(' ') + event.payload[:name] = name + end + super(event) + end + + protected + + def current_wrapper_name(_event) + connection = Mysql2Split::Context.last_used_connection + return nil unless connection + + "[#{connection}]" + end + end + end +end diff --git a/lib/mysql2_split/version.rb b/lib/mysql2_split/version.rb new file mode 100644 index 0000000..c242ed5 --- /dev/null +++ b/lib/mysql2_split/version.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Mysql2Split + unless defined?(::Mysql2Split::VERSION) + module VERSION + MAJOR = 0 + MINOR = 1 + PATCH = 0 + PRE = nil + + def self.to_s + [MAJOR, MINOR, PATCH, PRE].compact.join('.') + end + end + end + ::Mysql2Split::VERSION +end 
diff --git a/mysql2-split.gemspec b/mysql2-split.gemspec new file mode 100644 index 0000000..ebe36cb --- /dev/null +++ b/mysql2-split.gemspec @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +require File.expand_path('lib/mysql2_split/version.rb', __dir__) + +Gem::Specification.new do |gem| + gem.authors = ['Lloyd Watkin'] + gem.email = ['lloyd@olioex.com'] + gem.description = 'Read/Write proxy for ActiveRecord using primary/replca databases' + gem.summary = 'Read/Write proxy for ActiveRecord using primary/replca databases' + gem.homepage = 'https://github.com/olioex/mysql2-split' + gem.licenses = ['MIT'] + gem.metadata = { + 'source_code_uri' => 'https://github.com/olioex/mysql2-split' + } + + gem.files = `git ls-files`.split($OUTPUT_RECORD_SEPARATOR) + gem.executables = gem.files.grep(%r{^bin/}).map { |f| File.basename(f) } + gem.name = 'mysql2-split' + gem.require_paths = ['lib'] + gem.version = Mysql2Split::VERSION + + gem.required_ruby_version = '>= 3.2.0' + + gem.add_dependency 'forwardable', '~> 1' + + gem.add_development_dependency 'activerecord', '>= 7.1.0' + gem.add_development_dependency 'activesupport', '>= 7.1.0' + gem.add_development_dependency 'mysql2' + gem.add_development_dependency 'rake' + gem.add_development_dependency 'rspec', '~> 3' + gem.add_development_dependency 'rubocop', '~> 1.62.0' + gem.add_development_dependency 'rubocop-rails', '~> 2.24.0' +end diff --git a/spec/lib/mysql2_split/context_spec.rb b/spec/lib/mysql2_split/context_spec.rb new file mode 100644 index 0000000..912269d --- /dev/null +++ b/spec/lib/mysql2_split/context_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'mysql2_split/context' + +RSpec.describe Mysql2Split::Context do + describe '#initialize' do + it 'sets the primary flag and expiry' do + context = described_class.new(primary: true, expiry: 60) + expect(context.use_primary?).to be true + expect(context.last_used_connection).to eq(:primary) + end + end + + describe '#stick_to_primary' do + it 
'sets the primary flag to true' do + context = described_class.new + context.stick_to_primary + expect(context.use_primary?).to be true + end + end + + describe '#potential_write' do + it 'calls stick_to_primary' do + context = described_class.new + expect(context).to receive(:stick_to_primary) + context.potential_write + end + end + + describe '#release_all' do + it 'resets the primary flag and expiry' do + context = described_class.new(primary: true, expiry: 60) + context.release_all + expect(context.use_primary?).to be false + expect(context.last_used_connection).to be_nil + end + end + + describe '#used_connection' do + it 'sets the last used connection' do + context = described_class.new + context.used_connection(:secondary) + expect(context.last_used_connection).to eq(:secondary) + end + end +end diff --git a/spec/lib/mysql2_split/logging/logger_spec.rb b/spec/lib/mysql2_split/logging/logger_spec.rb new file mode 100644 index 0000000..ac28061 --- /dev/null +++ b/spec/lib/mysql2_split/logging/logger_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +RSpec.describe Mysql2Split::Logging::Logger do + describe '.log' do + let(:logger) { double('logger') } + + before do + described_class.logger = logger + end + + it 'logs the message with the specified format' do + expect(logger).to receive(:send).with(:info, '[Mysql2Split] Test message') + described_class.log('Test message', :info) + end + + it 'does not log the message if logger is not set' do + described_class.logger = nil + expect(logger).not_to receive(:send) + described_class.log('Test message', :info) + end + end + + describe '.logger=' do + let(:logger) { double('logger') } + + it 'sets the logger' do + described_class.logger = logger + expect(described_class.logger).to eq(logger) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..7a23652 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,3 @@ +# frozen_string_literal: true + +require 
'./lib/mysql2_split'