Skip to content

Commit 13ba09d

Browse files
authored
Merge branch 'slack-15.0' into logical-backups-backport2
2 parents 9b83c1e + 37b690c commit 13ba09d

File tree

16 files changed

+1451
-1049
lines changed

16 files changed

+1451
-1049
lines changed

go/cmd/vtctldclient/command/reparents.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ var emergencyReparentShardOptions = struct {
9292
Force bool
9393
WaitReplicasTimeout time.Duration
9494
NewPrimaryAliasStr string
95+
ExpectedPrimaryAliasStr string
9596
IgnoreReplicaAliasStrList []string
9697
PreventCrossCellPromotion bool
9798
}{}
@@ -104,6 +105,7 @@ func commandEmergencyReparentShard(cmd *cobra.Command, args []string) error {
104105

105106
var (
106107
newPrimaryAlias *topodatapb.TabletAlias
108+
expectedPrimaryAlias *topodatapb.TabletAlias
107109
ignoreReplicaAliases = make([]*topodatapb.TabletAlias, len(emergencyReparentShardOptions.IgnoreReplicaAliasStrList))
108110
)
109111

@@ -114,6 +116,13 @@ func commandEmergencyReparentShard(cmd *cobra.Command, args []string) error {
114116
}
115117
}
116118

119+
if emergencyReparentShardOptions.ExpectedPrimaryAliasStr != "" {
120+
expectedPrimaryAlias, err = topoproto.ParseTabletAlias(emergencyReparentShardOptions.ExpectedPrimaryAliasStr)
121+
if err != nil {
122+
return err
123+
}
124+
}
125+
117126
for i, aliasStr := range emergencyReparentShardOptions.IgnoreReplicaAliasStrList {
118127
alias, err := topoproto.ParseTabletAlias(aliasStr)
119128
if err != nil {
@@ -129,6 +138,7 @@ func commandEmergencyReparentShard(cmd *cobra.Command, args []string) error {
129138
Keyspace: keyspace,
130139
Shard: shard,
131140
NewPrimary: newPrimaryAlias,
141+
ExpectedPrimary: expectedPrimaryAlias,
132142
IgnoreReplicas: ignoreReplicaAliases,
133143
WaitReplicasTimeout: protoutil.DurationToProto(emergencyReparentShardOptions.WaitReplicasTimeout),
134144
PreventCrossCellPromotion: emergencyReparentShardOptions.PreventCrossCellPromotion,
@@ -181,9 +191,10 @@ func commandInitShardPrimary(cmd *cobra.Command, args []string) error {
181191
}
182192

183193
var plannedReparentShardOptions = struct {
184-
NewPrimaryAliasStr string
185-
AvoidPrimaryAliasStr string
186-
WaitReplicasTimeout time.Duration
194+
NewPrimaryAliasStr string
195+
AvoidPrimaryAliasStr string
196+
ExpectedPrimaryAliasStr string
197+
WaitReplicasTimeout time.Duration
187198
}{}
188199

189200
func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
@@ -193,8 +204,9 @@ func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
193204
}
194205

195206
var (
196-
newPrimaryAlias *topodatapb.TabletAlias
197-
avoidPrimaryAlias *topodatapb.TabletAlias
207+
newPrimaryAlias *topodatapb.TabletAlias
208+
avoidPrimaryAlias *topodatapb.TabletAlias
209+
expectedPrimaryAlias *topodatapb.TabletAlias
198210
)
199211

200212
if plannedReparentShardOptions.NewPrimaryAliasStr != "" {
@@ -211,13 +223,21 @@ func commandPlannedReparentShard(cmd *cobra.Command, args []string) error {
211223
}
212224
}
213225

226+
if plannedReparentShardOptions.ExpectedPrimaryAliasStr != "" {
227+
expectedPrimaryAlias, err = topoproto.ParseTabletAlias(plannedReparentShardOptions.ExpectedPrimaryAliasStr)
228+
if err != nil {
229+
return err
230+
}
231+
}
232+
214233
cli.FinishedParsing(cmd)
215234

216235
resp, err := client.PlannedReparentShard(commandCtx, &vtctldatapb.PlannedReparentShardRequest{
217236
Keyspace: keyspace,
218237
Shard: shard,
219238
NewPrimary: newPrimaryAlias,
220239
AvoidPrimary: avoidPrimaryAlias,
240+
ExpectedPrimary: expectedPrimaryAlias,
221241
WaitReplicasTimeout: protoutil.DurationToProto(plannedReparentShardOptions.WaitReplicasTimeout),
222242
})
223243
if err != nil {
@@ -280,6 +300,7 @@ func commandTabletExternallyReparented(cmd *cobra.Command, args []string) error
280300
func init() {
281301
EmergencyReparentShard.Flags().DurationVar(&emergencyReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up in reparenting.")
282302
EmergencyReparentShard.Flags().StringVar(&emergencyReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary. If not specified, the vtctld will select the best candidate to promote.")
303+
EmergencyReparentShard.Flags().StringVar(&emergencyReparentShardOptions.ExpectedPrimaryAliasStr, "expected-primary", "", "Alias of a tablet that must be the current primary in order for the reparent to be processed.")
283304
EmergencyReparentShard.Flags().BoolVar(&emergencyReparentShardOptions.PreventCrossCellPromotion, "prevent-cross-cell-promotion", false, "Only promotes a new primary from the same cell as the previous primary.")
284305
EmergencyReparentShard.Flags().StringSliceVarP(&emergencyReparentShardOptions.IgnoreReplicaAliasStrList, "ignore-replicas", "i", nil, "Comma-separated, repeated list of replica tablet aliases to ignore during the emergency reparent.")
285306
Root.AddCommand(EmergencyReparentShard)
@@ -291,6 +312,7 @@ func init() {
291312
PlannedReparentShard.Flags().DurationVar(&plannedReparentShardOptions.WaitReplicasTimeout, "wait-replicas-timeout", topo.RemoteOperationTimeout, "Time to wait for replicas to catch up on replication both before and after reparenting.")
292313
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.NewPrimaryAliasStr, "new-primary", "", "Alias of a tablet that should be the new primary.")
293314
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.AvoidPrimaryAliasStr, "avoid-primary", "", "Alias of a tablet that should not be the primary; i.e. \"reparent to any other tablet if this one is the primary\".")
315+
PlannedReparentShard.Flags().StringVar(&plannedReparentShardOptions.ExpectedPrimaryAliasStr, "expected-primary", "", "Alias of a tablet that must be the current primary in order for the reparent to be processed.")
294316
Root.AddCommand(PlannedReparentShard)
295317

296318
Root.AddCommand(ReparentTablet)

go/vt/discovery/healthcheck.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
365365
cellAliases: make(map[string]string),
366366
}
367367
var topoWatchers []*TopologyWatcher
368-
var filter TabletFilter
369368
cells := strings.Split(cellsToWatch, ",")
370369
if cellsToWatch == "" {
371370
cells = append(cells, localCell)
@@ -376,19 +375,8 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
376375
if c == "" {
377376
continue
378377
}
379-
if len(tabletFilters) > 0 {
380-
if len(KeyspacesToWatch) > 0 {
381-
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
382-
}
383-
fbs, err := NewFilterByShard(tabletFilters)
384-
if err != nil {
385-
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
386-
}
387-
filter = fbs
388-
} else if len(KeyspacesToWatch) > 0 {
389-
filter = NewFilterByKeyspace(KeyspacesToWatch)
390-
}
391-
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
378+
379+
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
392380
}
393381

394382
hc.topoWatchers = topoWatchers

go/vt/discovery/topology_watcher_test.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,11 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
112112
defer ts.Close()
113113
fhc := NewFakeHealthCheck(nil)
114114
defer fhc.Close()
115+
filter := NewFilterByKeyspace([]string{"keyspace"})
115116
logger := logutil.NewMemoryLogger()
116117
topologyWatcherOperations.ZeroAll()
117118
counts := topologyWatcherOperations.Counts()
118-
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)
119+
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)
119120

120121
counts = checkOpCounts(t, counts, map[string]int64{})
121122
checkChecksum(t, tw, 0)
@@ -162,10 +163,31 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
162163
require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias)
163164
tw.loadTablets()
164165

166+
// Confirm second tablet triggers ListTablets + AddTablet calls.
165167
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
166168
checkChecksum(t, tw, 2762153755)
167169

168-
// Check the new tablet is returned by GetAllTablets().
170+
// Add a third tablet in a filtered keyspace to the topology.
171+
tablet3 := &topodatapb.Tablet{
172+
Alias: &topodatapb.TabletAlias{
173+
Cell: "aa",
174+
Uid: 3,
175+
},
176+
Hostname: "host3",
177+
PortMap: map[string]int32{
178+
"vt": 789,
179+
},
180+
Keyspace: "excluded",
181+
Shard: "shard",
182+
}
183+
require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias)
184+
tw.loadTablets()
185+
186+
// Confirm filtered tablet did not trigger an AddTablet call.
187+
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0})
188+
checkChecksum(t, tw, 3177315266)
189+
190+
// Check the second tablet is returned by GetAllTablets(). This should not contain the filtered tablet.
169191
allTablets = fhc.GetAllTablets()
170192
key = TabletToMapKey(tablet2)
171193
assert.Len(t, allTablets, 2)
@@ -197,14 +219,14 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
197219
assert.Contains(t, allTablets, key)
198220
assert.True(t, proto.Equal(tablet, allTablets[key]))
199221
assert.NotContains(t, allTablets, origKey)
200-
checkChecksum(t, tw, 2762153755)
222+
checkChecksum(t, tw, 3177315266)
201223
} else {
202224
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0})
203225
assert.Len(t, allTablets, 2)
204226
assert.Contains(t, allTablets, origKey)
205227
assert.True(t, proto.Equal(origTablet, allTablets[origKey]))
206228
assert.NotContains(t, allTablets, key)
207-
checkChecksum(t, tw, 2762153755)
229+
checkChecksum(t, tw, 3177315266)
208230
}
209231

