From 69ca95e58e1b962324943569a9428f3cef2c7cf0 Mon Sep 17 00:00:00 2001 From: Nurzhan Saktaganov Date: Mon, 2 Sep 2024 15:56:42 +0300 Subject: [PATCH] implement new test framework with real tarantools (#54) * build vshard cluster * bootstrap cluster by lua vshard router * run go tests over vshard cluster --- CHANGELOG.md | 1 + Makefile | 3 + tests/tnt/Makefile | 57 +++++++++ tests/tnt/README.md | 12 ++ tests/tnt/cfgmaker.go | 73 +++++++++++ tests/tnt/cfgmaker.lua | 118 +++++++++++++++++ tests/tnt/concurrent_topology_test.go | 178 ++++++++++++++++++++++++++ tests/tnt/router.lua | 29 +++++ tests/tnt/storage.lua | 147 +++++++++++++++++++++ tests/tnt/tnt_test.go | 64 +++++++++ 10 files changed, 682 insertions(+) create mode 100644 tests/tnt/Makefile create mode 100644 tests/tnt/README.md create mode 100644 tests/tnt/cfgmaker.go create mode 100644 tests/tnt/cfgmaker.lua create mode 100644 tests/tnt/concurrent_topology_test.go create mode 100644 tests/tnt/router.lua create mode 100644 tests/tnt/storage.lua create mode 100644 tests/tnt/tnt_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index af234b2..cf558f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ FEATURES: * Added etcd v2 topology provider implementation (#16) * Add TopologyController mock for testing improve * Add linter job (#33) +* New test framework with real tarantools REFACTOR: diff --git a/Makefile b/Makefile index 80a0c67..5a19a8e 100644 --- a/Makefile +++ b/Makefile @@ -37,3 +37,6 @@ testrace: BUILD_TAGS+=testonly testrace: @CGO_ENABLED=1 \ $(GO_CMD) test -tags='$(BUILD_TAGS)' -race -timeout=$(EXTENDED_TEST_TIMEOUT) -parallel=20 + +test/tnt: + @$(MAKE) -C ./tests/tnt diff --git a/tests/tnt/Makefile b/tests/tnt/Makefile new file mode 100644 index 0000000..8e82e1b --- /dev/null +++ b/tests/tnt/Makefile @@ -0,0 +1,57 @@ +NREPLICASETS?=5 +START_PORT?=33000 + +RED=\033[0;31m +GREEN=\033[0;32m +YELLOW=\033[0;33m +NC=\033[0m # No Color + +default: run + +all: default + +run: | clean cluster-up 
bootstrap gotest cluster-down + +# cleanup tmp working directory +clean: + @echo "${GREEN}STAGE: CLEANUP${NC}" + rm -rf tmp + +# prepare vshard-storages, that contains ${NREPLICASETS} replicasets. +# every replicaset has one master and one follower instance. +# every instance runs in background mode, no logs are stored (/dev/null); the loop sticks to POSIX sh syntax because make runs recipes with /bin/sh by default +cluster-up: + @echo "${GREEN}STAGE: CLUSTER UP${NC}" + mkdir -p tmp + rsid=1 ; while [ $$rsid -le ${NREPLICASETS} ] ; do \ + mkdir -p tmp/$${rsid}/master; \ + mkdir -p tmp/$${rsid}/follower; \ + ln -sf `(pwd)`/storage.lua tmp/$${rsid}/master/storage_$${rsid}_master.lua; \ + ln -sf `(pwd)`/cfgmaker.lua tmp/$${rsid}/master/cfgmaker.lua; \ + ln -sf `(pwd)`/storage.lua tmp/$${rsid}/follower/storage_$${rsid}_follower.lua; \ + ln -sf `(pwd)`/cfgmaker.lua tmp/$${rsid}/follower/cfgmaker.lua; \ + TT_WORK_DIR=tmp/$${rsid}/master/ TT_PID_FILE="tarantool.pid" TT_BACKGROUND=true START_PORT=${START_PORT} TT_LOG=/dev/null NREPLICASETS=${NREPLICASETS} tarantool tmp/$${rsid}/master/storage_$${rsid}_master.lua; \ + TT_WORK_DIR=tmp/$${rsid}/follower/ TT_PID_FILE="tarantool.pid" TT_BACKGROUND=true START_PORT=${START_PORT} TT_LOG=/dev/null NREPLICASETS=${NREPLICASETS} tarantool tmp/$${rsid}/follower/storage_$${rsid}_follower.lua; \ + rsid=$$((rsid+1)) ; \ + done + +# bootstrap vshard cluster using lua vshard.router +bootstrap: + @echo "${GREEN}STAGE: BOOTSTRAP CLUSTER${NC}" + mkdir -p tmp/router_work_dir + TT_WORK_DIR=tmp/router_work_dir/ NREPLICASETS=${NREPLICASETS} START_PORT=${START_PORT} tarantool router.lua + +# stop vshard storage tarantools (master and follower of every replicaset) using the pids saved in tarantool.pid +cluster-down: + @echo "${GREEN}STAGE: CLUSTER DOWN${NC}" + rsid=1 ; while [ $$rsid -le ${NREPLICASETS} ] ; do \ + kill -9 `cat tmp/$${rsid}/master/tarantool.pid`; \ + kill -9 `cat tmp/$${rsid}/follower/tarantool.pid`; \ + rsid=$$((rsid+1)) ; \ + done + +# run go tests; the leading "-" tells make to ignore a test failure, otherwise the cluster-down stage would not run. 
+gotest: + @echo "${GREEN}STAGE: RUN GOTESTS${NC}" + -NREPLICASETS=${NREPLICASETS} START_PORT=${START_PORT} go test -race -parallel=20 -coverpkg="../../" -coverprofile cover.out -timeout=90s +# go tool cover -html=cover.out diff --git a/tests/tnt/README.md b/tests/tnt/README.md new file mode 100644 index 0000000..47e7ffc --- /dev/null +++ b/tests/tnt/README.md @@ -0,0 +1,12 @@ +# A framework to test go-vshard-router module using real tarantools + +## Requirements + +Your system must have: +- an installed tarantool that supports vshard (1.9+) +- an installed tnt vshard library (`local vshard = require('vshard')` must work) + +The next will run all go-tests in this directory +```bash +make run +``` diff --git a/tests/tnt/cfgmaker.go b/tests/tnt/cfgmaker.go new file mode 100644 index 0000000..0dc66fc --- /dev/null +++ b/tests/tnt/cfgmaker.go @@ -0,0 +1,73 @@ +package tnt_test + +import ( + "fmt" + + vshardrouter "github.com/KaymeKaydex/go-vshard-router" + "github.com/google/uuid" +) + +type cfgmaker struct { + startPort int + nreplicasets int +} + +func (c cfgmaker) getUUID(rsID int, n int) uuid.UUID { + const uuidTemplate = "00000000-0000-%04d-%04d-000000000000" + + uuidStr := fmt.Sprintf(uuidTemplate, rsID, n) + + uuid, err := uuid.Parse(uuidStr) + if err != nil { + panic(err) + } + + return uuid +} + +func (c cfgmaker) replicasetUUID(rsID int) uuid.UUID { + return c.getUUID(rsID, 0) +} + +func (c cfgmaker) masterUUID(rsID int) uuid.UUID { + return c.getUUID(rsID, 1) +} + +func (c cfgmaker) followerUUID(rsID int) uuid.UUID { + return c.getUUID(rsID, 2) +} + +func (c cfgmaker) getInstanceAddr(port int) string { + const addrTemplate = "127.0.0.1:%d" + + return fmt.Sprintf(addrTemplate, port) +} + +func (c cfgmaker) masterAddr(rsID int) string { + port := c.startPort + 2*(rsID-1) + return c.getInstanceAddr(port) +} + +func (c cfgmaker) followerAddr(rsID int) string { + port := c.startPort + 2*(rsID-1) + 1 + return c.getInstanceAddr(port) +} + +func (c cfgmaker) 
clusterCfg() map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo { + cfg := make(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo) + + for rsID := 1; rsID <= c.nreplicasets; rsID++ { + cfg[vshardrouter.ReplicasetInfo{ + Name: fmt.Sprintf("replicaset_%d", rsID), + UUID: c.replicasetUUID(rsID), + }] = []vshardrouter.InstanceInfo{{ + Addr: c.masterAddr(rsID), + UUID: c.masterUUID(rsID), + }, { + Addr: c.followerAddr(rsID), + UUID: c.followerUUID(rsID), + }} + } + + return cfg +} diff --git a/tests/tnt/cfgmaker.lua b/tests/tnt/cfgmaker.lua new file mode 100644 index 0000000..ec0f434 --- /dev/null +++ b/tests/tnt/cfgmaker.lua @@ -0,0 +1,118 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +local config_example = { + sharding = { + ['cbf06940-0790-498b-948d-042b62cf3d29'] = { -- replicaset #1 + replicas = { + ['8a274925-a26d-47fc-9e1b-af88ce939412'] = { + uri = 'storage:storage@127.0.0.1:3301', + name = 'storage_1_a', + master = true + }, + ['3de2e3e1-9ebe-4d0d-abb1-26d301b84633'] = { + uri = 'storage:storage@127.0.0.1:3302', + name = 'storage_1_b' + } + }, + }, -- replicaset #1 + ['ac522f65-aa94-4134-9f64-51ee384f1a54'] = { -- replicaset #2 + replicas = { + ['1e02ae8a-afc0-4e91-ba34-843a356b8ed7'] = { + uri = 'storage:storage@127.0.0.1:3303', + name = 'storage_2_a', + master = true + }, + ['001688c3-66f8-4a31-8e19-036c17d489c2'] = { + uri = 'storage:storage@127.0.0.1:3304', + name = 'storage_2_b' + } + }, + }, -- replicaset #2 + }, -- sharding + replication_connect_quorum = 0, +} + +local function get_uid(rs_id, instance_id) + local uuid_template = "00000000-0000-%04d-%04d-000000000000" + + return string.format(uuid_template, rs_id, instance_id) +end + +local function replicaset_uuid(rs_id) + return get_uid(rs_id, 0) +end + +local function master_replica_uuid(rs_id) + return get_uid(rs_id, 1) +end + +local function follower_replica_uuid(rs_id) + return get_uid(rs_id, 2) +end + +local function master_replica_name(rs_id) + return 
string.format("storage_%d_master", rs_id) +end + +local function follower_replica_name(rs_id) + return string.format("storage_%d_follower", rs_id) +end + +local function replica_cfg(start_port, rs_id, is_master) + local uri_template = 'storage:storage@127.0.0.1:%d' + local port, name + if is_master then + port = start_port + 2 * (rs_id - 1) -- multiple to 2 because there are 2 instances per replicaset + name = master_replica_name(rs_id) + else + port = start_port + 2 * (rs_id - 1) + 1 + name = follower_replica_name(rs_id) + end + + return { + uri = string.format(uri_template, port), + name = name, + master = is_master, + } +end + +local function master_replica_cfg(start_port, rs_id) + return replica_cfg(start_port, rs_id, true) +end + +local function follower_replica_cfg(start_port, rs_id) + return replica_cfg(start_port, rs_id, false) +end + +local function clustercfg(start_port, nreplicasets) + local cfg = { + sharding = {}, + replication_connect_quorum = 0, + } + + for rs_id = 1, nreplicasets do + local master_uuid = master_replica_uuid(rs_id) + local follower_uuid = follower_replica_uuid(rs_id) + + local replicas = { + [master_uuid] = master_replica_cfg(start_port, rs_id), + [follower_uuid] = follower_replica_cfg(start_port, rs_id), + } + + local rs_uuid = replicaset_uuid(rs_id) + + cfg.sharding[rs_uuid] = { + replicas = replicas, + } + end + + return cfg +end + +return { + clustercfg = clustercfg, + master_uuid = master_replica_uuid, + follower_uuid = follower_replica_uuid, +} diff --git a/tests/tnt/concurrent_topology_test.go b/tests/tnt/concurrent_topology_test.go new file mode 100644 index 0000000..624234a --- /dev/null +++ b/tests/tnt/concurrent_topology_test.go @@ -0,0 +1,178 @@ +package tnt_test + +import ( + "context" + "fmt" + "log" + "math/rand" + "sync" + "testing" + "time" + + vshardrouter "github.com/KaymeKaydex/go-vshard-router" +) + +type concurrentTopologyProvider struct { + done chan struct{} + closed chan struct{} +} + +func (c 
*concurrentTopologyProvider) Init(tc vshardrouter.TopologyController) error { + ctx := context.Background() + cfg := getCfg() + + if err := tc.AddReplicasets(ctx, cfg); err != nil { + return fmt.Errorf("add initial replicasets: %w", err) // report through the error result instead of panicking + } + + c.done = make(chan struct{}) + c.closed = make(chan struct{}) + + added := cfg // replicasets currently registered through tc + removed := make(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo) + + go func() { // keeps adding/removing random replicasets until c.done is closed + defer close(c.closed) + //nolint:errcheck + defer tc.AddReplicasets(ctx, removed) + + type actiont int + + const add actiont = 0 + const remove actiont = 1 + + for { + select { + case <-c.done: + return + default: + } + + canAdd := len(removed) > 0 + canRemove := len(added) > 0 + + var action actiont + + switch { + case canAdd && canRemove: + //nolint:gosec + action = actiont(rand.Int() % 2) + case canAdd: + action = add + case canRemove: + action = remove + default: + panic(fmt.Sprintf("unreachable case: %v, %v", added, removed)) + } + + switch action { + case add: + var keys []vshardrouter.ReplicasetInfo + for k := range removed { + keys = append(keys, k) + } + //nolint:gosec + key := keys[rand.Int()%len(keys)] + + added[key] = removed[key] + delete(removed, key) + + _ = tc.AddReplicaset(ctx, key, added[key]) + case remove: + var keys []vshardrouter.ReplicasetInfo + for k := range added { + keys = append(keys, k) + } + //nolint:gosec + key := keys[rand.Int()%len(keys)] + + removed[key] = added[key] + delete(added, key) + + _ = tc.RemoveReplicaset(ctx, key.UUID) + default: + panic("unreachable case") + } + } + }() + + return nil +} + +func (c *concurrentTopologyProvider) Close() { // signals the goroutine started by Init and waits until it has exited + close(c.done) + <-c.closed +} + +func TestConncurrentTopologyChange(t *testing.T) { + /* What we do: + 1) Addreplicaset + Removereplicaset by random in one goroutine + 2) Call ReplicaCall, MapRw and etc. 
in other goroutines + */ + + if !isCorrectRun() { + log.Printf("Incorrect run of tnt-test framework") + t.Skip("NREPLICASETS and START_PORT are not set") // report SKIP instead of a misleading PASS + } + + t.Parallel() + + tc := &concurrentTopologyProvider{} + + router, err := vshardrouter.NewRouter(context.Background(), vshardrouter.Config{ + TopologyProvider: tc, + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: "storage", + Password: "storage", + }) + if err != nil { + t.Fatalf("NewRouter failed: %v", err) // fail only this test rather than crashing the whole test binary + } + + wg := sync.WaitGroup{} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + } + + //nolint:gosec + bucketID := uint64((rand.Int() % totalBucketCount) + 1) + args := []interface{}{"arg1"} + + _, _, _ = router.RouterCallImpl(ctx, bucketID, vshardrouter.CallOpts{}, "echo", args) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + } + + args := []interface{}{"arg1"} + _, _ = router.RouterMapCallRWImpl(ctx, "echo", args, vshardrouter.CallOpts{}) + } + }() + + wg.Wait() + + // is router.Close method required? 
+ tc.Close() +} diff --git a/tests/tnt/router.lua b/tests/tnt/router.lua new file mode 100644 index 0000000..a498acd --- /dev/null +++ b/tests/tnt/router.lua @@ -0,0 +1,29 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +local math = require('math') +local os = require('os') +local vshard = require('vshard') + +local function getenvint(envname) + local v = tonumber(os.getenv(envname) or 0) + v = math.floor(v or 0) + assert(v > 0) + return v +end + +local nreplicasets = getenvint('NREPLICASETS') +local start_port = getenvint('START_PORT') + +local cfgmaker = dofile('cfgmaker.lua') + +local clustercfg = cfgmaker.clustercfg(start_port, nreplicasets) + +clustercfg['bucket_count'] = 100 + +local router = vshard.router.new('router', clustercfg) + +router:bootstrap({timeout = 4, if_not_bootstrapped = true}) + +os.exit(0) diff --git a/tests/tnt/storage.lua b/tests/tnt/storage.lua new file mode 100644 index 0000000..96983eb --- /dev/null +++ b/tests/tnt/storage.lua @@ -0,0 +1,147 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +local box = require('box') +local debug = require('debug') +local math = require('math') +local os = require('os') + +-- vshard must be global, not local. Otherwise it does not work. 
+vshard = require('vshard') + +local function getenvint(envname) + local v = tonumber(os.getenv(envname) or 0) + v = math.floor(v or 0) + assert(v > 0) + return v +end + +local nreplicasets = getenvint('NREPLICASETS') +local start_port = getenvint('START_PORT') + +local source = debug.getinfo(1,"S").source + +local is_master = false +local filename = string.match(source, "/storage_%d+_master.lua$") +if filename then + is_master = true +else + filename = string.match(source, "/storage_%d+_follower.lua$") +end +assert(filename ~= nil) + +local rs_id = tonumber(string.match(filename, "%d+") or 0) +assert(rs_id > 0) + +local cfgmaker = dofile('cfgmaker.lua') + +local clustercfg = cfgmaker.clustercfg(start_port, nreplicasets) +local instance_uuid +if is_master then + instance_uuid = cfgmaker.master_uuid(rs_id) +else + instance_uuid = cfgmaker.follower_uuid(rs_id) +end + + +vshard.storage.cfg(clustercfg, instance_uuid) + +-- everything below is copypasted from storage.lua in vshard example: +-- https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/example/storage.lua +box.once("testapp:schema:1", function() + local customer = box.schema.space.create('customer') + customer:format({ + {'customer_id', 'unsigned'}, + {'bucket_id', 'unsigned'}, + {'name', 'string'}, + }) + customer:create_index('customer_id', {parts = {'customer_id'}}) + customer:create_index('bucket_id', {parts = {'bucket_id'}, unique = false}) + + local account = box.schema.space.create('account') + account:format({ + {'account_id', 'unsigned'}, + {'customer_id', 'unsigned'}, + {'bucket_id', 'unsigned'}, + {'balance', 'unsigned'}, + {'name', 'string'}, + }) + account:create_index('account_id', {parts = {'account_id'}}) + account:create_index('customer_id', {parts = {'customer_id'}, unique = false}) + account:create_index('bucket_id', {parts = {'bucket_id'}, unique = false}) + box.snapshot() + + box.schema.func.create('customer_lookup') + box.schema.role.grant('public', 'execute', 
'function', 'customer_lookup') + box.schema.func.create('customer_add') + box.schema.role.grant('public', 'execute', 'function', 'customer_add') + box.schema.func.create('echo') + box.schema.role.grant('public', 'execute', 'function', 'echo') + box.schema.func.create('sleep') + box.schema.role.grant('public', 'execute', 'function', 'sleep') + box.schema.func.create('raise_luajit_error') + box.schema.role.grant('public', 'execute', 'function', 'raise_luajit_error') + box.schema.func.create('raise_client_error') + box.schema.role.grant('public', 'execute', 'function', 'raise_client_error') +end) + + +function customer_add(customer) + box.begin() + box.space.customer:insert({customer.customer_id, customer.bucket_id, + customer.name}) + for _, account in ipairs(customer.accounts) do + box.space.account:insert({ + account.account_id, + customer.customer_id, + customer.bucket_id, + 0, + account.name + }) + end + box.commit() + return true +end + +function customer_lookup(customer_id) + if type(customer_id) ~= 'number' then + error('Usage: customer_lookup(customer_id)') + end + + local customer = box.space.customer:get(customer_id) + if customer == nil then + return nil + end + customer = { + customer_id = customer.customer_id; + name = customer.name; + } + local accounts = {} + for _, account in box.space.account.index.customer_id:pairs(customer_id) do + table.insert(accounts, { + account_id = account.account_id; + name = account.name; + balance = account.balance; + }) + end + customer.accounts = accounts; + return customer +end + +function echo(...) + return ... 
+end + +function sleep(time) + require('fiber').sleep(time) -- load the module explicitly: this script never defines 'fiber' and strict mode rejects undeclared globals + return true +end + +function raise_luajit_error() + assert(1 == 2) +end + +function raise_client_error() + box.error(box.error.UNKNOWN) +end diff --git a/tests/tnt/tnt_test.go b/tests/tnt/tnt_test.go new file mode 100644 index 0000000..2d26397 --- /dev/null +++ b/tests/tnt/tnt_test.go @@ -0,0 +1,64 @@ +package tnt_test + +import ( + "fmt" + "os" + "strconv" + "testing" + + vshardrouter "github.com/KaymeKaydex/go-vshard-router" +) + +const ( + totalBucketCount = 100 + + envNreplicasetsKey = "NREPLICASETS" + envStartPortKey = "START_PORT" +) + +func getEnvInt(key string) int { + vStr := os.Getenv(key) + + v, err := strconv.Atoi(vStr) + if err != nil { + panic(err) + } + + if v <= 0 { + panic(fmt.Sprintf("ENV '%s' invalied: '%s'", key, vStr)) + } + + return v +} + +func isCorrectRun() bool { + if len(os.Getenv(envNreplicasetsKey)) == 0 || len(os.Getenv(envStartPortKey)) == 0 { + return false + } + + return true +} + +func getCfg() map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo { + c := cfgmaker{ + nreplicasets: getEnvInt(envNreplicasetsKey), + startPort: getEnvInt(envStartPortKey), + } + + return c.clusterCfg() +} + +func TestConcurrentRouterCall(t *testing.T) { + /* TODO + 1) Invalidate some random bucket id + 2) concurrent call of replicacall + */ + _ = t +} + +func TestRetValues(t *testing.T) { + /* TODO + 1) Replicacall returns no value, 1 value, 2 values, 3 values, etc..., assert, lua error? + */ + _ = t +}