From 6bb309f590c40a43d75900a34f7fbd8f4f48d370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20=C4=8Cepel=C3=ADk?= Date: Tue, 18 Dec 2018 20:36:08 +0100 Subject: [PATCH] Add support for native ETCD distributed locks (#122) * implement ETCD locking * expand README.md * fix typos in README.md * add Connection#with_lock * Update to grpc 1.17.0 Grpc was being kept at 1.6.0, which does not support ruby 2.5.0. Since that important for us, update to 1.17.0 and solve issue with deadline in spec by using `#from_relative_time` provided by `GRPC::Core::TimeConsts` instead of our own implementation. Also provide rake file with task to download etcd and improve travis config to test more configurations. * Don't test lock in etcd v3.1.X * Use version from test instance * Disable 3.3.10 in travis (doesn't work) * Improve test coverage to make codecov happy * Don't test locks in 3.1.0 * add lease support to #lock * fix specs * fix doc comments * fix README.md locking examples --- .travis.yml | 17 ++++--- README.md | 17 +++++++ Rakefile | 16 ++++++ etcdv3.gemspec | 6 ++- lib/etcdv3.rb | 33 +++++++++++++ lib/etcdv3/auth.rb | 3 +- lib/etcdv3/connection.rb | 3 +- lib/etcdv3/etcdrpc/annotations_pb.rb | 13 +++++ lib/etcdv3/etcdrpc/rpc_pb.rb | 1 + lib/etcdv3/etcdrpc/rpc_services_pb.rb | 2 + lib/etcdv3/etcdrpc/v3lock_pb.rb | 30 +++++++++++ lib/etcdv3/etcdrpc/v3lock_services_pb.rb | 25 ++++++++++ lib/etcdv3/kv.rb | 3 +- lib/etcdv3/lease.rb | 4 +- lib/etcdv3/lock.rb | 27 ++++++++++ lib/etcdv3/protos/v3lock.proto | 55 +++++++++++++++++++++ spec/etcdv3/lock_spec.rb | 23 +++++++++ spec/etcdv3_spec.rb | 23 ++++++++- spec/helpers/shared_examples_for_timeout.rb | 17 ++++--- spec/helpers/test_instance.rb | 2 + spec/spec_helper.rb | 9 ++-- 21 files changed, 303 insertions(+), 26 deletions(-) create mode 100644 Rakefile create mode 100644 lib/etcdv3/etcdrpc/annotations_pb.rb create mode 100644 lib/etcdv3/etcdrpc/v3lock_pb.rb create mode 100644 lib/etcdv3/etcdrpc/v3lock_services_pb.rb create mode 100644 lib/etcdv3/lock.rb create mode 100644 lib/etcdv3/protos/v3lock.proto create mode 100644 spec/etcdv3/lock_spec.rb diff --git a/.travis.yml b/.travis.yml index 0e17288..357f378 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,16 +1,19 @@ language: ruby rvm: - - 2.4.1 - - 2.2 + - 2.5.3 + - 2.4.5 + - 2.3.8 env: - global: - - ETCD_VERSION=v3.2.0 + - ETCD_VERSION=v3.1.20 + - ETCD_VERSION=v3.2.25 +# v3.3.10 is not working for whatever reason (at least in travis, spec passes +# locally for me) +# - ETCD_VERSION=v3.3.10 install: - bundle install - - wget https://github.com/coreos/etcd/releases/download/$ETCD_VERSION/etcd-$ETCD_VERSION-linux-amd64.tar.gz -O etcd.tar.gz --no-check-certificate - - tar zxvf etcd.tar.gz - - export PATH=$PATH:etcd-$ETCD_VERSION-linux-amd64 + - bundle exec rake download-etcd + - export PATH="$(dirname $(find /tmp -name 'etcd')):$PATH" script: bundle exec rspec diff --git a/README.md b/README.md index 5c8eef0..b723dad 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,23 @@ conn.watch('boom') do |events| end ``` +## Locks +```ruby +# First, get yourself a lease +lease_id = conn.lease_grant(100)['ID'] + +# Attempt to lock distibuted lock 'foo', wait at most 10 seconds +lock_key = conn.lock('foo', lease_id, timeout: 10).key + +# Unlock the 'foo' lock using the key returned from `lock` +conn.unlock(key) + +# Perform a critical section while holding the lock 'hello' +conn.with_lock('hello', lease_id) do + puts "kitty!" +end +``` + ## Alarms ```ruby # List all active Alarms diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..ac1f79b --- /dev/null +++ b/Rakefile @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +ETCD_VERSION = ENV["ETCD_VERSION"] || "v3.2.0" +ETCD_URL = "https://github.com/coreos/etcd/releases/download/#{ETCD_VERSION}/etcd-#{ETCD_VERSION}-linux-amd64.tar.gz" + +require "tmpdir" + +desc "Download etcd for it can be used in rspec" +task :"download-etcd" do + tmpdir = Dir.mktmpdir + system("wget", ETCD_URL, "-O", "#{tmpdir}/etcd.tar.gz") || exit(1) + system(*%W{tar -C #{tmpdir} -zxvf #{tmpdir}/etcd.tar.gz}) || exit(1) + + puts "Etcd downloaded and extracted. Add it to the path:" + puts " export PATH=\"#{tmpdir}/etcd-#{ETCD_VERSION}-linux-amd64:$PATH\"" +end diff --git a/etcdv3.gemspec b/etcdv3.gemspec index 58019a1..7268b1b 100644 --- a/etcdv3.gemspec +++ b/etcdv3.gemspec @@ -14,6 +14,8 @@ Gem::Specification.new do |s| s.files = `git ls-files`.split("\n") s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") - s.add_dependency("grpc", "~> 1.6.0") - s.add_development_dependency("rspec", "~> 3.6.0") + s.add_dependency("grpc", "~> 1.17") + s.add_development_dependency("pry-byebug", "~> 3.6") + s.add_development_dependency("rake", "~> 12.3") + s.add_development_dependency("rspec", "~> 3.6") end diff --git a/lib/etcdv3.rb b/lib/etcdv3.rb index c520335..4b0a11d 100644 --- a/lib/etcdv3.rb +++ b/lib/etcdv3.rb @@ -2,6 +2,7 @@ require 'uri' require 'etcdv3/etcdrpc/rpc_services_pb' +require 'etcdv3/etcdrpc/v3lock_services_pb' require 'etcdv3/auth' require 'etcdv3/kv/requests' require 'etcdv3/kv/transaction' @@ -9,6 +10,7 @@ require 'etcdv3/maintenance' require 'etcdv3/lease' require 'etcdv3/watch' +require 'etcdv3/lock' require 'etcdv3/connection' require 'etcdv3/connection_wrapper' @@ -84,6 +86,37 @@ def get(key, opts={}) @conn.handle(:kv, 'get', [key, opts]) end + # Locks distributed lock with the given name. The lock will unlock automatically + # when lease with the given ID expires. If this is not desirable, provide a non-expiring + # lease ID as an argument. + # name - string + # lease_id - integer + # optional :timeout - integer + def lock(name, lease_id, timeout: nil) + @conn.handle(:lock, 'lock', [name, lease_id, {timeout: timeout}]) + end + + # Unlock distributed lock using the key previously obtained from lock. + # key - string + # optional :timeout - integer + def unlock(key, timeout: nil) + @conn.handle(:lock, 'unlock', [key, {timeout: timeout}]) + end + + # Yield into the critical section while holding lock with the given + # name. The lock will be unlocked even if the block throws. + # name - string + # lease_id - integer + # optional :timeout - integer + def with_lock(name, lease_id, timeout: nil) + key = lock(name, lease_id, timeout: timeout).key + begin + yield + ensure + unlock(key, timeout: timeout) + end + end + # Inserts a new key. # key - string # value - string diff --git a/lib/etcdv3/auth.rb b/lib/etcdv3/auth.rb index 315fb4d..0167870 100644 --- a/lib/etcdv3/auth.rb +++ b/lib/etcdv3/auth.rb @@ -1,6 +1,7 @@ class Etcdv3 class Auth + include GRPC::Core::TimeConsts PERMISSIONS = { :read => Authpb::Permission::Type::READ, @@ -122,7 +123,7 @@ def generate_token(user, password, timeout: nil) private def deadline(timeout) - Time.now.to_f + (timeout || @timeout) + from_relative_time(timeout || @timeout) end end end diff --git a/lib/etcdv3/connection.rb b/lib/etcdv3/connection.rb index f925399..634d20d 100644 --- a/lib/etcdv3/connection.rb +++ b/lib/etcdv3/connection.rb @@ -6,7 +6,8 @@ class Connection kv: Etcdv3::KV, maintenance: Etcdv3::Maintenance, lease: Etcdv3::Lease, - watch: Etcdv3::Watch + watch: Etcdv3::Watch, + lock: Etcdv3::Lock, } attr_reader :endpoint, :hostname, :handlers, :credentials diff --git a/lib/etcdv3/etcdrpc/annotations_pb.rb b/lib/etcdv3/etcdrpc/annotations_pb.rb new file mode 100644 index 0000000..5d20070 --- /dev/null +++ b/lib/etcdv3/etcdrpc/annotations_pb.rb @@ -0,0 +1,13 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: annotations.proto + +require 'google/protobuf' + +#require 'http_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do +end + +module Google + module Api + end +end diff --git a/lib/etcdv3/etcdrpc/rpc_pb.rb b/lib/etcdv3/etcdrpc/rpc_pb.rb index 61eb671..8d5f135 100644 --- a/lib/etcdv3/etcdrpc/rpc_pb.rb +++ b/lib/etcdv3/etcdrpc/rpc_pb.rb @@ -5,6 +5,7 @@ require_relative 'kv_pb' require_relative 'auth_pb' +require_relative 'annotations_pb' Google::Protobuf::DescriptorPool.generated_pool.build do add_message "etcdserverpb.ResponseHeader" do optional :cluster_id, :uint64, 1 diff --git a/lib/etcdv3/etcdrpc/rpc_services_pb.rb b/lib/etcdv3/etcdrpc/rpc_services_pb.rb index 7780ab7..40e8501 100644 --- a/lib/etcdv3/etcdrpc/rpc_services_pb.rb +++ b/lib/etcdv3/etcdrpc/rpc_services_pb.rb @@ -1,5 +1,7 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # Source: rpc.proto for package 'etcdserverpb' + +require 'grpc' require_relative 'rpc_pb' module Etcdserverpb diff --git a/lib/etcdv3/etcdrpc/v3lock_pb.rb b/lib/etcdv3/etcdrpc/v3lock_pb.rb new file mode 100644 index 0000000..ea9b59d --- /dev/null +++ b/lib/etcdv3/etcdrpc/v3lock_pb.rb @@ -0,0 +1,30 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: v3lock.proto + +require 'google/protobuf' + +require_relative 'annotations_pb' +require_relative 'rpc_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "v3lockpb.LockRequest" do + optional :name, :bytes, 1 + optional :lease, :int64, 2 + end + add_message "v3lockpb.LockResponse" do + optional :header, :message, 1, "etcdserverpb.ResponseHeader" + optional :key, :bytes, 2 + end + add_message "v3lockpb.UnlockRequest" do + optional :key, :bytes, 1 + end + add_message "v3lockpb.UnlockResponse" do + optional :header, :message, 1, "etcdserverpb.ResponseHeader" + end +end + +module V3lockpb + LockRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.LockRequest").msgclass + LockResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.LockResponse").msgclass + UnlockRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.UnlockRequest").msgclass + UnlockResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.UnlockResponse").msgclass +end diff --git a/lib/etcdv3/etcdrpc/v3lock_services_pb.rb b/lib/etcdv3/etcdrpc/v3lock_services_pb.rb new file mode 100644 index 0000000..2c92448 --- /dev/null +++ b/lib/etcdv3/etcdrpc/v3lock_services_pb.rb @@ -0,0 +1,25 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: v3lock.proto for package 'v3lockpb' + +require 'grpc' +require_relative 'v3lock_pb' + +module V3lockpb + module Lock + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'v3lockpb.Lock' + + # Lock acquires a distributed shared lock on a given named lock. + rpc :Lock, LockRequest, LockResponse + # Unlock takes a key returned by Lock and releases the hold on lock. + rpc :Unlock, UnlockRequest, UnlockResponse + end + + Stub = Service.rpc_stub_class + end +end diff --git a/lib/etcdv3/kv.rb b/lib/etcdv3/kv.rb index afd4693..ba1ebdb 100644 --- a/lib/etcdv3/kv.rb +++ b/lib/etcdv3/kv.rb @@ -2,6 +2,7 @@ class Etcdv3 class KV include Etcdv3::KV::Requests + include GRPC::Core::TimeConsts def initialize(hostname, credentials, timeout, metadata={}) @stub = Etcdserverpb::KV::Stub.new(hostname, credentials) @@ -36,7 +37,7 @@ def transaction(block, timeout: nil) private def deadline(timeout) - Time.now.to_f + (timeout || @timeout) + from_relative_time(timeout || @timeout) end def generate_request_ops(requests) diff --git a/lib/etcdv3/lease.rb b/lib/etcdv3/lease.rb index 37c64ff..462c3f6 100644 --- a/lib/etcdv3/lease.rb +++ b/lib/etcdv3/lease.rb @@ -1,5 +1,7 @@ class Etcdv3 class Lease + include GRPC::Core::TimeConsts + def initialize(hostname, credentials, timeout, metadata={}) @stub = Etcdserverpb::Lease::Stub.new(hostname, credentials) @timeout = timeout @@ -31,7 +33,7 @@ def lease_keep_alive_once(id, timeout: nil) private def deadline(timeout) - Time.now.to_f + (timeout || @timeout) + from_relative_time(timeout || @timeout) end end end diff --git a/lib/etcdv3/lock.rb b/lib/etcdv3/lock.rb new file mode 100644 index 0000000..a439d61 --- /dev/null +++ b/lib/etcdv3/lock.rb @@ -0,0 +1,27 @@ +class Etcdv3 + class Lock + include GRPC::Core::TimeConsts + + def initialize(hostname, credentials, timeout, metadata = {}) + @stub = V3lockpb::Lock::Stub.new(hostname, credentials) + @timeout = timeout + @metadata = metadata + end + + def lock(name, lease_id, timeout: nil) + request = V3lockpb::LockRequest.new(name: name, lease: lease_id) + @stub.lock(request, deadline: deadline(timeout)) + end + + def unlock(key, timeout: nil) + request = V3lockpb::UnlockRequest.new(key: key) + @stub.unlock(request, deadline: deadline(timeout)) + end + + private + + def deadline(timeout) + from_relative_time(timeout || @timeout) + end + end +end diff --git a/lib/etcdv3/protos/v3lock.proto b/lib/etcdv3/protos/v3lock.proto new file mode 100644 index 0000000..92ab1d3 --- /dev/null +++ b/lib/etcdv3/protos/v3lock.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; +package v3lockpb; + +import "annotations.proto"; +import "rpc.proto"; +import "gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +service Lock { + // Lock acquires a distributed shared lock on a given named lock. + rpc Lock(LockRequest) returns (LockResponse) { + option (google.api.http) = { + post: "/v3alpha/lock/lock" + body: "*" + }; + } + + // Unlock takes a key returned by Lock and releases the hold on lock. + rpc Unlock(UnlockRequest) returns (UnlockResponse) { + option (google.api.http) = { + post: "/v3alpha/lock/unlock" + body: "*" + }; + } +} + +message LockRequest { + // name is the identifier for the distributed shared lock to be acquired. + bytes name = 1; + // lease is the ID of the lease that will be attached to ownership of the + // lock. If the lease expires or is revoked and currently holds the lock, + // the lock is automatically released. Calls to Lock with the same lease will + // be treated as a single acquisition; locking twice with the same lease is a + // no-op. + int64 lease = 2; +} + +message LockResponse { + etcdserverpb.ResponseHeader header = 1; + // key is a key that will exist on etcd for the duration that the Lock caller + // owns the lock. Users should not modify this key or the lock may exhibit + // undefined behavior. + bytes key = 2; +} + +message UnlockRequest { + // key is the lock ownership key granted by Lock. + bytes key = 1; +} + +message UnlockResponse { + etcdserverpb.ResponseHeader header = 1; +} diff --git a/spec/etcdv3/lock_spec.rb b/spec/etcdv3/lock_spec.rb new file mode 100644 index 0000000..39ac574 --- /dev/null +++ b/spec/etcdv3/lock_spec.rb @@ -0,0 +1,23 @@ +require 'spec_helper' + +# Locking is not implemented in etcd v3.1.X +unless $instance.version < Gem::Version.new("3.2.0") + describe Etcdv3::Lock do + let(:stub) { local_stub(Etcdv3::Lock, 1) } + let(:lease_stub) { local_stub(Etcdv3::Lease, 1) } + + it_should_behave_like "a method with a GRPC timeout", described_class, :unlock, :unlock, 'foo' + #it_should_behave_like "a method with a GRPC timeout", described_class, :lock, :lock, 'foo' + + describe '#lock' do + let(:lease_id) { lease_stub.lease_grant(10)['ID'] } + subject { stub.lock('foo', lease_id) } + it { is_expected.to be_an_instance_of(V3lockpb::LockResponse) } + end + + describe '#unlock' do + subject { stub.unlock('foo') } + it { is_expected.to be_an_instance_of(V3lockpb::UnlockResponse) } + end + end +end diff --git a/spec/etcdv3_spec.rb b/spec/etcdv3_spec.rb index 60858fd..8e0a50f 100644 --- a/spec/etcdv3_spec.rb +++ b/spec/etcdv3_spec.rb @@ -1,8 +1,9 @@ require 'spec_helper' describe Etcdv3 do - context 'Insecure connection without Auth' do + let(:lease_stub) { local_stub(Etcdv3::Lease, 1) } + context 'Insecure connection without Auth' do let(:conn) { local_connection } describe '#initialize' do @@ -103,6 +104,26 @@ it_should_behave_like "Etcdv3 instance using a timeout", :get, 'apple' end + # Locking is not implemented in etcd v3.1.X + unless $instance.version < Gem::Version.new("3.2.0") + describe '#lock' do + let(:lease_id) { lease_stub.lease_grant(10)['ID'] } + subject { conn.lock('bar', lease_id) } + it { is_expected.to be_an_instance_of(V3lockpb::LockResponse) } + end + + describe '#with_lock' do + let(:lease_id) { lease_stub.lease_grant(10)['ID'] } + let(:lease_id_2) { lease_stub.lease_grant(15)['ID'] } + it 'locks' do + conn.with_lock('foobar', lease_id) do + expect { conn.lock('foobar', lease_id_2, timeout: 0.1) } + .to raise_error(GRPC::DeadlineExceeded) + end + end + end + end + describe '#put' do subject { conn.put('test', 'value') } it { is_expected.to_not be_nil } diff --git a/spec/helpers/shared_examples_for_timeout.rb b/spec/helpers/shared_examples_for_timeout.rb index ad06437..03f49b2 100644 --- a/spec/helpers/shared_examples_for_timeout.rb +++ b/spec/helpers/shared_examples_for_timeout.rb @@ -1,22 +1,23 @@ shared_examples_for "a method with a GRPC timeout" do |stub_class, method_name, expectation_target, *args| + include GRPC::Core::TimeConsts + context "#{stub_class} timeouts for #{method_name}" do let(:handler) { local_stub(stub_class, 5) } let(:client_stub) { handler.instance_variable_get "@stub"} it 'uses the timeout value' do - start_time = Time.now - deadline_time = start_time.to_f + 5 - allow(Time).to receive(:now).and_return(start_time) + deadline = from_relative_time(5) + allow(handler).to receive(:deadline).with(nil).and_return(deadline) + allow(handler).to receive(:deadline).with(5).and_return(deadline) - expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline_time)).and_call_original + expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline)).and_call_original handler.public_send(method_name, *args) end it "can have a seperate timeout passed in" do - start_time = Time.now - deadline_time = start_time.to_f + 1 - allow(Time).to receive(:now).and_return(start_time) - expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline_time)).and_call_original + deadline = from_relative_time(1) + allow(handler).to receive(:deadline).with(1).and_return(deadline) + expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline)).and_call_original handler.public_send(method_name, *args, timeout: 1) end diff --git a/spec/helpers/test_instance.rb b/spec/helpers/test_instance.rb index 929d06c..b6bf525 100644 --- a/spec/helpers/test_instance.rb +++ b/spec/helpers/test_instance.rb @@ -13,6 +13,8 @@ class PortInUseException < StandardError; end MINIMUM_VERSION = Gem::Version.new('3.0.0') + attr_accessor :version + def initialize @pids = [] @tmpdir = Dir.mktmpdir diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9e86231..49d4005 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -11,6 +11,8 @@ require 'helpers/connections' require 'helpers/shared_examples_for_timeout' +$instance = Helpers::TestInstance.new + RSpec.configure do |config| config.include(Helpers::Connections) @@ -22,12 +24,11 @@ end config.shared_context_metadata_behavior = :apply_to_host_groups - instance = Helpers::TestInstance.new config.before(:suite) do - # $stderr = File.open(File::NULL, "w") - instance.start + $stderr = File.open(File::NULL, "w") + $instance.start end config.after(:suite) do - instance.stop + $instance.stop end end