diff --git a/config/config.go b/config/config.go index 8916e48..ae94145 100644 --- a/config/config.go +++ b/config/config.go @@ -29,6 +29,7 @@ import ( "github.com/go-playground/validator/v10" "github.com/apache/kvrocks-controller/logger" + "github.com/apache/kvrocks-controller/store/engine/embedded" "github.com/apache/kvrocks-controller/store/engine/etcd" "github.com/apache/kvrocks-controller/store/engine/zookeeper" ) @@ -51,6 +52,7 @@ const defaultPort = 9379 type Config struct { Addr string `yaml:"addr"` StorageType string `yaml:"storage_type"` + Embedded *embedded.Config `yaml:"embedded"` Etcd *etcd.Config `yaml:"etcd"` Zookeeper *zookeeper.Config `yaml:"zookeeper"` Admin AdminConfig `yaml:"admin"` diff --git a/config/config.yaml b/config/config.yaml index 2bec5dc..dba5995 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -22,7 +22,13 @@ addr: "127.0.0.1:9379" # Which store engine should be used by controller # options: etcd, zookeeper # default: etcd -storage_type: etcd +storage_type: embedded + +embedded: + addrs: + - "127.0.0.1:1212" +# - "127.0.0.1:1213" + id: 1 etcd: addrs: diff --git a/go.mod b/go.mod index d03e7a5..4dfc109 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module github.com/apache/kvrocks-controller -go 1.19 +go 1.21 require ( + github.com/fatih/color v1.16.0 github.com/gin-gonic/gin v1.9.1 github.com/go-playground/validator/v10 v10.14.0 github.com/go-redis/redis/v8 v8.11.5 @@ -11,9 +12,11 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.11.1 github.com/spf13/cobra v1.8.0 - github.com/stretchr/testify v1.8.3 - go.etcd.io/etcd v3.3.27+incompatible - go.etcd.io/etcd/client/v3 v3.5.4 + github.com/stretchr/testify v1.8.4 + go.etcd.io/etcd/client/pkg/v3 v3.5.14 + go.etcd.io/etcd/client/v3 v3.5.14 + go.etcd.io/etcd/raft/v3 v3.5.14 + go.etcd.io/etcd/server/v3 v3.5.14 go.uber.org/atomic v1.7.0 go.uber.org/zap v1.21.0 gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0 @@ -22,23 +25,20 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect - github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/coreos/go-semver v0.3.0 // indirect - github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect - github.com/coreos/pkg v0.0.0-20230327231512-ba87abf18a23 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/fatih/color v1.16.0 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect @@ -57,16 +57,20 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect - go.etcd.io/etcd/api/v3 v3.5.4 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect + github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/etcd/api/v3 v3.5.14 // indirect + go.etcd.io/etcd/pkg/v3 v3.5.14 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/net v0.22.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect - google.golang.org/grpc v1.38.0 // indirect - google.golang.org/protobuf v1.30.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/grpc v1.59.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 89003fb..9f5d59f 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,9 @@ -cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -16,44 +13,32 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= -github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= +github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= -github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/coreos/pkg v0.0.0-20230327231512-ba87abf18a23 h1:SrdboTJZnOqc2r4cT4wQCzQJjGYwkclLwx2sPrDsx7g= -github.com/coreos/pkg v0.0.0-20230327231512-ba87abf18a23/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= -github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= -github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= @@ -65,6 +50,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -84,35 +70,26 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= -github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -141,8 +118,6 @@ github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNa github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= @@ -159,10 +134,13 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -178,7 +156,6 @@ github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= @@ -190,7 +167,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -206,37 +182,42 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.etcd.io/etcd v3.3.27+incompatible h1:5hMrpf6REqTHV2LW2OclNpRtxI0k9ZplMemJsMSWju0= -go.etcd.io/etcd v3.3.27+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= -go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc= -go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= -go.etcd.io/etcd/client/pkg/v3 v3.5.4 h1:lrneYvz923dvC14R54XcA7FXoZ3mlGZAgmwhfm7HqOg= -go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= -go.etcd.io/etcd/client/v3 v3.5.4 h1:p83BUL3tAYS0OT/r0qglgc3M1JjhM0diV8DSWAhVXv4= -go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.etcd.io/etcd/api/v3 v3.5.14 h1:vHObSCxyB9zlF60w7qzAdTcGaglbJOpSj1Xj9+WGxq0= +go.etcd.io/etcd/api/v3 v3.5.14/go.mod h1:BmtWcRlQvwa1h3G2jvKYwIQy4PkHlDej5t7uLMUdJUU= +go.etcd.io/etcd/client/pkg/v3 v3.5.14 h1:SaNH6Y+rVEdxfpA2Jr5wkEvN6Zykme5+YnbCkxvuWxQ= +go.etcd.io/etcd/client/pkg/v3 v3.5.14/go.mod h1:8uMgAokyG1czCtIdsq+AGyYQMvpIKnSvPjFMunkgeZI= +go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg= +go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk= +go.etcd.io/etcd/pkg/v3 v3.5.14 h1:keuxhJiDCPjTKpW77GxJnnVVD5n4IsfvkDaqiqUMNEQ= +go.etcd.io/etcd/pkg/v3 v3.5.14/go.mod h1:7o+DL6a7DYz9KSjWByX+NGmQPYinoH3D36VAu/B3JqA= +go.etcd.io/etcd/raft/v3 v3.5.14 h1:mHnpbljpBBftmK+YUfp+49ivaCc126aBPLAnwDw0DnE= +go.etcd.io/etcd/raft/v3 v3.5.14/go.mod h1:WnIK5blyJGRKsHA3efovdNoLv9QELTZHzpDOVIAuL2s= +go.etcd.io/etcd/server/v3 v3.5.14 h1:l/3gdiSSoGU6MyKAYiL+8WSOMq9ySG+NqQ04euLtZfY= +go.etcd.io/etcd/server/v3 v3.5.14/go.mod h1:SPh0rUtGNDgOZd/aTbkAUYZV+5FFHw5sdbGnO2/byw0= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= @@ -250,30 +231,21 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= @@ -283,10 +255,7 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -296,7 +265,6 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -310,7 +278,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -333,7 +300,6 @@ golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -342,62 +308,46 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c h1:wtujag7C+4D6KMoulW9YauvK2lgdvCMS260jsqqBXr0= -google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= -google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= -google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0 h1:POO/ycCATvegFmVuPpQzZFJ+pGZeX22Ufu6fibxDVjU= gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -408,7 +358,4 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= -sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= diff --git a/server/server.go b/server/server.go index b4b8a08..c300e4d 100644 --- a/server/server.go +++ b/server/server.go @@ -36,6 +36,7 @@ import ( "github.com/apache/kvrocks-controller/server/helper" "github.com/apache/kvrocks-controller/store" "github.com/apache/kvrocks-controller/store/engine" + "github.com/apache/kvrocks-controller/store/engine/embedded" "github.com/apache/kvrocks-controller/store/engine/etcd" "github.com/apache/kvrocks-controller/store/engine/zookeeper" ) @@ -60,6 +61,9 @@ func NewServer(cfg *config.Config) (*Server, error) { case strings.EqualFold(cfg.StorageType, "zookeeper"): logger.Get().Info("Use Zookeeper as store") persist, err = zookeeper.New(sessionID, cfg.Zookeeper) + case strings.EqualFold(cfg.StorageType, "embedded"): + logger.Get().Info("Use Embedded as storage") + persist, err = embedded.New(sessionID, cfg.Embedded) default: logger.Get().Info("Use Etcd as default store") persist, err = etcd.New(sessionID, cfg.Etcd) diff --git a/store/engine/embedded/embedded.go b/store/engine/embedded/embedded.go new file mode 100644 index 0000000..3ebf5fc --- /dev/null +++ b/store/engine/embedded/embedded.go @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package embedded + +import ( + "bytes" + "context" + "encoding/gob" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/logger" + persistence "github.com/apache/kvrocks-controller/store/engine" + "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.uber.org/zap" +) + +type Peer struct { + Api string `yaml:"api"` + Raft string `yaml:"raft"` +} + +type Config struct { + Peers []Peer `yaml:"peers"` + Join bool `yaml:"join"` + Path string `yaml:"path"` +} + +func parseConfig(id string, cfg *Config) (int, []string, []string, string, error) { + apiPeers := make([]string, len(cfg.Peers)) + raftPeers := make([]string, len(cfg.Peers)) + nodeId := -1 + for i, peer := range cfg.Peers { + if peer.Api == id { + nodeId = i + 1 + } + apiPeers[i] = peer.Api + if !strings.HasPrefix(peer.Raft, "http://") { + raftPeers[i] = fmt.Sprintf("http://%s", peer.Raft) + } else { + raftPeers[i] = peer.Raft + } + } + path := strings.TrimSuffix(cfg.Path, "/") + if nodeId == -1 { + return 0, apiPeers, raftPeers, path, fmt.Errorf("Address %s is not in embedded store peers configuration", id) + } + return nodeId, apiPeers, raftPeers, path, nil +} + +type Embedded struct { + kv map[string][]byte + kvMu sync.RWMutex + snapshotter *snap.Snapshotter + + node *raftNode + + myID string + PeerIDs []string + + quitCh chan struct{} + leaderChangeCh <-chan bool + proposeCh chan string + confChangeCh chan raftpb.ConfChange +} + +func New(id string, cfg *Config) (*Embedded, error) { + nodeId, apiPeers, raftPeers, path, err := parseConfig(id, cfg) + if err != nil { + return nil, err + } + + proposeCh := make(chan string) + confChangeCh := make(chan raftpb.ConfChange) + leaderChangeCh := make(chan bool) + commitCh := make(chan *commit) + errorCh := make(chan error) + snapshotterReady := make(chan *snap.Snapshotter, 1) + + e := &Embedded{ + kv: make(map[string][]byte), + myID: id, + PeerIDs: apiPeers, + quitCh: make(chan struct{}), + leaderChangeCh: leaderChangeCh, + proposeCh: proposeCh, + confChangeCh: confChangeCh, + } + + getSnapshot := func() ([]byte, error) { + e.kvMu.RLock() + defer e.kvMu.RUnlock() + return json.Marshal(e.kv) + } + // start raft node synchronization loop + notifier := &raftNotifier{ + proposeC: proposeCh, + confChangeC: confChangeCh, + leaderChangeCh: leaderChangeCh, + commitC: commitCh, + errorC: errorCh, + snapshotterReady: snapshotterReady, + } + e.node = newRaftNode(nodeId, raftPeers, cfg.Join, path, getSnapshot, notifier) + + // block until snapshotter initialized + e.snapshotter = <-snapshotterReady + snapshot := e.loadSnapshot() + if snapshot != nil { + logger.Get().Sugar().Infof("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) + if err := e.recoverFromSnapshot(snapshot.Data); err != nil { + logger.Get().With(zap.Error(err)).Error("Failed to recover snapshot") + } + } + + go e.readCommits(commitCh, errorCh) + return e, nil +} + +func (e *Embedded) loadSnapshot() *raftpb.Snapshot { + snapshot, err := e.snapshotter.Load() + if err != nil { + if !errors.Is(err, snap.ErrNoSnapshot) && !errors.Is(err, snap.ErrEmptySnapshot) { + logger.Get().With(zap.Error(err)).Error("Failed to initialize snapshot") + } + return nil + } + return snapshot +} + +func (e *Embedded) recoverFromSnapshot(snapshot []byte) error { + var store map[string][]byte + if err := json.Unmarshal(snapshot, &store); err != nil { + return err + } + e.kvMu.Lock() + defer e.kvMu.Unlock() + e.kv = store + return nil +} + +func (e *Embedded) readCommits(commitCh <-chan *commit, errorCh <-chan error) { + for c := range commitCh { + if c == nil { + // signaled to load snapshot + snapshot := e.loadSnapshot() + if snapshot != nil { + logger.Get().Sugar().Infof("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) + if err := e.recoverFromSnapshot(snapshot.Data); err != nil { + logger.Get().With(zap.Error(err)).Error("Failed to recover snapshot") + } + } + continue + } + + for _, data := range c.data { + var entry persistence.Entry + dec := gob.NewDecoder(bytes.NewBufferString(data)) + if err := dec.Decode(&entry); err != nil { + logger.Get().With(zap.Error(err)).Error("Failed to decode message") + } + e.kvMu.Lock() + if entry.Value == nil { + delete(e.kv, entry.Key) + } else { + e.kv[entry.Key] = entry.Value + } + e.kvMu.Unlock() + } + close(c.applyDoneC) + } + if err, ok := <-errorCh; ok { + logger.Get().With(zap.Error(err)).Error("Error occurred during reading commits") + } +} + +func (e *Embedded) ID() string { + return e.myID +} + +func (e *Embedded) Leader() string { + if e.node.leader.Load() == 0 { + return e.myID + } + return e.PeerIDs[e.node.leader.Load()-1] +} + +func (e *Embedded) LeaderChange() <-chan bool { + return e.leaderChangeCh +} + +func (e *Embedded) IsReady(ctx context.Context) bool { + for { + select { + case <-e.quitCh: + return false + case <-time.After(100 * time.Millisecond): + if e.node.leader.Load() != 0 { + return true + } + case <-ctx.Done(): + return e.node.leader.Load() != 0 + } + } +} + +func (e *Embedded) Get(_ context.Context, key string) ([]byte, error) { + e.kvMu.RLock() + defer e.kvMu.RUnlock() + value, ok := e.kv[key] + if !ok { + return nil, consts.ErrNotFound + } + return value, nil +} + +func (e *Embedded) Exists(ctx context.Context, key string) (bool, error) { + _, err := e.Get(ctx, key) + if err != nil { + if errors.Is(err, consts.ErrNotFound) { + return false, nil + } + return false, err + } + return true, nil +} + +func (e *Embedded) Propose(k string, v []byte) error { + var buf strings.Builder + if err := gob.NewEncoder(&buf).Encode(persistence.Entry{Key: k, Value: v}); err != nil { + logger.Get().With(zap.Error(err)).Error("Failed to propose changes") + return err + } + e.proposeCh <- buf.String() + return nil +} + +func (e *Embedded) Set(_ context.Context, key string, value []byte) error { + return e.Propose(key, value) +} + +func (e *Embedded) Delete(_ context.Context, key string) error { + return e.Propose(key, nil) +} + +func (e *Embedded) List(_ context.Context, prefix string) ([]persistence.Entry, error) { + entries := make([]persistence.Entry, 0) + prefixLen := len(prefix) + e.kvMu.RLock() + defer e.kvMu.RUnlock() + //TODO use trie to accelerate query + for k, v := range e.kv { + if !strings.HasPrefix(k, prefix) || k == prefix { + continue + } + key := strings.TrimLeft(k[prefixLen+1:], "/") + if strings.ContainsRune(key, '/') { + continue + } + entries = append(entries, persistence.Entry{Key: key, Value: v}) + } + return entries, nil +} + +func (e *Embedded) Close() error { + close(e.quitCh) + return nil +} diff --git a/store/engine/embedded/embedded_test.go b/store/engine/embedded/embedded_test.go new file mode 100644 index 0000000..1448afe --- /dev/null +++ b/store/engine/embedded/embedded_test.go @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package embedded + +import ( + "context" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/apache/kvrocks-controller/consts" + + "github.com/stretchr/testify/assert" +) + +func createConfig(peers []string, path string) *Config { + peerList := make([]Peer, len(peers)) + for i, peer := range peers { + peerList[i] = Peer{ + Api: peer, + Raft: fmt.Sprintf("http://%s", peer), + } + } + + return &Config{ + Peers: peerList, + Join: false, + Path: path, + } +} + +func TestNew(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestNew") + defer os.RemoveAll(dir) + cfg := createConfig([]string{"localhost:20000", "localhost:20001"}, dir) + _, err := New("localhost:20000", cfg) + assert.NoError(t, err) +} + +func TestEmbedded_Propose(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestEmbedded_Propose") + defer os.RemoveAll(dir) + cfg := createConfig([]string{"localhost:20002"}, dir) + + e1, err := New("localhost:20002", cfg) + assert.NoError(t, err) + + // Wait to be the leader + <-e1.LeaderChange() + + assert.True(t, e1.IsReady(context.Background())) + e1.Propose("key", []byte("value")) +} + +func TestEmbedded_SetAndGet(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestEmbedded_SetAndGet") + defer os.RemoveAll(dir) + cfg := createConfig([]string{"localhost:20004", "localhost:20005"}, dir) + + e1, err := New("localhost:20004", cfg) + assert.NoError(t, err) + e2, err := New("localhost:20005", cfg) + assert.NoError(t, err) + e := []*Embedded{e1, e2} + + leader := -1 + select { + case <-e1.LeaderChange(): + leader = 0 + case <-e2.LeaderChange(): + leader = 1 + } + + err = e[leader].Set(context.Background(), "key", []byte("value")) + assert.NoError(t, err) + + var wg sync.WaitGroup + for _, e := range []*Embedded{e1, e2} { + wg.Add(1) + go func(e *Embedded) { + defer wg.Done() + time.Sleep(time.Second) + value, err := e.Get(context.Background(), "key") + assert.NoError(t, err) + assert.Equal(t, []byte("value"), value) + }(e) + } + wg.Wait() +} + +func TestEmbedded_Delete(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestEmbedded_Delete") + defer os.RemoveAll(dir) + cfg := createConfig([]string{"localhost:20006", "localhost:20007"}, dir) + + e1, err := New("localhost:20006", cfg) + assert.NoError(t, err) + e2, err := New("localhost:20007", cfg) + assert.NoError(t, err) + e := []*Embedded{e1, e2} + + leader := -1 + select { + case <-e1.LeaderChange(): + leader = 0 + case <-e2.LeaderChange(): + leader = 1 + } + + err = e[leader].Set(context.Background(), "key", []byte("value")) + assert.NoError(t, err) + + err = e[leader].Delete(context.Background(), "key") + assert.NoError(t, err) + + var wg sync.WaitGroup + for _, e := range []*Embedded{e1, e2} { + wg.Add(1) + go func(e *Embedded) { + defer wg.Done() + time.Sleep(time.Second) + _, err := e.Get(context.Background(), "key") + assert.ErrorIs(t, err, consts.ErrNotFound) + }(e) + } + wg.Wait() +} + +func TestEmbedded_List(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestEmbedded_List") + defer os.RemoveAll(dir) + cfg := createConfig([]string{"localhost:20008"}, dir) + + e1, err := New("localhost:20008", cfg) + assert.NoError(t, err) + + <-e1.LeaderChange() + + err = e1.Set(context.Background(), "key1", []byte("value1")) + assert.NoError(t, err) + + err = e1.Set(context.Background(), "key2", []byte("value2")) + assert.NoError(t, err) + + time.Sleep(time.Second) + entries, err := e1.List(context.Background(), "key") + assert.NoError(t, err) + assert.Len(t, entries, 2) +} diff --git a/store/engine/embedded/raft.go b/store/engine/embedded/raft.go new file mode 100644 index 0000000..05564f3 --- /dev/null +++ b/store/engine/embedded/raft.go @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package embedded + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "os" + "strconv" + "time" + + "github.com/apache/kvrocks-controller/logger" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" + "go.uber.org/atomic" + + "go.etcd.io/etcd/client/pkg/v3/fileutil" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats" + "go.etcd.io/etcd/server/v3/wal" + "go.etcd.io/etcd/server/v3/wal/walpb" + "go.uber.org/zap" +) + +type commit struct { + data []string + applyDoneC chan<- struct{} +} + +// Used for communication with the caller +type raftNotifier struct { + // inbound channels + proposeC <-chan string // proposed messages (k,v) + confChangeC <-chan raftpb.ConfChange // proposed cluster config changes + + // outbound channels + leaderChangeCh chan<- bool // leader changes + commitC chan<- *commit // entries committed to log (k,v) + errorC chan<- error // errors from raft session + snapshotterReady chan<- *snap.Snapshotter // signals when snapshotter is ready +} + +// Snapshot function to get and define what a snapshot is +type snapshotFunc func() ([]byte, error) + +// A key-value stream backed by raft +type raftNode struct { + *raftNotifier + + id int // client ID for raft session + peers []string // raft peer URLs + join bool // node is joining an existing cluster + walDir string // path to WAL directory + snapDir string // path to snapshot directory + getSnapshot snapshotFunc + + isLeader atomic.Bool + leader atomic.Uint64 + + confState raftpb.ConfState + snapshotIndex uint64 + appliedIndex uint64 + + // raft backing for the commit/error channel + node raft.Node + raftStorage *raft.MemoryStorage + wal *wal.WAL + + snapshotter *snap.Snapshotter + + snapCount uint64 + transport *rafthttp.Transport + stopCh chan struct{} // signals proposal channel closed + httpStopCh chan struct{} // signals http server to shutdown + httpDoneCh chan struct{} // signals http server shutdown complete +} + +var defaultSnapshotCount uint64 = 10000 + +// newRaftNode initiates a raft instance and returns a committed log entry +// channel and error channel. Proposals for log updates are sent over the +// provided the proposal channel. All log entries are replayed over the +// commit channel, followed by a nil message (to indicate the channel is +// current), then new log entries. To shutdown, close proposeC and read errorC. +func newRaftNode(id int, peers []string, join bool, path string, getSnapshot snapshotFunc, notifier *raftNotifier) *raftNode { + + rc := &raftNode{ + raftNotifier: notifier, + //raftNotifier: raftNotifier{ + // proposeC: proposeC, + // confChangeC: confChangeC, + // leaderChangeCh: leaderChangeCh, + // commitC: commitC, + // errorC: errorC, + // snapshotterReady: snapshotterReady, + //}, + + id: id, + peers: peers, + join: join, + walDir: fmt.Sprintf("%s/storage-%d", path, id), + snapDir: fmt.Sprintf("%s/storage-%d-snap", path, id), + getSnapshot: getSnapshot, + snapCount: defaultSnapshotCount, + stopCh: make(chan struct{}), + httpStopCh: make(chan struct{}), + httpDoneCh: make(chan struct{}), + + // rest of structure populated after WAL replay + } + rc.isLeader.Store(false) + go rc.startRaft() + return rc +} + +func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error { + walSnap := walpb.Snapshot{ + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + ConfState: &snap.Metadata.ConfState, + } + // save the snapshot file before writing the snapshot to the wal. + // This makes it possible for the snapshot file to become orphaned, but prevents + // a WAL snapshot entry from having no corresponding snapshot file. + if err := rc.snapshotter.SaveSnap(snap); err != nil { + return err + } + if err := rc.wal.SaveSnapshot(walSnap); err != nil { + return err + } + return rc.wal.ReleaseLockTo(snap.Metadata.Index) +} + +func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) { + if len(ents) == 0 { + return ents + } + firstIdx := ents[0].Index + if firstIdx > rc.appliedIndex+1 { + logger.Get().Sugar().Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, rc.appliedIndex) + } + if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) { + nents = ents[rc.appliedIndex-firstIdx+1:] + } + return nents +} + +// publishEntries writes committed log entries to commit channel and returns +// whether all entries could be published. +func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) { + if len(ents) == 0 { + return nil, true + } + + data := make([]string, 0, len(ents)) + for i := range ents { + switch ents[i].Type { + case raftpb.EntryNormal: + if len(ents[i].Data) == 0 { + // ignore empty messages + break + } + s := string(ents[i].Data) + data = append(data, s) + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + _ = cc.Unmarshal(ents[i].Data) + rc.confState = *rc.node.ApplyConfChange(cc) + switch cc.Type { + case raftpb.ConfChangeAddNode: + if len(cc.Context) > 0 { + rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)}) + } + case raftpb.ConfChangeRemoveNode: + if cc.NodeID == uint64(rc.id) { + logger.Get().Info("I've been removed from the cluster! Shutting down.") + return nil, false + } + rc.transport.RemovePeer(types.ID(cc.NodeID)) + case raftpb.ConfChangeUpdateNode, raftpb.ConfChangeAddLearnerNode: + } + case raftpb.EntryConfChangeV2: + } + } + + var applyDoneC chan struct{} + + if len(data) > 0 { + applyDoneC = make(chan struct{}, 1) + select { + case rc.commitC <- &commit{data, applyDoneC}: + case <-rc.stopCh: + return nil, false + } + } + + // after commit, update appliedIndex + rc.appliedIndex = ents[len(ents)-1].Index + + return applyDoneC, true +} + +func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { + if wal.Exist(rc.walDir) { + walSnaps, err := wal.ValidSnapshotEntries(logger.Get(), rc.walDir) + if err != nil { + logger.Get().With(zap.Error(err)).Fatal("Error listing snapshots") + } + snapshot, err := rc.snapshotter.LoadNewestAvailable(walSnaps) + if err != nil && !errors.Is(err, snap.ErrNoSnapshot) { + logger.Get().With(zap.Error(err)).Fatal("Error loading snapshot") + } + return snapshot + } + return &raftpb.Snapshot{} +} + +// openWAL returns a WAL ready for reading. +func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL { + if !wal.Exist(rc.walDir) { + if err := os.Mkdir(rc.walDir, 0750); err != nil { + logger.Get().With(zap.Error(err)).Fatal("Cannot create dir for WAL") + } + + w, err := wal.Create(logger.Get(), rc.walDir, nil) + if err != nil { + logger.Get().With(zap.Error(err)).Fatal("Create WAL error") + } + _ = w.Close() + } + + walSnap := walpb.Snapshot{} + if snapshot != nil { + walSnap.Index, walSnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + logger.Get().Sugar().Infof("Loading WAL at term %d and index %d", walSnap.Term, walSnap.Index) + w, err := wal.Open(logger.Get(), rc.walDir, walSnap) + if err != nil { + logger.Get().With(zap.Error(err)).Fatal("Error loading WAL") + } + + return w +} + +// replayWAL replays WAL entries into the raft instance. +func (rc *raftNode) replayWAL() *wal.WAL { + logger.Get().Sugar().Infof("replaying WAL of member %d", rc.id) + snapshot := rc.loadSnapshot() + w := rc.openWAL(snapshot) + _, st, ents, err := w.ReadAll() + if err != nil { + logger.Get().With(zap.Error(err)).Fatal("Failed to read WAL") + } + rc.raftStorage = raft.NewMemoryStorage() + if snapshot != nil { + _ = rc.raftStorage.ApplySnapshot(*snapshot) + } + _ = rc.raftStorage.SetHardState(st) + + // append to storage so raft starts at the right place in log + _ = rc.raftStorage.Append(ents) + + return w +} + +func (rc *raftNode) writeError(err error) { + rc.stopHTTP() + close(rc.commitC) + rc.errorC <- err + close(rc.errorC) + rc.node.Stop() +} + +func (rc *raftNode) startRaft() { + if !fileutil.Exist(rc.snapDir) { + if err := os.Mkdir(rc.snapDir, 0750); err != nil { + logger.Get().With(zap.Error(err)).Fatal("Cannot create dir for snapshot") + } + } + rc.snapshotter = snap.New(logger.Get(), rc.snapDir) + + oldwal := wal.Exist(rc.walDir) + rc.wal = rc.replayWAL() + + // signal replay has finished + rc.snapshotterReady <- rc.snapshotter + + rpeers := make([]raft.Peer, len(rc.peers)) + for i := range rpeers { + rpeers[i] = raft.Peer{ID: uint64(i + 1)} + } + c := &raft.Config{ + ID: uint64(rc.id), + ElectionTick: 10, + HeartbeatTick: 1, + Storage: rc.raftStorage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, + } + + if oldwal || rc.join { + rc.node = raft.RestartNode(c) + } else { + rc.node = raft.StartNode(c, rpeers) + } + + rc.transport = &rafthttp.Transport{ + Logger: logger.Get(), + ID: types.ID(rc.id), + ClusterID: 0x1000, + Raft: rc, + ServerStats: stats.NewServerStats("", ""), + LeaderStats: stats.NewLeaderStats(logger.Get(), strconv.Itoa(rc.id)), + ErrorC: make(chan error), + + DialRetryFrequency: 1, + } + + if err := rc.transport.Start(); err != nil { + logger.Get().With(zap.Error(err)).Panic("Failed to start raft http server") + } + for i := range rc.peers { + if i+1 != rc.id { + rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) + } + } + + go rc.serveHTTP() + go rc.serveProposeChannels() + go rc.serveRaftChannels() +} + +// stop closes http, closes all channels, and stops raft. +func (rc *raftNode) stop() { + rc.stopHTTP() + close(rc.commitC) + close(rc.errorC) + rc.node.Stop() +} + +func (rc *raftNode) stopHTTP() { + rc.transport.Stop() + close(rc.httpStopCh) + <-rc.httpDoneCh +} + +func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) { + if raft.IsEmptySnap(snapshotToSave) { + return + } + + logger.Get().Sugar().Infof("Publishing snapshot at index %d", rc.snapshotIndex) + defer logger.Get().Sugar().Infof("Finished publishing snapshot at index %d", rc.snapshotIndex) + + if snapshotToSave.Metadata.Index <= rc.appliedIndex { + logger.Get().Sugar().Fatalf("snapshot index [%d] should > progress.appliedIndex [%d]", snapshotToSave.Metadata.Index, rc.appliedIndex) + } + rc.commitC <- nil // trigger kvstore to load snapshot + + rc.confState = snapshotToSave.Metadata.ConfState + rc.snapshotIndex = snapshotToSave.Metadata.Index + rc.appliedIndex = snapshotToSave.Metadata.Index +} + +var snapshotCatchUpEntriesN uint64 = 10000 + +func (rc *raftNode) maybeTriggerSnapshot(applyDoneC <-chan struct{}) { + if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount { + return + } + + // wait until all committed entries are applied (or server is closed) + if applyDoneC != nil { + select { + case <-applyDoneC: + case <-rc.stopCh: + return + } + } + + logger.Get().Sugar().Infof("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex) + data, err := rc.getSnapshot() + if err != nil { + logger.Get().With(zap.Error(err)).Panic("Failed to snapshot") + } + snapshot, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data) + if err != nil { + panic(err) + } + if err := rc.saveSnap(snapshot); err != nil { + panic(err) + } + + compactIndex := uint64(1) + if rc.appliedIndex > snapshotCatchUpEntriesN { + compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN + } + if err := rc.raftStorage.Compact(compactIndex); err != nil { + if !errors.Is(err, raft.ErrCompacted) { + panic(err) + } + } else { + logger.Get().Sugar().Infof("compacted log at index %d", compactIndex) + } + + rc.snapshotIndex = rc.appliedIndex +} + +func (rc *raftNode) serveRaftChannels() { + snapshot, err := rc.raftStorage.Snapshot() + if err != nil { + panic(err) + } + rc.confState = snapshot.Metadata.ConfState + rc.snapshotIndex = snapshot.Metadata.Index + rc.appliedIndex = snapshot.Metadata.Index + + defer rc.wal.Close() + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + // event loop on raft state machine updates + for { + select { + case <-ticker.C: + rc.node.Tick() + + // store raft entries to wal, then publish over commit channel + case rd := <-rc.node.Ready(): + if rd.SoftState != nil { + isLeader := rd.RaftState == raft.StateLeader + rc.leader.Store(rd.Lead) + if rc.isLeader.CAS(!isLeader, isLeader) { + rc.leaderChangeCh <- isLeader + } + } + // Must save the snapshot file and WAL snapshot entry before saving any other entries + // or hardstate to ensure that recovery after a snapshot restore is possible. + if !raft.IsEmptySnap(rd.Snapshot) { + _ = rc.saveSnap(rd.Snapshot) + } + _ = rc.wal.Save(rd.HardState, rd.Entries) + if !raft.IsEmptySnap(rd.Snapshot) { + _ = rc.raftStorage.ApplySnapshot(rd.Snapshot) + rc.publishSnapshot(rd.Snapshot) + } + _ = rc.raftStorage.Append(rd.Entries) + rc.transport.Send(rc.processMessages(rd.Messages)) + applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)) + if !ok { + rc.stop() + return + } + rc.maybeTriggerSnapshot(applyDoneC) + rc.node.Advance() + + case err := <-rc.transport.ErrorC: + rc.writeError(err) + return + + case <-rc.stopCh: + rc.stop() + return + } + } +} + +// serveProposeChannels continuously reads proposals from proposeC and +// confChangeC, and send to peers. +func (rc *raftNode) serveProposeChannels() { + confChangeCount := uint64(0) + + for rc.proposeC != nil && rc.confChangeC != nil { + select { + case prop, ok := <-rc.proposeC: + if !ok { + rc.proposeC = nil + } else { + // blocks until accepted by raft state machine + _ = rc.node.Propose(context.TODO(), []byte(prop)) + } + + case cc, ok := <-rc.confChangeC: + if !ok { + rc.confChangeC = nil + } else { + confChangeCount++ + cc.ID = confChangeCount + _ = rc.node.ProposeConfChange(context.TODO(), cc) + } + } + } + // client closed channel; shutdown raft if not already + close(rc.stopCh) +} + +// When there is a `raftpb.EntryConfChange` after creating the snapshot, +// then the confState included in the snapshot is out of date. so We need +// to update the confState before sending a snapshot to a follower. +func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { + for i := 0; i < len(ms); i++ { + if ms[i].Type == raftpb.MsgSnap { + ms[i].Snapshot.Metadata.ConfState = rc.confState + } + } + return ms +} + +func (rc *raftNode) serveHTTP() { + hostUrl, err := url.Parse(rc.peers[rc.id-1]) + if err != nil { + logger.Get().With(zap.Error(err)).Fatal("Failed parsing URL") + } + + ln, err := net.Listen("tcp", hostUrl.Host) + if err != nil { + logger.Get().With(zap.Error(err)).Fatal("Failed to listen rafthttp") + } + + err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln) + select { + case <-rc.httpStopCh: + _ = ln.Close() + default: + logger.Get().With(zap.Error(err)).Fatal("Failed to serve rafthttp") + } + close(rc.httpDoneCh) +} + +func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { + return rc.node.Step(ctx, m) +} +func (rc *raftNode) IsIDRemoved(_ uint64) bool { return false } +func (rc *raftNode) ReportUnreachable(id uint64) { rc.node.ReportUnreachable(id) } +func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) { + rc.node.ReportSnapshot(id, status) +} diff --git a/store/engine/embedded/raft_test.go b/store/engine/embedded/raft_test.go new file mode 100644 index 0000000..9518660 --- /dev/null +++ b/store/engine/embedded/raft_test.go @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package embedded + +import ( + "fmt" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.uber.org/atomic" +) + +func mockRaftNode(count int, path string, basePort int) ([]*raftNode, []chan string, []chan bool, []chan *commit) { + nodes := make([]*raftNode, count) + snapshotterReadyList := make([]chan *snap.Snapshotter, count) + proposeChList := make([]chan string, count) + leaderChangeChList := make([]chan bool, count) + commitChList := make([]chan *commit, count) + peers := make([]string, count) + for i := 0; i < count; i++ { + peers[i] = fmt.Sprintf("http://127.0.0.1:%d", basePort+i) + } + + for i := 0; i < count; i++ { + snapshotterReadyList[i] = make(chan *snap.Snapshotter, 1) + proposeChList[i] = make(chan string) + leaderChangeChList[i] = make(chan bool) + commitChList[i] = make(chan *commit) + notifier := &raftNotifier{ + proposeChList[i], + make(chan raftpb.ConfChange), + leaderChangeChList[i], + commitChList[i], + make(chan error), + snapshotterReadyList[i], + } + nodes[i] = newRaftNode(i+1, peers, false, path, func() ([]byte, error) { + return nil, nil + }, notifier) + } + for i := 0; i < count; i++ { + <-snapshotterReadyList[i] + } + return nodes, proposeChList, leaderChangeChList, commitChList +} + +func TestRaftNode_processMessages(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestRaftNode_processMessages") + defer os.RemoveAll(dir) + + //nolint:dogsled + nodes, _, _, _ := mockRaftNode(1, dir, 10000) + node := nodes[0] + msgs := []raftpb.Message{ + { + Type: raftpb.MsgSnap, + Snapshot: raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + ConfState: raftpb.ConfState{Voters: []uint64{1, 2, 3}}, + }, + }, + }, + { + Type: raftpb.MsgProp, + }, + } + + expected := raftpb.ConfState{Voters: []uint64{1, 2, 3}} + node.confState = expected + + result := node.processMessages(msgs) + + assert.Equal(t, result[0].Snapshot.Metadata.ConfState, expected) +} + +func TestRaftNode_saveSnap(t *testing.T) { + dir, _ := os.MkdirTemp("", "TestRaftNode_saveSnap") + defer os.RemoveAll(dir) + //nolint:dogsled + nodes, _, _, _ := mockRaftNode(1, dir, 10001) + node := nodes[0] + + snapshot := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: 1, + Term: 1, + ConfState: raftpb.ConfState{Voters: []uint64{1, 2, 3}}, + }, + Data: []byte("test data"), + } + + // Save the snapshot + err := node.saveSnap(snapshot) + assert.NoError(t, err, "Failed to save snapshot") + + _, err = os.Stat(filepath.Join(dir, fmt.Sprintf("storage-%d-snap/%016x-%016x.snap", 1, 1, 1))) + assert.NoError(t, err, "Cannot find saved snapshot") + + _, err = os.Stat(filepath.Join(dir, fmt.Sprintf("storage-%d/%016x-%016x.wal", 1, 0, 0))) + assert.NoError(t, err, "Cannot find saved wal") + + savedSnap, err := node.snapshotter.Load() + assert.NoError(t, err, "Failed to load snapshot") + + assert.Equal(t, snapshot, *savedSnap) +} + +func TestRaftNode_EventualConsistency(t *testing.T) { + tests := []struct { + name string + count int + }{ + {"single", 1}, + {"double", 2}, + {"triple", 3}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dir, _ := os.MkdirTemp("", fmt.Sprintf("TestRaftNode_EventualConsistency_%s", test.name)) + defer os.RemoveAll(dir) + + // Create two mock raftNodes + prefixSum := test.count * (test.count - 1) / 2 + _, proposeChList, leaderChangeChList, commitChList := mockRaftNode(test.count, dir, 10002+prefixSum) + + // Define the data to be proposed + data := []string{"data1-1", "data1-2", "data1-3"} + leader := atomic.NewInt64(-1) + for i := 0; i < test.count; i++ { + go func(i int64) { + for range leaderChangeChList[i] { + leader.Store(i) + } + }(int64(i)) + } + + for leader.Load() < 0 { + time.Sleep(10 * time.Millisecond) + } + + // Start two goroutines to propose data + for _, column := range data { + proposeChList[leader.Load()] <- column + } + + // Start two goroutines to read commits + var commits [][]string + for i := 0; i < test.count; i++ { + commits = append(commits, make([]string, 0, len(data))) + } + + for i, ch := range commitChList { + commits[i] = append(commits[i], (<-ch).data...) + } + + // Check the consistency of all data + sort.Strings(commits[0]) + assert.NotEmpty(t, commits[0]) + for i := 1; i < test.count; i++ { + sort.Strings(commits[i]) + assert.Equal(t, commits[0], commits[i]) + } + }) + } +} diff --git a/store/engine/etcd/etcd.go b/store/engine/etcd/etcd.go index 4920cfa..455f571 100644 --- a/store/engine/etcd/etcd.go +++ b/store/engine/etcd/etcd.go @@ -27,9 +27,9 @@ import ( "sync" "time" + "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/pkg/transport" "go.uber.org/atomic" "go.uber.org/zap"