Skip to content

Commit

Permalink
Kill existing connections when changing roles (#587)
Browse files Browse the repository at this point in the history
Signed-off-by: Masayuki Ishii <masa213f@gmail.com>
  • Loading branch information
masa213f authored Nov 1, 2023
1 parent 70e2e4f commit 3bad1f5
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ check-generate:
envtest: setup-envtest
source <($(SETUP_ENVTEST) use -p env); \
export MOCO_CHECK_INTERVAL=100ms; \
export MOCO_WAIT_INTERVAL=100ms; \
export MOCO_CLONE_WAIT_DURATION=100ms; \
go test -v -count 1 -race ./clustering -ginkgo.progress -ginkgo.v -ginkgo.failFast
source <($(SETUP_ENVTEST) use -p env); \
export DEBUG_CONTROLLER=1; \
Expand Down
28 changes: 26 additions & 2 deletions clustering/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,12 @@ var _ = Describe("manager", func() {
}
}

// confirm that connections of the mysql whose role has changed are killed
for i := 0; i < 3; i++ {
switch i {
case 0: // connection in demoted instance should be killed
case 0: // KilleConnection is called twice: when the start of the switchover and the changing of role.
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(2))
case newPrimary:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
default:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
Expand Down Expand Up @@ -705,7 +708,12 @@ var _ = Describe("manager", func() {
Expect(failOverEvents).To(Equal(1))

for i := 0; i < 3; i++ {
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
switch i {
case 1:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
default:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
}
}

By("recovering failed instance")
Expand Down Expand Up @@ -796,6 +804,8 @@ var _ = Describe("manager", func() {
}).Should(Succeed())

By("making an errant replica")
of.resetKillConnectionsCount()

// When the primary load is high, sometimes the gtid_executed of a replica precedes the primary.
// pod(4) is intended for such situations.
testSetGTID(cluster.PodHostname(0), "p0:1,p0:2,p0:3") // primary
Expand Down Expand Up @@ -846,6 +856,10 @@ var _ = Describe("manager", func() {
Expect(st1.ReplicaStatus.SlaveIORunning).NotTo(Equal("Yes"))
}

for i := 0; i < 5; i++ {
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
}

By("triggering a failover")
of.setRetrievedGTIDSet(cluster.PodHostname(2), "p0:1")
of.setRetrievedGTIDSet(cluster.PodHostname(3), "p0:1,p0:2,p0:3")
Expand Down Expand Up @@ -909,6 +923,7 @@ var _ = Describe("manager", func() {
Expect(failOverEvents).To(Equal(1))

By("re-initializing the errant replica")
of.resetKillConnectionsCount()
testSetGTID(cluster.PodHostname(1), "")
Eventually(func() interface{} {
return ms.errantReplicas
Expand Down Expand Up @@ -937,6 +952,15 @@ var _ = Describe("manager", func() {
}
}).Should(Succeed())

for i := 0; i < 5; i++ {
switch i {
case 1:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
default:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
}
}

By("stopping instances to make the cluster lost")
of.setFailing(cluster.PodHostname(3), true)
of.setFailing(cluster.PodHostname(1), true)
Expand Down
151 changes: 106 additions & 45 deletions clustering/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,33 @@ const (
failOverTimeoutSeconds = 3600
)

var waitForRestartDuration = 3 * time.Second
var (
waitForCloneRestartDuration = 3 * time.Second
waitForRoleChangeDuration = 300 * time.Millisecond
)

func init() {
intervalStr := os.Getenv("MOCO_CLONE_WAIT_DURATION")
if intervalStr == "" {
return
}
interval, err := time.ParseDuration(intervalStr)
if err != nil {
return
}
waitForCloneRestartDuration = interval
}

func init() {
intervalStr := os.Getenv("MOCO_WAIT_INTERVAL")
intervalStr := os.Getenv("MOCO_ROLE_WAIT_DURATION")
if intervalStr == "" {
return
}
interval, err := time.ParseDuration(intervalStr)
if err != nil {
return
}
waitForRestartDuration = interval
waitForRoleChangeDuration = interval
}

func (p *managerProcess) isCloning(ctx context.Context, ss *StatusSet) bool {
Expand Down Expand Up @@ -115,7 +130,7 @@ func (p *managerProcess) clone(ctx context.Context, ss *StatusSet) (bool, error)

// wait until the instance restarts after clone
op := ss.DBOps[ss.Primary]
time.Sleep(waitForRestartDuration)
time.Sleep(waitForCloneRestartDuration)
for i := 0; i < 60; i++ {
select {
case <-time.After(1 * time.Second):
Expand Down Expand Up @@ -249,9 +264,90 @@ func (p *managerProcess) failover(ctx context.Context, ss *StatusSet) error {
return nil
}

func (p *managerProcess) removeRoleLabel(ctx context.Context, ss *StatusSet) ([]int, error) {
var noRoles []int
for i, pod := range ss.Pods {
v := pod.Labels[constants.LabelMocoRole]
if v == "" {
noRoles = append(noRoles, i)
continue
}

if i == ss.Primary && v == constants.RolePrimary {
continue
}
if i != ss.Primary && !isErrantReplica(ss, i) && v == constants.RoleReplica {
continue
}

noRoles = append(noRoles, i)
modified := pod.DeepCopy()
delete(modified.Labels, constants.LabelMocoRole)
if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
return nil, fmt.Errorf("failed to remove %s label from %s/%s: %w", constants.LabelMocoRole, pod.Namespace, pod.Name, err)
}
}
return noRoles, nil
}

func (p *managerProcess) addRoleLabel(ctx context.Context, ss *StatusSet, noRoles []int) error {
for _, i := range noRoles {
if isErrantReplica(ss, i) {
continue
}

var newValue string
if i == ss.Primary {
newValue = constants.RolePrimary
} else {
newValue = constants.RoleReplica
}

pod := ss.Pods[i]
modified := pod.DeepCopy()
if modified.Labels == nil {
modified.Labels = make(map[string]string)
}
modified.Labels[constants.LabelMocoRole] = newValue
if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
return fmt.Errorf("failed to add %s label to pod %s/%s: %w", constants.LabelMocoRole, pod.Namespace, pod.Name, err)
}
}
return nil
}

func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, error) {
redo := false

// remove old role label from mysql pods whose role is changed
// NOTE:
// I want to redo if even one pod is updated to refresh pod resources in StatusSet.
// But if some mysql instances are down, there is a wait of about 9 seconds at "(*managerProdess).GatherStatus()" after redo.
// The wait slows the recovery process, and downtime becomes longer. To prevent that, continue processing without redoing.
noRoles, err := p.removeRoleLabel(ctx, ss)
if err != nil {
return false, err
}

// if the role of alive instances is changed, kill the connections on those instances
var alive []int
for _, i := range noRoles {
if ss.MySQLStatus[i] == nil || isErrantReplica(ss, i) {
continue
}
alive = append(alive, i)
}
if len(alive) > 0 {
// I hope the backend pods of primary and replica services will be updated during this sleep.
time.Sleep(waitForRoleChangeDuration)
}
for _, i := range alive {
if err := ss.DBOps[i].KillConnections(ctx); err != nil {
return false, fmt.Errorf("failed to kill connections in instance %d: %w", i, err)
}
}

// configure primary instance
if ss.Cluster.Spec.ReplicationSourceSecretName != nil {
r, err := p.configureIntermediatePrimary(ctx, ss)
if err != nil {
Expand All @@ -266,6 +362,7 @@ func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, er
redo = redo || r
}

// configure replica instances
for i, ist := range ss.MySQLStatus {
if i == ss.Primary {
continue
Expand All @@ -280,46 +377,10 @@ func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, er
redo = redo || r
}

// update labels
for i, pod := range ss.Pods {
if i == ss.Primary {
if pod.Labels[constants.LabelMocoRole] != constants.RolePrimary {
redo = true
modified := pod.DeepCopy()
if modified.Labels == nil {
modified.Labels = make(map[string]string)
}
modified.Labels[constants.LabelMocoRole] = constants.RolePrimary
if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
return false, fmt.Errorf("failed to set role for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
}
continue
}

if ss.MySQLStatus[i] != nil && ss.MySQLStatus[i].IsErrant {
if _, ok := pod.Labels[constants.LabelMocoRole]; ok {
redo = true
modified := pod.DeepCopy()
delete(modified.Labels, constants.LabelMocoRole)
if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
return false, fmt.Errorf("failed to set role for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
}
continue
}

if pod.Labels[constants.LabelMocoRole] != constants.RoleReplica {
redo = true
modified := pod.DeepCopy()
if modified.Labels == nil {
modified.Labels = make(map[string]string)
}
modified.Labels[constants.LabelMocoRole] = constants.RoleReplica
if err := p.client.Patch(ctx, modified, client.MergeFrom(pod)); err != nil {
return false, fmt.Errorf("failed to set role for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
}
// add new role label
err = p.addRoleLabel(ctx, ss, noRoles)
if err != nil {
return false, err
}

// make the primary writable if it is not an intermediate primary
Expand Down Expand Up @@ -484,7 +545,7 @@ func (p *managerProcess) configureReplica(ctx context.Context, ss *StatusSet, in
log.Info("clone succeeded", "instance", index)

// wait until the instance restarts after clone
time.Sleep(waitForRestartDuration)
time.Sleep(waitForCloneRestartDuration)
for i := 0; i < 60; i++ {
select {
case <-time.After(1 * time.Second):
Expand Down
12 changes: 10 additions & 2 deletions clustering/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -208,10 +209,10 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) {
}
// process errors
if j == statusCheckRetryMax {
logFromContext(ctx).Error(err, "failed to get mysqld status, mysqld is not ready")
logFromContext(ctx).Error(err, "failed to get mysqld status, mysqld is not ready", "instance", index)
return
}
logFromContext(ctx).Error(err, "failed to get mysqld status, will retry")
logFromContext(ctx).Error(err, "failed to get mysqld status, will retry", "instance", index)
time.Sleep(statusCheckRetryInterval)
}
}(i)
Expand Down Expand Up @@ -299,6 +300,13 @@ func containErrantTransactions(primaryUUID, gtidSet string) bool {
return false
}

func isErrantReplica(ss *StatusSet, index int) bool {
if ss.MySQLStatus[index] != nil && ss.MySQLStatus[index].IsErrant {
return true
}
return slices.Contains(ss.Errants, index)
}

func isPodReady(pod *corev1.Pod) bool {
for _, cond := range pod.Status.Conditions {
if cond.Type != corev1.PodReady {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ require (
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -417,8 +417,8 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down

0 comments on commit 3bad1f5

Please sign in to comment.