210232
// Both tablets restart on different hosts.
@@ -260,7 +282,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
260282
require.Nil(t, err, "FixShardReplication failed")
261283
tw.loadTablets()
262284
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
263-
checkChecksum(t, tw, 789108290)
285+
checkChecksum(t, tw, 852159264)
264286

265287
allTablets = fhc.GetAllTablets()
266288
assert.Len(t, allTablets, 1)
@@ -271,8 +293,10 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
271293
assert.Contains(t, allTablets, key)
272294
assert.True(t, proto.Equal(tablet2, allTablets[key]))
273295

274-
// Remove the other and check that it is detected as being gone.
296+
// Remove the other tablets and check that it is detected as being gone.
297+
// Deleting the filtered tablet should not trigger a RemoveTablet call.
275298
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))
299+
require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias))
276300
_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
277301
require.Nil(t, err, "FixShardReplication failed")
278302
tw.loadTablets()

go/vt/mysqlctl/xtrabackupengine.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ const (
6868
streamModeTar = "tar"
6969
xtrabackupBinaryName = "xtrabackup"
7070
xtrabackupEngineName = "xtrabackup"
71+
xtrabackupInfoFile = "xtrabackup_info"
7172
xbstream = "xbstream"
7273

7374
// closeTimeout is the timeout for closing backup files after writing.
@@ -238,15 +239,22 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
238239
return true, nil
239240
}
240241

