Merge pull request #561 from cybozu-go/fix-failover-fail
Kill old connections when demoting primary
ymmt2005 authored Oct 23, 2023
2 parents ccea762 + 5dfcc4e commit b61bb8a
Showing 3 changed files with 205 additions and 21 deletions.
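
In short, this change makes the clustering manager kill any leftover client connections on a demoted primary before re-enabling super_read_only, so that a stale connection still applying writes cannot block the demotion. The Go sketch below condenses the new ordering; the Operator interface is reduced to the two methods the diff relies on, and the helper name demoteInstance is made up for this note, so treat it as an illustration rather than the exact code in clustering/operations.go.

package sketch

import (
	"context"
	"fmt"
)

// Operator is a reduced stand-in for the operator interface used in the diff;
// only the two methods relevant to demotion are kept.
type Operator interface {
	KillConnections(ctx context.Context) error
	SetReadOnly(ctx context.Context, readonly bool) error
}

// demoteInstance sketches the ordering introduced by this commit: kill leftover
// client connections first, then turn on super_read_only. Without the first
// step, a long-running write on a stale connection can block
// `SET GLOBAL super_read_only=1`.
func demoteInstance(ctx context.Context, op Operator, index int) error {
	if err := op.KillConnections(ctx); err != nil {
		return fmt.Errorf("failed to kill connections in instance %d: %w", index, err)
	}
	if err := op.SetReadOnly(ctx, true); err != nil {
		return fmt.Errorf("failed to set super_read_only=1 in instance %d: %w", index, err)
	}
	return nil
}
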
189 changes: 173 additions & 16 deletions clustering/manager_test.go
@@ -378,6 +378,7 @@ var _ = Describe("manager", func() {
Expect(otherEvents).To(Equal(0))

By("scaling out the cluster from 1 to 3 instances")
of.resetKillConnectionsCount()
cluster.Spec.Replicas = 3
err = k8sClient.Update(ctx, cluster)
Expect(err).NotTo(HaveOccurred())
@@ -441,7 +442,10 @@ var _ = Describe("manager", func() {
Expect(st.ReplicaStatus.SlaveIORunning).To(Equal("Yes"))
}

Expect(of.getKillConnectionsCount(cluster.PodHostname(0))).To(Equal(0)) // connections should not be killed

By("doing a switchover")
of.resetKillConnectionsCount()
// advance the executed GTID set on the source and the primary
testSetGTID("external", "ex:1,ex:2,ex:3,ex:4,ex:5")
testSetGTID(cluster.PodHostname(0), "ex:1,ex:2,ex:3,ex:4,ex:5")
@@ -536,6 +540,15 @@ var _ = Describe("manager", func() {
}
}

for i := 0; i < 3; i++ {
switch i {
case 0: // connections in the demoted instance should be killed
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
default:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
}
}

By("stopping replication from external mysqld")
// advance the source GTID beforehand
testSetGTID("external", "ex:1,ex:2,ex:3,ex:4,ex:5,ex:6")
@@ -583,7 +596,162 @@ var _ = Describe("manager", func() {
}
})

It("should handle failover and errant replicas", func() {
It("should handle failover", func() {
testSetupResources(ctx, 3, "")

cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
defer cm.StopAll()

cluster, err := testGetCluster(ctx)
Expect(err).NotTo(HaveOccurred())
cm.Update(client.ObjectKeyFromObject(cluster), "test")
defer func() {
cm.Stop(client.ObjectKeyFromObject(cluster))
time.Sleep(400 * time.Millisecond)
Eventually(func(g Gomega) {
ch := make(chan prometheus.Metric, 2)
metrics.ErrantReplicasVec.Collect(ch)
g.Expect(ch).NotTo(Receive())
}).Should(Succeed())
}()

// wait for the cluster's conditions to change
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())

condHealthy, err := testGetCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue))
}).Should(Succeed())

// wait for the pods' metadata to be updated
Eventually(func(g Gomega) {
for i := 0; i < 3; i++ {
pod := &corev1.Pod{}
err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(i)}, pod)
g.Expect(err).NotTo(HaveOccurred())
switch i {
case 0:
g.Expect(pod.Labels[constants.LabelMocoRole]).To(Equal(constants.RolePrimary))
default:
g.Expect(pod.Labels[constants.LabelMocoRole]).To(Equal(constants.RoleReplica))
}
}
}).Should(Succeed())

By("triggering a failover")
of.resetKillConnectionsCount()
testSetGTID(cluster.PodHostname(0), "p0:1,p0:2,p0:3") // primary
testSetGTID(cluster.PodHostname(1), "p0:1") // new primary
testSetGTID(cluster.PodHostname(2), "p0:1,p0:2,p0:3")
of.setRetrievedGTIDSet(cluster.PodHostname(1), "p0:1,p0:2,p0:3")
of.setRetrievedGTIDSet(cluster.PodHostname(2), "p0:1,p0:2,p0:3")
of.setFailing(cluster.PodHostname(0), true)

// wait for the new primary to be selected
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cluster.Status.CurrentPrimaryIndex).To(Equal(1), "the primary is not switched yet")

condAvailable, err := testGetCondition(cluster, mocov1beta2.ConditionAvailable)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condAvailable.Status).To(Equal(metav1.ConditionTrue))
condHealthy, err := testGetCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionFalse))
}).Should(Succeed())

// wait for the pods' metadata to be updated
Eventually(func(g Gomega) {
for i := 0; i < 3; i++ {
pod := &corev1.Pod{}
err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(i)}, pod)
g.Expect(err).NotTo(HaveOccurred())
switch i {
case 1:
g.Expect(pod.Labels[constants.LabelMocoRole]).To(Equal(constants.RolePrimary))
default:
g.Expect(pod.Labels[constants.LabelMocoRole]).To(Equal(constants.RoleReplica))
}
}
}).Should(Succeed())

st1 := of.getInstanceStatus(cluster.PodHostname(1))
Expect(st1.GlobalVariables.ExecutedGTID).To(Equal("p0:1,p0:2,p0:3")) // confirm that MOCO waited for the retrieved GTID set to be executed
Expect(st1.GlobalVariables.ReadOnly).To(BeFalse())

Expect(cluster.Status.ErrantReplicas).To(Equal(0))
Expect(cluster.Status.ErrantReplicaList).To(BeEmpty())

Expect(ms.available).To(MetricsIs("==", 1))
Expect(ms.healthy).To(MetricsIs("==", 0))
Expect(ms.replicas).To(MetricsIs("==", 3))
Expect(ms.errantReplicas).To(MetricsIs("==", 0))
Expect(ms.switchoverCount).To(MetricsIs("==", 0))
Expect(ms.failoverCount).To(MetricsIs("==", 1))

events := &corev1.EventList{}
err = k8sClient.List(ctx, events, client.InNamespace("test"))
Expect(err).NotTo(HaveOccurred())
var failOverEvents int
for _, ev := range events.Items {
switch ev.Reason {
case event.FailOverSucceeded.Reason:
failOverEvents++
}
}
Expect(failOverEvents).To(Equal(1))

for i := 0; i < 3; i++ {
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
}

By("recovering failed instance")
of.resetKillConnectionsCount()
of.setFailing(cluster.PodHostname(0), false)

// wait for the cluster's conditions to change
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cluster.Status.CurrentPrimaryIndex).To(Equal(1), "the primary should not be switched")

condAvailable, err := testGetCondition(cluster, mocov1beta2.ConditionAvailable)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condAvailable.Status).To(Equal(metav1.ConditionTrue))
condHealthy, err := testGetCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue))
}).Should(Succeed())

// confirm the pods' metadata are not updated
Consistently(func(g Gomega) {
for i := 0; i < 3; i++ {
pod := &corev1.Pod{}
err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(i)}, pod)
g.Expect(err).NotTo(HaveOccurred())
switch i {
case 1:
g.Expect(pod.Labels[constants.LabelMocoRole]).To(Equal(constants.RolePrimary))
default:
g.Expect(pod.Labels[constants.LabelMocoRole]).To(Equal(constants.RoleReplica))
}
}
}).Should(Succeed())

for i := 0; i < 3; i++ {
switch i {
case 0:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(1))
default:
Expect(of.getKillConnectionsCount(cluster.PodHostname(i))).To(Equal(0))
}
}
})

It("should handle errant replicas and lost", func() {
testSetupResources(ctx, 5, "")

cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
@@ -634,7 +802,7 @@ var _ = Describe("manager", func() {
testSetGTID(cluster.PodHostname(1), "p0:1,p0:2,p1:1") // errant replica
testSetGTID(cluster.PodHostname(2), "p0:1")
testSetGTID(cluster.PodHostname(3), "p0:1,p0:2,p0:3")
testSetGTID(cluster.PodHostname(4), "p0:1,p0:2,p0:3,p0:4,p0:5")
testSetGTID(cluster.PodHostname(4), "p0:1,p0:2,p0:3")

// wait for the errant replica to be detected
Eventually(func(g Gomega) {
@@ -680,26 +848,15 @@ var _ = Describe("manager", func() {

By("triggering a failover")
of.setRetrievedGTIDSet(cluster.PodHostname(2), "p0:1")
of.setRetrievedGTIDSet(cluster.PodHostname(3), "p0:1,p0:2,p0:3,p0:4,p0:5")
of.setRetrievedGTIDSet(cluster.PodHostname(4), "p0:1,p0:2,p0:3,p0:4,p0:5")
of.setRetrievedGTIDSet(cluster.PodHostname(3), "p0:1,p0:2,p0:3")
of.setRetrievedGTIDSet(cluster.PodHostname(4), "p0:1,p0:2,p0:3")
of.setFailing(cluster.PodHostname(0), true)

// wait for the new primary to be selected
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cluster.Status.CurrentPrimaryIndex).To(Equal(3), "the primary is not switched yet")
}).Should(Succeed())

// confirm that MOCO waited for the retrieved GTID set to be executed
st3 := of.getInstanceStatus(cluster.PodHostname(3))
Expect(st3.GlobalVariables.ExecutedGTID).To(Equal("p0:1,p0:2,p0:3,p0:4,p0:5"))

// wait for the cluster's conditions to change
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cluster.Status.CurrentPrimaryIndex).To(Equal(3))

condAvailable, err := testGetCondition(cluster, mocov1beta2.ConditionAvailable)
g.Expect(err).NotTo(HaveOccurred())
@@ -726,7 +883,7 @@ var _ = Describe("manager", func() {
}
}).Should(Succeed())

st3 = of.getInstanceStatus(cluster.PodHostname(3))
st3 := of.getInstanceStatus(cluster.PodHostname(3))
Expect(st3.GlobalVariables.ReadOnly).To(BeFalse())

Expect(cluster.Status.ErrantReplicas).To(Equal(1))
30 changes: 25 additions & 5 deletions clustering/mock_test.go
@@ -406,6 +406,12 @@ func (o *mockOperator) SetReadOnly(ctx context.Context, readonly bool) error {
}

func (o *mockOperator) KillConnections(ctx context.Context) error {
if o.failing {
return errors.New("mysqld is down")
}
o.factory.mu.Lock()
defer o.factory.mu.Unlock()
o.factory.countKillConnections[o.Name()]++
return nil
}

@@ -444,15 +450,17 @@ func (m *mockMySQL) setRetrievedGTIDSet(gtid string) {
type mockOpFactory struct {
orphaned int64

mu sync.Mutex
mysqls map[string]*mockMySQL
failing map[string]bool
mu sync.Mutex
mysqls map[string]*mockMySQL
failing map[string]bool
countKillConnections map[string]int
}

func newMockOpFactory() *mockOpFactory {
return &mockOpFactory{
mysqls: make(map[string]*mockMySQL),
failing: make(map[string]bool),
mysqls: make(map[string]*mockMySQL),
failing: make(map[string]bool),
countKillConnections: make(map[string]int),
}
}

@@ -528,3 +536,15 @@ func (f *mockOpFactory) setRetrievedGTIDSet(name string, gtid string) {
m := f.getInstance(name)
m.setRetrievedGTIDSet(gtid)
}

func (f *mockOpFactory) resetKillConnectionsCount() {
f.mu.Lock()
defer f.mu.Unlock()
f.countKillConnections = make(map[string]int)
}

func (f *mockOpFactory) getKillConnectionsCount(name string) int {
f.mu.Lock()
defer f.mu.Unlock()
return f.countKillConnections[name]
}
7 changes: 7 additions & 0 deletions clustering/operations.go
@@ -435,6 +435,13 @@ func (p *managerProcess) configureReplica(ctx context.Context, ss *StatusSet, in

if !st.GlobalVariables.SuperReadOnly {
redo = true

// When a primary is demoted due to network failure, old connections via the primary service may remain.
// In rare cases, the old connections running write events block `set super_read_only=1`.
if err := op.KillConnections(ctx); err != nil {
return false, fmt.Errorf("failed to kill connections in instance %d: %w", index, err)
}

log.Info("set super_read_only=1", "instance", index)
if err := op.SetReadOnly(ctx, true); err != nil {
return false, err
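
The KillConnections implementation itself is not part of this diff; only the call site above and a test mock are shown. For orientation, here is a minimal, hypothetical sketch of such an operation against MySQL using database/sql: list client threads from information_schema.PROCESSLIST, skip the caller's own connection and MySQL's internal threads, and issue KILL CONNECTION for each. The function name, the exclusion list, and the error handling are assumptions, not MOCO's actual code.

package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// killConnections is a hypothetical sketch, not MOCO's implementation.
// It terminates every client connection except the caller's own and MySQL's
// internal threads so that a stale write cannot block a subsequent
// `SET GLOBAL super_read_only=1`.
// Note: with a pooled *sql.DB, CONNECTION_ID() may refer to a different
// session on each call; a dedicated *sql.Conn would pin one session.
func killConnections(ctx context.Context, db *sql.DB) error {
	rows, err := db.QueryContext(ctx, `
		SELECT ID FROM information_schema.PROCESSLIST
		WHERE ID <> CONNECTION_ID()
		  AND USER NOT IN ('event_scheduler', 'system user')`)
	if err != nil {
		return fmt.Errorf("failed to list connections: %w", err)
	}
	defer rows.Close()

	var ids []uint64
	for rows.Next() {
		var id uint64
		if err := rows.Scan(&id); err != nil {
			return err
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		return err
	}

	for _, id := range ids {
		// A listed connection may already be gone by the time KILL runs,
		// so errors here are deliberately ignored.
		_, _ = db.ExecContext(ctx, fmt.Sprintf("KILL CONNECTION %d", id))
	}
	return nil
}
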
