Skip to content

Commit 6951ea5

Browse files
committed
add aerolab conf sc and aerolab aerospike is-stable commands
1 parent 6d79b83 commit 6951ea5

File tree

8 files changed

+334
-5
lines changed

8 files changed

+334
-5
lines changed

CHANGELOG/7.7.1.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ _Release Date: December 16th, 2024_
1111
* Fix: The `roster apply` command would not handle problematic nodes.
1212
* AGI: Add `device` filter in namespace view.
1313
* AWS: Fix handling of firewalls with `--aws-nopublic-ip` set.
14+
* Add the `aerolab conf sc` command, which wraps enabling strong consistency, applying the roster and adjusting the replication factor.
15+
* Add the `aerolab aerospike is-stable` command to test whether Aerospike is stable and running on all cluster nodes.

src/cmdAerospike.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package main
22

3-
import "os"
3+
import (
4+
"os"
5+
)
46

57
type aerospikeCmd struct {
68
Start aerospikeStartCmd `command:"start" subcommands-optional:"true" description:"Start aerospike" webicon:"fas fa-play"`
@@ -9,6 +11,7 @@ type aerospikeCmd struct {
911
Status aerospikeStatusCmd `command:"status" subcommands-optional:"true" description:"Aerospike daemon status" webicon:"fas fa-circle-question"`
1012
Upgrade aerospikeUpgradeCmd `command:"upgrade" subcommands-optional:"true" description:"Upgrade aerospike daemon" webicon:"fas fa-circle-arrow-up"`
1113
ColdStart aerospikeColdStartCmd `command:"cold-start" subcommands-optional:"true" description:"Cold-Start aerospike" webicon:"fas fa-play"`
14+
IsStable aerospikeIsStableCmd `command:"is-stable" subcommands-optional:"true" description:"Check if, and optionally wait until, cluster is stable" webicon:"fas fa-circle-question"`
1215
Help helpCmd `command:"help" subcommands-optional:"true" description:"Print help"`
1316
}
1417

src/cmdAerospikeIsSrable.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"time"
9+
10+
"github.com/aerospike/aerolab/parallelize"
11+
)
12+
13+
type aerospikeIsStableCmd struct {
14+
ClusterName TypeClusterName `short:"n" long:"name" description:"Cluster names, comma separated OR 'all' to affect all clusters" default:"mydc"`
15+
Namespace string `short:"m" long:"namespace" description:"Namespace to change" default:"test"`
16+
Wait bool `short:"w" long:"wait" description:"If set, will wait in a loop until the cluster is stable, and then return"`
17+
WaitTimeout int `short:"o" long:"wait-timeout" description:"If set, will timeout if the cluster doesn't become stable by this many seconds"`
18+
IgnoreMigrations bool `short:"i" long:"ignore-migrations" description:"If set, will ignore migrations when checking if cluster is stable"`
19+
IgnoreClusterKey bool `short:"k" long:"ignore-cluster-key" description:"If set, will not check if the cluster key matches on all nodes in the cluster"`
20+
parallelThreadsCmd
21+
}
22+
23+
func (c *aerospikeIsStableCmd) Execute(args []string) error {
24+
if earlyProcessV2(nil, true) {
25+
return nil
26+
}
27+
startTime := time.Now()
28+
log.Println("Running aerospike.is-stable")
29+
// get node count
30+
log.Println("aerospike.is-stable: Getting cluster size")
31+
nodes, err := b.NodeListInCluster(c.ClusterName.String())
32+
if err != nil {
33+
return err
34+
}
35+
if len(nodes) == 0 {
36+
return errors.New("cluster does not exists")
37+
}
38+
// scripts
39+
waitScript := fmt.Sprintf(`timeout=%d
40+
start_time=$(date +%%s)
41+
while (( timeout == 0 || $(date +%%s) - start_time < timeout )); do
42+
RET=$(asinfo -v 'cluster-stable:size=%d;ignore-migrations=%t;namespace=%s' 2>&1)
43+
if [ $? -eq 0 ]; then
44+
echo ${RET}
45+
exit 0
46+
fi
47+
sleep 1
48+
done
49+
echo "${RET}"
50+
exit 1
51+
`, c.WaitTimeout, len(nodes), c.IgnoreMigrations, c.Namespace)
52+
noWaitCmd := []string{"asinfo", "-v", fmt.Sprintf("cluster-stable:size=%d;ignore-migrations=%t;namespace=%s", len(nodes), c.IgnoreMigrations, c.Namespace)}
53+
54+
firstLoop := true
55+
keysLock := new(sync.Mutex)
56+
for c.WaitTimeout == 0 || time.Since(startTime) < time.Duration(c.WaitTimeout)*time.Second {
57+
log.Println("aerospike.is-stable: Getting cluster keys")
58+
clusterKeys := []string{} // lets reset
59+
// get all cluster keys
60+
returns := parallelize.MapLimit(nodes, c.ParallelThreads, func(node int) error {
61+
var cmd []string
62+
if c.Wait {
63+
if firstLoop {
64+
// upload wait script
65+
err = b.CopyFilesToCluster(string(c.ClusterName), []fileList{{filePath: "/opt/is-stable.sh", fileContents: waitScript, fileSize: len(waitScript)}}, []int{node})
66+
if err != nil {
67+
return err
68+
}
69+
}
70+
cmd = []string{"/bin/bash", "/opt/is-stable.sh"}
71+
} else {
72+
cmd = noWaitCmd
73+
}
74+
// run cmd; if error, ret the error;; if success, capture cluster key in clusterKeys (lock with keysLock)
75+
out, err := b.RunCommands(c.ClusterName.String(), [][]string{cmd}, []int{node})
76+
if len(out) == 0 {
77+
out = [][]byte{{'-'}}
78+
}
79+
if err != nil {
80+
return fmt.Errorf("%s: %s", err, string(out[0]))
81+
}
82+
keysLock.Lock()
83+
clusterKeys = append(clusterKeys, string(out[0]))
84+
keysLock.Unlock()
85+
return nil
86+
})
87+
isError := false
88+
for i, ret := range returns {
89+
if ret != nil {
90+
log.Printf("Node %d returned %s", nodes[i], ret)
91+
isError = true
92+
}
93+
}
94+
if isError {
95+
return errors.New("some nodes returned errors")
96+
}
97+
firstLoop = false
98+
99+
same := true
100+
101+
if !c.IgnoreClusterKey {
102+
for _, k := range clusterKeys {
103+
if clusterKeys[0] != k {
104+
same = false
105+
break
106+
}
107+
}
108+
}
109+
110+
if same {
111+
log.Print("Cluster Stable")
112+
return nil
113+
}
114+
115+
if !c.Wait {
116+
return errors.New("cluster not stable")
117+
}
118+
time.Sleep(time.Second)
119+
}
120+
return errors.New("timeout reached, cluster unstable")
121+
}