241-
func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) {
242-
242+
func (be *XtrabackupEngine) backupFiles(
243+
ctx context.Context,
244+
params BackupParams,
245+
bh backupstorage.BackupHandle,
246+
backupFileName string,
247+
numStripes int,
248+
flavor string,
249+
) (replicationPosition mysql.Position, finalErr error) {
243250
backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName)
244251
flagsToExec := []string{"--defaults-file=" + params.Cnf.Path,
245252
"--backup",
246253
"--socket=" + params.Cnf.SocketFile,
247254
"--slave-info",
248255
"--user=" + xtrabackupUser,
249256
"--target-dir=" + params.Cnf.TmpDir,
257+
"--extra-lsndir=" + params.Cnf.TmpDir,
250258
}
251259
if xtrabackupStreamMode != "" {
252260
flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode)
@@ -345,27 +353,14 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
345353
// the replication position. Note that if we don't read stderr as we go, the
346354
// xtrabackup process gets blocked when the write buffer fills up.
347355
stderrBuilder := &strings.Builder{}
348-
posBuilder := &strings.Builder{}
349356
stderrDone := make(chan struct{})
350357
go func() {
351358
defer close(stderrDone)
352359

353360
scanner := bufio.NewScanner(backupErr)
354-
capture := false
355361
for scanner.Scan() {
356362
line := scanner.Text()
357363
params.Logger.Infof("xtrabackup stderr: %s", line)
358-
359-
// Wait until we see the first line of the binlog position.
360-
// Then capture all subsequent lines. We need multiple lines since
361-
// the value we're looking for has newlines in it.
362-
if !capture {
363-
if !strings.Contains(line, "MySQL binlog position") {
364-
continue
365-
}
366-
capture = true
367-
}
368-
fmt.Fprintln(posBuilder, line)
369364
}
370365
if err := scanner.Err(); err != nil {
371366
params.Logger.Errorf("error reading from xtrabackup stderr: %v", err)
@@ -409,8 +404,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
409404
return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput))
410405
}
411406

