Skip to content

Commit 16b05c1

Browse files
authored
VReplication: use new topo named locks and TTL override for workflow coordination (#16260)
Signed-off-by: Matt Lord <mattalord@gmail.com>
1 parent eb29999 commit 16b05c1

File tree

30 files changed

+724
-112
lines changed

30 files changed

+724
-112
lines changed

go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ func GetSwitchTrafficCommand(opts *SubCommandsOpts) *cobra.Command {
4848
topodatapb.TabletType_RDONLY,
4949
}
5050
}
51+
if SwitchTrafficOptions.Timeout.Seconds() < 1 {
52+
return fmt.Errorf("timeout value must be at least 1 second")
53+
}
5154
return nil
5255
},
5356
RunE: commandSwitchTraffic,

go/cmd/vtctldclient/command/vreplication/common/utils.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ var (
4747
}
4848
onDDLDefault = binlogdatapb.OnDDLAction_IGNORE.String()
4949
MaxReplicationLagDefault = 30 * time.Second
50-
TimeoutDefault = 30 * time.Second
5150

5251
BaseOptions = struct {
5352
Workflow string
@@ -245,7 +244,7 @@ var SwitchTrafficOptions = struct {
245244
func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) {
246245
cmd.Flags().StringSliceVarP(&SwitchTrafficOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to switch traffic in.")
247246
cmd.Flags().Var((*topoproto.TabletTypeListFlag)(&SwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for.")
248-
cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", TimeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
247+
cmd.Flags().DurationVar(&SwitchTrafficOptions.Timeout, "timeout", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
249248
cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.")
250249
cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.")
251250
cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")

go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
3636
"vitess.io/vitess/go/protoutil"
3737
"vitess.io/vitess/go/sqltypes"
38+
"vitess.io/vitess/go/vt/vtctl/workflow"
3839
"vitess.io/vitess/go/vt/vterrors"
3940
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
4041

@@ -874,7 +875,7 @@ func registerCommands(root *cobra.Command) {
874875
create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.")
875876
create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.")
876877
create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
877-
create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
878+
create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", workflow.DefaultTimeout, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
878879
create.Flags().Int64Var(&createOptions.Limit, "limit", math.MaxInt64, "Max rows to stop comparing after.")
879880
create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.")
880881
create.Flags().Int64Var(&createOptions.MaxReportSampleRows, "max-report-sample-rows", 10, "Maximum number of row differences to report (0 for all differences). NOTE: when increasing this value it is highly recommended to also specify --only-pks")

go/test/endtoend/topotest/etcd2/main_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package ectd2
1919
import (
2020
"context"
2121
"flag"
22+
"fmt"
2223
"os"
2324
"testing"
2425
"time"
@@ -201,6 +202,74 @@ func TestKeyspaceLocking(t *testing.T) {
201202
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
202203
}
203204

205+
// TestLockingWithTTL tests that locking with the TTL override works as intended.
206+
func TestLockingWithTTL(t *testing.T) {
207+
// Create the topo server connection.
208+
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
209+
require.NoError(t, err)
210+
211+
ctx := context.Background()
212+
213+
// Acquire a keyspace lock with a short custom TTL.
214+
ttl := 1 * time.Second
215+
ctx, unlock, err := ts.LockKeyspace(ctx, KeyspaceName, "TestLockingWithTTL", topo.WithTTL(ttl))
216+
require.NoError(t, err)
217+
defer unlock(&err)
218+
219+
// Check that CheckKeyspaceLocked DOES return an error after waiting more than
220+
// the specified TTL as we should have lost our lock.
221+
time.Sleep(ttl * 2)
222+
err = topo.CheckKeyspaceLocked(ctx, KeyspaceName)
223+
require.Error(t, err)
224+
}
225+
226+
// TestNamedLocking tests that named locking works as intended.
227+
func TestNamedLocking(t *testing.T) {
228+
// Create topo server connection.
229+
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
230+
require.NoError(t, err)
231+
232+
ctx := context.Background()
233+
lockName := "TestNamedLocking"
234+
action := "Testing"
235+
236+
// Acquire a named lock.
237+
ctx, unlock, err := ts.LockName(ctx, lockName, action)
238+
require.NoError(t, err)
239+
240+
// Check that we can't reacquire it from the same context.
241+
_, _, err = ts.LockName(ctx, lockName, action)
242+
require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))
243+
244+
// Check that CheckNameLocked doesn't return an error as we should still be
245+
// holding the lock.
246+
err = topo.CheckNameLocked(ctx, lockName)
247+
require.NoError(t, err)
248+
249+
// We'll now try to acquire the lock from a different goroutine.
250+
secondCallerAcquired := false
251+
go func() {
252+
_, unlock, err := ts.LockName(context.Background(), lockName, action)
253+
defer unlock(&err)
254+
require.NoError(t, err)
255+
secondCallerAcquired = true
256+
}()
257+
258+
// Wait for some time and ensure that the second attempt at acquiring the lock
259+
// is blocked.
260+
time.Sleep(100 * time.Millisecond)
261+
require.False(t, secondCallerAcquired)
262+
263+
// Unlock the name.
264+
unlock(&err)
265+
// Check that we no longer have the named lock.
266+
err = topo.CheckNameLocked(ctx, lockName)
267+
require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))
268+
269+
// Wait to see that the second goroutine WAS now able to acquire the named lock.
270+
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
271+
}
272+
204273
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
205274
t.Helper()
206275
var res []*sqltypes.Result

go/test/endtoend/vreplication/vreplication_test_env.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ limitations under the License.
1717
package vreplication
1818

1919
var dryRunResultsSwitchWritesCustomerShard = []string{
20-
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]",
2120
"Lock keyspace product",
2221
"Lock keyspace customer",
22+
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [PRIMARY]",
2323
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
2424
"Wait for vreplication on stopped streams to catchup for up to 30s",
2525
"Create reverse vreplication workflow p2c_reverse",
@@ -36,8 +36,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{
3636
}
3737

3838
var dryRunResultsReadCustomerShard = []string{
39-
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]",
4039
"Lock keyspace product",
40+
"Mirroring 0.00 percent of traffic from keyspace product to keyspace customer for tablet types [RDONLY,REPLICA]",
4141
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
4242
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
4343
"Serving VSchema will be rebuilt for the customer keyspace",

go/vt/topo/conn.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package topo
1919
import (
2020
"context"
2121
"sort"
22+
"time"
2223
)
2324

2425
// Conn defines the interface that must be implemented by topology
@@ -120,6 +121,21 @@ type Conn interface {
120121
// Returns ErrInterrupted if ctx is canceled.
121122
Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error)
122123

124+
// LockWithTTL is similar to `Lock` but the difference is that it allows
125+
// you to override the global default TTL that is configured for the
126+
// implementation (--topo_etcd_lease_ttl and --topo_consul_lock_session_ttl).
127+
// Note: for ZooKeeper this is no different from `Lock`, because ZooKeeper
128+
// does not support lock TTLs; its locks exist until released or the session ends.
129+
LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (LockDescriptor, error)
130+
131+
// LockName is similar to `Lock` but the difference is that it does not require
132+
// the path to exist and have children in order to lock it. This is because with
133+
// named locks you are NOT locking an actual topo entity such as a Keyspace record.
134+
// Because this lock is not blocking any Vitess operations OTHER than another
135+
// caller that is trying to get the same named lock, there is a static 24 hour
136+
// TTL on them to ensure that they are eventually cleaned up.
137+
LockName(ctx context.Context, dirPath, contents string) (LockDescriptor, error)
138+
123139
// TryLock takes lock on the given directory with a fail-fast approach.
124140
// It is similar to `Lock` but the difference is it attempts to acquire the lock
125141
// if it is likely to succeed. If there is already a lock on given path, then unlike `Lock`

go/vt/topo/consultopo/lock.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"path"
23+
"time"
2324

2425
"github.com/hashicorp/consul/api"
2526

@@ -49,7 +50,27 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
4950
return nil, convertError(err, dirPath)
5051
}
5152

52-
return s.lock(ctx, dirPath, contents)
53+
return s.lock(ctx, dirPath, contents, s.lockTTL)
54+
}
55+
56+
// LockWithTTL is part of the topo.Conn interface.
57+
func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) {
58+
// We list the directory first to make sure it exists.
59+
if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil {
60+
// We need to return the right error codes, like
61+
// topo.ErrNoNode and topo.ErrInterrupted, and the
62+
// easiest way to do this is to return convertError(err).
63+
// It may lose some of the context, if this is an issue,
64+
// maybe logging the error would work here.
65+
return nil, convertError(err, dirPath)
66+
}
67+
68+
return s.lock(ctx, dirPath, contents, ttl.String())
69+
}
70+
71+
// LockName is part of the topo.Conn interface.
72+
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
73+
return s.lock(ctx, dirPath, contents, topo.NamedLockTTL.String())
5374
}
5475

