Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

Commit

Permalink
feature/provider: etcd v2 provider refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
Maksim Konovalov committed Sep 2, 2024
1 parent 69ca95e commit e91a8f7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
*.out

# Tmp
*.tmp
*.tmp
tests/tnt/tmp
1 change: 1 addition & 0 deletions providers/etcd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Tarantool ETCD topology provider based on moonlibs/config
66 changes: 45 additions & 21 deletions providers/etcd/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,40 @@ func NewProvider(cfg Config) *Provider {
}
}

func (p *Provider) Init(c vshardrouter.TopologyController) error {
resp, err := p.kapi.Get(context.TODO(), p.path, &client.GetOptions{Recursive: true})
if err != nil {
return err
// mapCluster2Instances combines clusters with instances in map
func mapCluster2Instances(replicasets []vshardrouter.ReplicasetInfo,
instances map[string][]*vshardrouter.InstanceInfo) map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo {

currentTopology := map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{}

for _, replicasetInfo := range replicasets {
var resInst []vshardrouter.InstanceInfo

for _, inst := range instances[replicasetInfo.Name] {
resInst = append(resInst, *inst)
}

currentTopology[replicasetInfo] = resInst
}

if resp.Node.Nodes.Len() < 2 {
return fmt.Errorf("etcd path %s subnodes <2; minimum 2 (/clusters & /instances)", p.path)
return currentTopology
}

func (p *Provider) GetTopology(nodes client.Nodes) (map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo, error) {
if nodes.Len() < 2 {
return nil, fmt.Errorf("etcd path %s subnodes <2; minimum 2 (/clusters & /instances)", p.path)
}

replicasets := []vshardrouter.ReplicasetInfo{}
var replicasets []vshardrouter.ReplicasetInfo
instances := map[string][]*vshardrouter.InstanceInfo{} // cluster name to instance info

for _, node := range resp.Node.Nodes {
for _, node := range nodes {
var err error

switch filepath.Base(node.Key) {
case "clusters":
if len(node.Nodes) < 1 {
return fmt.Errorf("etcd path %s has no clusters", node.Key)
return nil, fmt.Errorf("etcd path %s has no clusters", node.Key)
}

for _, rsNode := range node.Nodes {
Expand All @@ -71,7 +87,7 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
case "replicaset_uuid":
replicaset.UUID, err = uuid.Parse(rsInfoNode.Value)
if err != nil {
return fmt.Errorf("cant parse replicaset %s uuid %s", replicaset.Name, rsInfoNode.Value)
return nil, fmt.Errorf("cant parse replicaset %s uuid %s", replicaset.Name, rsInfoNode.Value)
}
case "master":
// TODO: now we dont support non master auto implementation
Expand All @@ -84,11 +100,15 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
}
case "instances":
if len(node.Nodes) < 1 {
return fmt.Errorf("etcd path %s has no instances", node.Key)
return nil, fmt.Errorf("etcd path %s has no instances", node.Key)
}

for _, instanceNode := range node.Nodes {
instance := &vshardrouter.InstanceInfo{}
instanceName := filepath.Base(instanceNode.Key)

instance := &vshardrouter.InstanceInfo{
Name: instanceName,
}

for _, instanceInfoNode := range instanceNode.Nodes {
switch filepath.Base(instanceInfoNode.Key) {
Expand All @@ -102,7 +122,7 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
case "instance_uuid":
instance.UUID, err = uuid.Parse(boxNode.Value)
if err != nil {
return fmt.Errorf("cant parse for instance uuid %s", boxNode.Value)
return nil, fmt.Errorf("cant parse for instance uuid %s", boxNode.Value)
}
}
}
Expand All @@ -114,19 +134,23 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
}
}

currentTopology := map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{}
currentTopology := mapCluster2Instances(replicasets, instances)

for _, replicasetInfo := range replicasets {
var resInst []vshardrouter.InstanceInfo
return currentTopology, nil
}

for _, inst := range instances[replicasetInfo.Name] {
resInst = append(resInst, *inst)
}
func (p *Provider) Init(c vshardrouter.TopologyController) error {
resp, err := p.kapi.Get(context.TODO(), p.path, &client.GetOptions{Recursive: true})
if err != nil {
return err
}

currentTopology[replicasetInfo] = resInst
topology, err := p.GetTopology(resp.Node.Nodes)
if err != nil {
return err
}

return c.AddReplicasets(context.TODO(), currentTopology)
return c.AddReplicasets(context.TODO(), topology)
}

// Close must close connection, but etcd v2 client has no interfaces for this
Expand Down
4 changes: 4 additions & 0 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type BucketStatInfo struct {
}

type InstanceInfo struct {
// Name is human-readable id for instance
// Starting with tarantool 3.0, the definition is made into a human-readable name,
// so far it is not used directly inside the library
Name string
Addr string
UUID uuid.UUID
}
Expand Down

0 comments on commit e91a8f7

Please sign in to comment.