src/cmdAerospikeStartStopRestart.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ func (c *aerospikeStartCmd) run(args []string, command string, stdout *os.File)
5353

5454
if c.ParallelThreads == 1 || len(nodes) == 1 {
5555
var out [][]byte
56-
if command == "start" {
56+
if command == "cold-start" {
57+
var commands [][]string
58+
commands = append(commands, []string{"ipcrm", "--all"})
59+
commands = append(commands, []string{"service", "aerospike", "start"})
60+
out, err = b.RunCommands(string(c.ClusterName), commands, nodes)
61+
} else if command == "start" {
5762
var commands [][]string
5863
commands = append(commands, []string{"service", "aerospike", "start"})
5964
out, err = b.RunCommands(string(c.ClusterName), commands, nodes)

src/cmdConf.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
type confCmd struct {
88
Generator confGeneratorCmd `command:"generate" subcommands-optional:"true" description:"Generate or modify Aerospike configuration files" webicon:"fas fa-gears" webhidden:"true"`
9+
SC confSCCmd `command:"sc" subcommands-optional:"true" description:"Configure the cluster to use strong-consistency, with roster and optional RF changes" webicon:"fas fa-gears"`
910
FixMesh confFixMeshCmd `command:"fix-mesh" subcommands-optional:"true" description:"Fix mesh configuration in the cluster" webicon:"fas fa-screwdriver"`
1011
RackID confRackIdCmd `command:"rackid" subcommands-optional:"true" description:"Change/add rack-id to namespaces in the existing cluster nodes" webicon:"fas fa-id-badge"`
1112
NamespaceMemory confNamespaceMemoryCmd `command:"namespace-memory" subcommands-optional:"true" description:"Adjust memory for a namespace using total percentages" webicon:"fas fa-sd-card"`

src/cmdConfAdjust.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ func (c *confAdjustCmd) Execute(args []string) error {
2727
if earlyProcessV2(nil, true) {
2828
return nil
2929
}
30+
if c.Command == "get" {
31+
c.Values = []string{}
32+
}
3033
if len(args) < 1 {
3134
if c.Command != "" || c.Key != "" || len(c.Values) > 0 {
3235
args = append([]string{c.Command, c.Key}, c.Values...)
@@ -126,6 +129,7 @@ func (c *confAdjustCmd) Execute(args []string) error {
126129
valstring = valstring + *vv
127130
}
128131
fmt.Printf("%s%s %s\n", prefix, i, valstring)
132+
c.Values = append(c.Values, valstring)
129133
return nil
130134
} else {
131135
sa = sa.Stanza(i)

src/cmdConfSC.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"os"
9+
"strconv"
10+
11+
"github.com/aerospike/aerolab/parallelize"
12+
aeroconf "github.com/rglonek/aerospike-config-file-parser"
13+
)
14+
15+
type confSCCmd struct {
16+
ClusterName TypeClusterName `short:"n" long:"name" description:"Cluster name" default:"mydc"`
17+
Namespace string `short:"m" long:"namespace" description:"Namespace to change" default:"test"`
18+
Path string `short:"p" long:"path" description:"Path to aerospike.conf" default:"/etc/aerospike/aerospike.conf"`
19+
Force bool `short:"f" long:"force" description:"If set, will zero out the devices even if strong-consistency was already configured"`
20+
parallelThreadsCmd
21+
}
22+
23+
func (c *confSCCmd) Execute(args []string) error {
24+
if earlyProcessV2(nil, true) {
25+
return nil
26+
}
27+
log.Println("Running conf.sc")
28+
// stop aerospike
29+
log.Println("conf.sc: Stopping aerospike")
30+
a.opts.Aerospike.Stop.ClusterName = c.ClusterName
31+
a.opts.Aerospike.Stop.ParallelThreads = c.ParallelThreads
32+
err := a.opts.Aerospike.Stop.run(nil, "stop", os.Stdout)
33+
if err != nil {
34+
return err
35+
}
36+
// get node count
37+
log.Println("conf.sc: Getting cluster size")
38+
nodes, err := b.NodeListInCluster(c.ClusterName.String())
39+
if err != nil {
40+
return err
41+
}
42+
// patch aerospike.conf
43+
log.Println("conf.sc: Patching aerospike.conf")
44+
returns := parallelize.MapLimit(nodes, c.ParallelThreads, func(node int) error {
45+
// read config file
46+
out, err := b.RunCommands(c.ClusterName.String(), [][]string{{"cat", c.Path}}, []int{node})
47+
if err != nil {
48+
nout := ""
49+
for _, n := range out {
50+
nout = nout + "\n" + string(n)
51+
}
52+
return fmt.Errorf("error on cluster %s: %s: %s", c.ClusterName, nout, err)
53+
}
54+
fileContents := bytes.NewReader(out[0])
55+
// edit actual file contents
56+
s, err := aeroconf.Parse(fileContents)
57+
if err != nil {
58+
return err
59+
}
60+
if s.Type("namespace "+c.Namespace) != aeroconf.ValueStanza {
61+
return errors.New("namespace not found")
62+
}
63+
changes := false
64+
x := s.Stanza("namespace " + c.Namespace)
65+
// check RF
66+
if x.Type("replication-factor") == aeroconf.ValueString {
67+
vals, err := x.GetValues("replication-factor")
68+
if err != nil {
69+
return err
70+
}
71+
if len(vals) != 1 {
72+
return errors.New("replication-factor parameter error")
73+
}
74+
rf, err := strconv.Atoi(*vals[0])
75+
if err != nil {
76+
return errors.New("replication-factor parameter invalid value found")
77+
}
78+
if rf > len(nodes) {
79+
x.SetValue("replication-factor", strconv.Itoa(len(nodes)))
80+
changes = true
81+
}
82+
} else if len(nodes) == 1 {
83+
x.SetValue("replication-factor", "1")
84+
changes = true
85+
}
86+
// get SC
87+
rmFiles := false
88+
if x.Type("strong-consistency") != aeroconf.ValueString {
89+
x.SetValue("strong-consistency", "true")
90+
changes = true
91+
rmFiles = true
92+
} else {
93+
vals, err := x.GetValues("strong-consistency")
94+
if err != nil {
95+
return err
96+
}
97+
if len(vals) != 1 {
98+
return errors.New("strong-consistency parameter error")
99+
}
100+
if *vals[0] != "true" {
101+
x.SetValue("strong-consistency", "true")
102+
changes = true
103+
rmFiles = true
104+
}
105+
}
106+
// remove storage files
107+
if rmFiles || c.Force {
108+
if x.Type("storage-engine device") == aeroconf.ValueStanza {
109+
x = x.Stanza("storage-engine device")
110+
if x.Type("file") == aeroconf.ValueString {
111+
files, err := x.GetValues("file")
112+
if err != nil {
113+
return err
114+
}
115+
cmd := []string{"rm", "-f"}
116+
for _, file := range files {
117+
cmd = append(cmd, *file)
118+
}
119+
data, err := b.RunCommands(string(c.ClusterName), [][]string{cmd}, []int{node})
120+
if len(data) == 0 {
121+
data = [][]byte{{'-'}}
122+
}
123+
if err != nil {
124+
return fmt.Errorf("%s: %s", err, string(data[0]))
125+
}
126+
}
127+
}
128+
}
129+
// store changes back
130+
if changes {
131+
var buf bytes.Buffer
132+
err = s.Write(&buf, "", " ", true)
133+
if err != nil {
134+
return err
135+
}
136+
contents := buf.Bytes()
137+
fileContents = bytes.NewReader(contents)
138+
// edit end
139+
err = b.CopyFilesToClusterReader(c.ClusterName.String(), []fileListReader{{filePath: c.Path, fileContents: fileContents, fileSize: len(contents)}}, []int{node})
140+
if err != nil {
141+
return err
142+
}
143+
}
144+
return nil
145+
})
146+
isError := false
147+
for i, ret := range returns {
148+
if ret != nil {
149+
log.Printf("Node %d returned %s", nodes[i], ret)
150+
isError = true
151+
}
152+
}
153+
if isError {
154+
return errors.New("some nodes returned errors")
155+
}
156+
// restart aerospike
157+
log.Println("conf.sc: Cold-Starting aerospike")
158+
a.opts.Aerospike.ColdStart.ClusterName = c.ClusterName
159+
a.opts.Aerospike.ColdStart.ParallelThreads = c.ParallelThreads
160+
err = a.opts.Aerospike.ColdStart.run(nil, "cold-start", os.Stdout)
161+
if err != nil {
162+
return err
163+
}
164+
// wait for cluster to be stable
165+
log.Println("conf.sc: Waiting for cluster to be stable")
166+
a.opts.Aerospike.IsStable.ClusterName = c.ClusterName
167+
a.opts.Aerospike.IsStable.ParallelThreads = c.ParallelThreads
168+
a.opts.Aerospike.IsStable.Wait = true
169+
a.opts.Aerospike.IsStable.IgnoreMigrations = true
170+
a.opts.Aerospike.IsStable.Namespace = c.Namespace
171+
err = a.opts.Aerospike.IsStable.Execute(nil)
172+
if err != nil {
173+
return err
174+
}
175+
// apply roster
176+
log.Println("conf.sc: Applying roster")
177+
a.opts.Roster.Apply.ClusterName = c.ClusterName
178+
a.opts.Roster.Apply.Namespace = c.Namespace
179+
a.opts.Roster.Apply.ParallelThreads = c.ParallelThreads
180+
a.opts.Roster.Apply.Quiet = true
181+
err = a.opts.Roster.Apply.Execute(nil)
182+
if err != nil {
183+
return err
184+
}
185+
log.Println("conf.sc: Done")
186+
return nil
187+
}

0 commit comments

Comments
 (0)