Skip to content
This repository has been archived by the owner on Dec 23, 2024. It is now read-only.

feature/provider: etcd v2 provider refactored #55

Merged
merged 2 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
*.out

# Tmp
*.tmp
*.tmp
tests/tnt/tmp
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Unreleased
## 0.0.12

BUG FIXES:

Expand Down
1 change: 1 addition & 0 deletions providers/etcd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Tarantool ETCD topology provider based on moonlibs/config
66 changes: 45 additions & 21 deletions providers/etcd/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,40 @@ func NewProvider(cfg Config) *Provider {
}
}

func (p *Provider) Init(c vshardrouter.TopologyController) error {
resp, err := p.kapi.Get(context.TODO(), p.path, &client.GetOptions{Recursive: true})
if err != nil {
return err
// mapCluster2Instances combines clusters with instances in map
func mapCluster2Instances(replicasets []vshardrouter.ReplicasetInfo,
instances map[string][]*vshardrouter.InstanceInfo) map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo {

currentTopology := map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{}

for _, replicasetInfo := range replicasets {
var resInst []vshardrouter.InstanceInfo

for _, inst := range instances[replicasetInfo.Name] {
resInst = append(resInst, *inst)
}

currentTopology[replicasetInfo] = resInst
}

if resp.Node.Nodes.Len() < 2 {
return fmt.Errorf("etcd path %s subnodes <2; minimum 2 (/clusters & /instances)", p.path)
return currentTopology
}

func (p *Provider) GetTopology(nodes client.Nodes) (map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo, error) {
if nodes.Len() < 2 {
return nil, fmt.Errorf("etcd path %s subnodes <2; minimum 2 (/clusters & /instances)", p.path)
}

replicasets := []vshardrouter.ReplicasetInfo{}
var replicasets []vshardrouter.ReplicasetInfo
instances := map[string][]*vshardrouter.InstanceInfo{} // cluster name to instance info

for _, node := range resp.Node.Nodes {
for _, node := range nodes {
var err error

switch filepath.Base(node.Key) {
case "clusters":
if len(node.Nodes) < 1 {
return fmt.Errorf("etcd path %s has no clusters", node.Key)
return nil, fmt.Errorf("etcd path %s has no clusters", node.Key)
}

for _, rsNode := range node.Nodes {
Expand All @@ -71,7 +87,7 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
case "replicaset_uuid":
replicaset.UUID, err = uuid.Parse(rsInfoNode.Value)
if err != nil {
return fmt.Errorf("cant parse replicaset %s uuid %s", replicaset.Name, rsInfoNode.Value)
return nil, fmt.Errorf("cant parse replicaset %s uuid %s", replicaset.Name, rsInfoNode.Value)
}
case "master":
// TODO: now we dont support non master auto implementation
Expand All @@ -84,11 +100,15 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
}
case "instances":
if len(node.Nodes) < 1 {
return fmt.Errorf("etcd path %s has no instances", node.Key)
return nil, fmt.Errorf("etcd path %s has no instances", node.Key)
}

for _, instanceNode := range node.Nodes {
instance := &vshardrouter.InstanceInfo{}
instanceName := filepath.Base(instanceNode.Key)

instance := &vshardrouter.InstanceInfo{
Name: instanceName,
}

for _, instanceInfoNode := range instanceNode.Nodes {
switch filepath.Base(instanceInfoNode.Key) {
Expand All @@ -102,7 +122,7 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
case "instance_uuid":
instance.UUID, err = uuid.Parse(boxNode.Value)
if err != nil {
return fmt.Errorf("cant parse for instance uuid %s", boxNode.Value)
return nil, fmt.Errorf("cant parse for instance uuid %s", boxNode.Value)
}
}
}
Expand All @@ -114,19 +134,23 @@ func (p *Provider) Init(c vshardrouter.TopologyController) error {
}
}

currentTopology := map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{}
currentTopology := mapCluster2Instances(replicasets, instances)

for _, replicasetInfo := range replicasets {
var resInst []vshardrouter.InstanceInfo
return currentTopology, nil
}

for _, inst := range instances[replicasetInfo.Name] {
resInst = append(resInst, *inst)
}
func (p *Provider) Init(c vshardrouter.TopologyController) error {
resp, err := p.kapi.Get(context.TODO(), p.path, &client.GetOptions{Recursive: true})
if err != nil {
return err
}

currentTopology[replicasetInfo] = resInst
topology, err := p.GetTopology(resp.Node.Nodes)
if err != nil {
return err
}

return c.AddReplicasets(context.TODO(), currentTopology)
return c.AddReplicasets(context.TODO(), topology)
}

// Close must close connection, but etcd v2 client has no interfaces for this
Expand Down
4 changes: 4 additions & 0 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type BucketStatInfo struct {
}

type InstanceInfo struct {
// Name is human-readable id for instance
// Starting with tarantool 3.0, the definition is made into a human-readable name,
// so far it is not used directly inside the library
Name string
Addr string
UUID uuid.UUID
}
Expand Down
Loading