Skip to content

Commit

Permalink
Pinger test (#6)
Browse files Browse the repository at this point in the history
* pinger test for mailru/activerecord#28
  • Loading branch information
ebirukov authored Jan 15, 2024
1 parent e3383a0 commit 6b8f104
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 23 deletions.
117 changes: 97 additions & 20 deletions database/test/tarantool/pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func TestConnectFailover(t *testing.T) {

arConnTimeout := 200 * time.Millisecond

pingInterval := 500 * time.Millisecond

// включаем микросекунды в std логере
log.SetFlags(log.LstdFlags | log.Lmicroseconds)

Expand Down Expand Up @@ -63,8 +61,7 @@ func TestConnectFailover(t *testing.T) {
"arcfg/Timeout": arConnTimeout,
})

pinger := activerecord.NewPinger(activerecord.WithPingInterval(pingInterval))
defer pinger.StopWatch()
pinger := NewBasicConnectionChecker()

logger := activerecord.NewLogger()
logger.SetLogLevel(activerecord.ErrorLoggerLevel)
Expand All @@ -75,11 +72,15 @@ func TestConnectFailover(t *testing.T) {
activerecord.WithConnectionPinger(pinger),
)

_, err = activerecord.AddClusterChecker(ctx, cfgName, octopus.ClusterConfigParams)
_, err = activerecord.AddClusterChecker(ctx, cfgName, activerecord.ClusterConfigParameters{
Globs: octopus.DefaultConnectionParams,
OptionCreator: octopus.DefaultOptionCreator,
OptionChecker: octopus.CheckShardInstance,
})
require.NoError(t, err)

// проверяем типы и состав узлов кластера после загрузки конфигурации
instances := pinger.ObservedInstances(cfgName)
instances := pinger.ObservedInstances()
// все инстансы из конфигурации (включая несуществующие fakehost)
require.Len(t, instances, 5)

Expand All @@ -101,18 +102,18 @@ func TestConnectFailover(t *testing.T) {
for g := 0; g < 8; g++ {
g := g
eg.Go(func() error {
for i := 0; i < 1000; i++ {
for i := 0; i < 5000; i++ {
st := time.Now()
// чтобы не тротлить пул
time.Sleep(800 * time.Microsecond)
err = nil //lua_procedure.Execute(ctx, "return box.info.status", activerecord.ReplicaOrMasterInstanceType)
err = execute(ctx, cfgName)

if err != nil {
// подождем немного и попробуем сделать еще запрос
time.Sleep(10 * time.Millisecond)
st := time.Now()

err = nil //lua_procedure.Execute(ctx, "return box.info.status", activerecord.ReplicaOrMasterInstanceType)
err = execute(ctx, cfgName)
if qDuration(st) > 5 {
log.Printf("'%d' long query %d, time: %d ms\n", g, i, qDuration(st))
}
Expand Down Expand Up @@ -144,11 +145,10 @@ func TestConnectFailover(t *testing.T) {
}

// останавливаем мастер ноду
require.NoError(t, master.Stop(ctx))
// подождем пока пингер актуализирует кластер после остановки ноды
time.Sleep(pingInterval)
require.NoError(t, master.Terminate(ctx))
require.NoError(t, pinger.check(ctx))

instances = pinger.ObservedInstances(cfgName)
instances = pinger.ObservedInstances()
// проверяем что остановленный мастер пропал из доступных узлов
masters := availableInstances(instances, activerecord.ModeMaster)
require.Len(t, masters, 0)
Expand All @@ -159,16 +159,17 @@ func TestConnectFailover(t *testing.T) {

// останавливаем одну реплику (но в конфигурации активрекорд она по прежнему присутствует)
require.NoError(t, replica3.Stop(ctx))
// подождем пока пингер актуализирует кластер после остановки ноды
time.Sleep(pingInterval)
require.NoError(t, pinger.check(ctx))

instances = pinger.ObservedInstances(cfgName)
instances = pinger.ObservedInstances()
replicas = availableInstances(instances, activerecord.ModeReplica)
// осталась одна доступная реплика
require.Len(t, replicas, 1)
require.Equal(t, rHost1, replicas[0].Config.Addr)

require.NoError(t, master.Start(ctx))
// поднимает контейнер с мастером БД
master, err = tarantool.RunContainer(ctx, tarantool.WithTarantool15("tarantool/tarantool:1.5", time.Second))
require.NoError(t, err)
masterHost, err := master.ServerHostPort(ctx)
require.NoError(t, err)

Expand All @@ -179,11 +180,12 @@ func TestConnectFailover(t *testing.T) {
"arcfg/Timeout": arConnTimeout,
})

// подождем пока пингер актуализирует кластер после остановки ноды
time.Sleep(pingInterval)
log.Println("update arconfig")

require.NoError(t, pinger.check(ctx))

// обновленная конфигурация состоит из 2 узлов
instances = pinger.ObservedInstances(cfgName)
instances = pinger.ObservedInstances()
require.Len(t, instances, 2)

masters = availableInstances(instances, activerecord.ModeMaster)
Expand All @@ -199,6 +201,19 @@ func TestConnectFailover(t *testing.T) {
eg.Wait()
}

func execute(ctx context.Context, path string) error {
box, err := octopus.Box(ctx, 0, activerecord.ReplicaOrMasterInstanceType, path, nil)
if err != nil {
return err
}
_, err = octopus.CallLua(ctx, box, "box.dostring", "return box.info.status")
if err != nil {
return err
}

return nil
}

func availableInstances(instances []activerecord.ShardInstance, modeType activerecord.ServerModeType) []activerecord.ShardInstance {
ret := make([]activerecord.ShardInstance, 0, len(instances))
for _, instance := range instances {
Expand All @@ -210,6 +225,68 @@ func availableInstances(instances []activerecord.ShardInstance, modeType activer
return ret
}

type TestConnectionChecker struct {
checkParams activerecord.ClusterConfigParameters
cfgName string
instances []activerecord.ShardInstance
}

func (c *TestConnectionChecker) AddClusterChecker(ctx context.Context, cfgName string, params activerecord.ClusterConfigParameters) (*activerecord.Cluster, error) {
c.checkParams = params
c.cfgName = cfgName

return nil, c.check(ctx)
}

func NewBasicConnectionChecker() *TestConnectionChecker {
return &TestConnectionChecker{}
}

func (c *TestConnectionChecker) check(ctx context.Context) error {
clusterConfig, err := activerecord.ConfigCacher().Get(ctx, c.cfgName, c.checkParams.Globs, c.checkParams.OptionCreator)
if err != nil {
return fmt.Errorf("can't load cluster info: %w", err)
}

for i := 0; i < clusterConfig.Shards(); i++ {
curInstances := clusterConfig.ShardInstances(i)

var actualShard activerecord.Shard

for _, si := range curInstances {
opts, connErr := c.checkParams.OptionChecker(ctx, si)

if opts != nil {
si.Config.Mode = opts.InstanceMode()
}

instance := activerecord.ShardInstance{
ParamsID: si.ParamsID,
Config: si.Config,
Options: si.Options,
Offline: connErr != nil,
}

switch instance.Config.Mode {
case activerecord.ModeMaster:
actualShard.Masters = append(actualShard.Masters, instance)
case activerecord.ModeReplica:
actualShard.Replicas = append(actualShard.Replicas, instance)
}
}

instances := actualShard.Instances()
c.instances = instances
clusterConfig.SetShardInstances(i, instances)
}

return nil
}

func (c *TestConnectionChecker) ObservedInstances() []activerecord.ShardInstance {
return c.instances
}

type TestConfig struct {
cfg sync.Map
created time.Time
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/ebirukov/tnt-containers v1.0.2
github.com/mailru/activerecord v1.11.0-b3
github.com/mailru/activerecord v1.11.0
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mailru/activerecord v1.11.0-b3 h1:mJsp9cGhm+nhKrU4UDs7++kW/uZ1FZDAoJBODCqvEuo=
github.com/mailru/activerecord v1.11.0-b3/go.mod h1:Dg3GWaG8VauQujFrWsjlXX6I093sm3xq3945/aur+OM=
github.com/mailru/activerecord v1.11.0 h1:OwxTHyMmrNuSe/7syqYcfKkkEoBapBvrylWZPuL3d/g=
github.com/mailru/activerecord v1.11.0/go.mod h1:Dg3GWaG8VauQujFrWsjlXX6I093sm3xq3945/aur+OM=
github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45 h1:x3Zw96Gt6HbEPUWsTbQYj/nfaNv5lWHy6CeEkl8gwqw=
github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45/go.mod h1:guLmlFj8yjd0hoz+QWxRU4Gn+VOb2nOQZ4EqRmMHarw=
github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo=
Expand Down

0 comments on commit 6b8f104

Please sign in to comment.