5576
// TryLock is part of the topo.Conn interface.
@@ -74,11 +95,11 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
7495
}
7596

7697
// everything is good let's acquire the lock.
77-
return s.lock(ctx, dirPath, contents)
98+
return s.lock(ctx, dirPath, contents, s.lockTTL)
7899
}
79100

80101
// lock is the shared implementation used by Lock, LockWithTTL, LockName and TryLock.
81-
func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
102+
func (s *Server) lock(ctx context.Context, dirPath, contents, ttl string) (topo.LockDescriptor, error) {
82103
lockPath := path.Join(s.root, dirPath, locksFilename)
83104

84105
lockOpts := &api.LockOptions{
@@ -90,12 +111,19 @@ func (s *Server) lock(ctx context.Context, dirPath, contents string) (topo.LockD
90111
},
91112
}
92113
lockOpts.SessionOpts.Checks = s.lockChecks
93-
if s.lockDelay > 0 {
94-
lockOpts.SessionOpts.LockDelay = s.lockDelay
95-
}
96114
if s.lockTTL != "" {
115+
// Override the API default with the global default from
116+
// --topo_consul_lock_session_ttl.
97117
lockOpts.SessionOpts.TTL = s.lockTTL
98118
}
119+
if ttl != "" {
120+
// Override the global default with the one provided by the
121+
// caller.
122+
lockOpts.SessionOpts.TTL = ttl
123+
}
124+
if s.lockDelay > 0 {
125+
lockOpts.SessionOpts.LockDelay = s.lockDelay
126+
}
99127
// Build the lock structure.
100128
l, err := s.client.LockOpts(lockOpts)
101129
if err != nil {

go/vt/topo/consultopo/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ type Server struct {
111111
locks map[string]*lockInstance
112112

113113
lockChecks []string
114-
lockTTL string
114+
lockTTL string // This is the default used for all non-named locks
115115
lockDelay time.Duration
116116
}
117117

go/vt/topo/etcd2topo/election.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (mp *etcdLeaderParticipation) WaitForLeadership() (context.Context, error)
9191

9292
// Try to get the primaryship, by getting a lock.
9393
var err error
94-
ld, err = mp.s.lock(lockCtx, electionPath, mp.id)
94+
ld, err = mp.s.lock(lockCtx, electionPath, mp.id, leaseTTL)
9595
if err != nil {
9696
// It can be that we were interrupted.
9797
return nil, err

go/vt/topo/etcd2topo/lock.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"path"
23+
"time"
2324

2425
"github.com/spf13/pflag"
2526

@@ -34,7 +35,7 @@ import (
3435
)
3536

3637
var (
37-
leaseTTL = 30
38+
leaseTTL = 30 // This is the default used for all non-named locks
3839
)
3940

4041
func init() {
@@ -153,7 +154,7 @@ func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.Lo
153154
}
154155

155156
// everything is good let's acquire the lock.
156-
return s.lock(ctx, dirPath, contents)
157+
return s.lock(ctx, dirPath, contents, leaseTTL)
157158
}
158159

159160
// Lock is part of the topo.Conn interface.
@@ -168,15 +169,35 @@ func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockD
168169
return nil, convertError(err, dirPath)
169170
}
170171

171-
return s.lock(ctx, dirPath, contents)
172+
return s.lock(ctx, dirPath, contents, leaseTTL)
173+
}
174+
175+
// LockWithTTL is part of the topo.Conn interface.
176+
func (s *Server) LockWithTTL(ctx context.Context, dirPath, contents string, ttl time.Duration) (topo.LockDescriptor, error) {
177+
// We list the directory first to make sure it exists.
178+
if _, err := s.ListDir(ctx, dirPath, false /*full*/); err != nil {
179+
// We need to return the right error codes, like
180+
// topo.ErrNoNode and topo.ErrInterrupted, and the
181+
// easiest way to do this is to return convertError(err).
182+
// It may lose some of the context, if this is an issue,
183+
// maybe logging the error would work here.
184+
return nil, convertError(err, dirPath)
185+
}
186+
187+
return s.lock(ctx, dirPath, contents, int(ttl.Seconds()))
188+
}
189+
190+
// LockName is part of the topo.Conn interface.
191+
func (s *Server) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
192+
return s.lock(ctx, dirPath, contents, int(topo.NamedLockTTL.Seconds()))
172193
}
173194