412-
posOutput := posBuilder.String()
413-
replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger)
407+
replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger)
414408
if rerr != nil {
415409
return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position")
416410
}
@@ -694,6 +688,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
694688
return nil
695689
}
696690

691+
func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (mysql.Position, error) {
692+
f, err := os.Open(path.Join(directory, xtrabackupInfoFile))
693+
if err != nil {
694+
return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT,
695+
"couldn't open %q to read GTID position", path.Join(directory, xtrabackupInfoFile))
696+
}
697+
defer f.Close()
698+
699+
contents, err := io.ReadAll(f)
700+
if err != nil {
701+
return mysql.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name())
702+
}
703+
704+
return findReplicationPosition(string(contents), flavor, logger)
705+
}
706+
697707
var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`)
698708

699709
func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql.Position, error) {

go/vt/mysqlctl/xtrabackupengine_test.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@ import (
2020
"bytes"
2121
"io"
2222
"math/rand"
23+
"os"
24+
"path"
2325
"testing"
2426

27+
"github.com/stretchr/testify/assert"
28+
2529
"vitess.io/vitess/go/vt/logutil"
2630
)
2731

@@ -51,26 +55,48 @@ func TestFindReplicationPosition(t *testing.T) {
5155
}
5256
}
5357

54-
func TestFindReplicationPositionNoMatch(t *testing.T) {
58+
func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) {
59+
input := `tool_version = 8.0.35-30
60+
binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,
61+
1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,
62+
47b59de1-b368-11e9-b48b-624401d35560:1-152981,
63+
557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,
64+
599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,
65+
b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262'
66+
format = xbstream
67+
`
68+
want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262"
69+
70+
tmp, err := os.MkdirTemp(t.TempDir(), "test")
71+
assert.NoError(t, err)
72+
73+
f, err := os.Create(path.Join(tmp, xtrabackupInfoFile))
74+
assert.NoError(t, err)
75+
_, err = f.WriteString(input)
76+
assert.NoError(t, err)
77+
assert.NoError(t, f.Close())
78+
79+
pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger())
80+
assert.NoError(t, err)
81+
assert.Equal(t, want, pos.String())
82+
}
83+
84+
func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) {
5585
// Make sure failure to find a match triggers an error.
5686
input := `nothing`
5787

58-
_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
59-
if err == nil {
60-
t.Fatalf("expected error from findReplicationPosition but got nil")
61-
}
88+
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
89+
assert.Error(t, err)
6290
}
6391

64-
func TestFindReplicationPositionEmptyMatch(t *testing.T) {
92+
func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) {
6593
// Make sure failure to find a match triggers an error.
6694
input := `GTID of the last change '
6795
6896
'`
6997

70-
_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
71-
if err == nil {
72-
t.Fatalf("expected error from findReplicationPosition but got nil")
73-
}
98+
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
99+
assert.Error(t, err)
74100
}
75101

76102
func TestStripeRoundTrip(t *testing.T) {

0 commit comments

Comments
 (0)