174195
// lock is used by Lock(), LockWithTTL(), LockName(), TryLock() and primary election.
175-
func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) {
196+
func (s *Server) lock(ctx context.Context, nodePath, contents string, ttl int) (topo.LockDescriptor, error) {
176197
nodePath = path.Join(s.root, nodePath, locksPath)
177198

178199
// Get a lease, set its KeepAlive.
179-
lease, err := s.cli.Grant(ctx, int64(leaseTTL))
200+
lease, err := s.cli.Grant(ctx, int64(ttl))
180201
if err != nil {
181202
return nil, convertError(err, nodePath)
182203
}

go/vt/topo/faketopo/faketopo.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"strings"
2222
"sync"
23+
"time"
2324

2425
"vitess.io/vitess/go/vt/log"
2526
"vitess.io/vitess/go/vt/topo"
@@ -291,6 +292,20 @@ func (f *FakeConn) Lock(ctx context.Context, dirPath, contents string) (topo.Loc
291292
return &fakeLockDescriptor{}, nil
292293
}
293294

295+
// LockWithTTL implements the Conn interface.
296+
func (f *FakeConn) LockWithTTL(ctx context.Context, dirPath, contents string, _ time.Duration) (topo.LockDescriptor, error) {
297+
f.mu.Lock()
298+
defer f.mu.Unlock()
299+
return &fakeLockDescriptor{}, nil
300+
}
301+
302+
// LockName implements the Conn interface.
303+
func (f *FakeConn) LockName(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
304+
f.mu.Lock()
305+
defer f.mu.Unlock()
306+
return &fakeLockDescriptor{}, nil
307+
}
308+
294309
// TryLock is part of the topo.Conn interface. Its implementation is the same as Lock.
295310
func (f *FakeConn) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
296311
return f.Lock(ctx, dirPath, contents)

go/vt/topo/keyspace_lock.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ func (s *keyspaceLock) Path() string {
4343
// - a context with a locksInfo structure for future reference.
4444
// - an unlock method
4545
// - an error if anything failed.
46-
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) {
46+
func (ts *Server) LockKeyspace(ctx context.Context, keyspace, action string, opts ...LockOption) (context.Context, func(*error), error) {
4747
return ts.internalLock(ctx, &keyspaceLock{
4848
keyspace: keyspace,
49-
}, action, true)
49+
}, action, opts...)
5050
}
5151

5252
// CheckKeyspaceLocked can be called on a context to make sure we have the lock

0 commit comments

Comments
 (0)