From c303404318efbc39678565916c0ced285f59e523 Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 6 Aug 2024 17:24:01 -0400 Subject: [PATCH] If we run out of tablets, surface the previous error Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 21 +++++++++------------ go/vt/vtgate/vstream_manager_test.go | 6 +++++- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 607da919f22..9cc6538eec1 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -511,6 +511,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // It will be closed when all journal events converge. var journalDone chan struct{} ignoreTablets := make([]*topodatapb.TabletAlias, 0) + var prevErr error backoffIndex := 0 for { @@ -552,6 +553,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err != nil { return tabletPickerErr(err) } + if len(tp.GetMatchingTablets(ctx)) == 0 { + tperr := vterrors.Wrapf(prevErr, "zero matching tablets for %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + log.Errorf("%v", tperr) + return tperr + } // Create a child context with a stricter timeout when picking a tablet. // This will prevent hanging in the case no tablets are found. tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) @@ -738,11 +745,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } vs.lastError.Record(err) + prevErr = err - if shouldFailNow(err) { - log.Errorf("VStream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) - return err - } else if vs.lastError.ShouldRetry() { + if vs.lastError.ShouldRetry() { log.Infof("Retrying tablet, count: %d, alias: %v, hostname: %s", backoffIndex, tablet.GetAlias(), tablet.GetHostname()) retryDelay := vs.backoffStrategy.Backoff(backoffIndex) backoffIndex++ @@ -758,14 +763,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } -func shouldFailNow(err error) bool { - errCode := vterrors.Code(err) - if errCode == vtrpcpb.Code_UNKNOWN && strings.Contains(err.Error(), "not all journaling participants are in the stream") { - return true - } - return false -} - // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 79742e1b411..af8eec9e462 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -460,7 +460,7 @@ func TestVStreamRetriableErrors(t *testing.T) { if tcase.shouldSwitchTablets { // Retry just once before trying another tablet. vsm.maxTimeInError = 1 * time.Nanosecond - vsm.baseRetryDelay = 1 * time.Millisecond + vsm.baseRetryDelay = 1 * time.Nanosecond } else { // Retry at least once on the same tablet. vsm.maxTimeInError = 1 * time.Second @@ -937,6 +937,8 @@ func TestVStreamJournalPartialMatch(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) vsm := newTestVStreamManager(ctx, hc, st, "aa") + vsm.maxTimeInError = 1 * time.Nanosecond + vsm.baseRetryDelay = 1 * time.Nanosecond sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-10", sbc1.Tablet()) sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -1584,6 +1586,8 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{shard}) vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.maxTimeInError = 1 * time.Nanosecond + vsm.baseRetryDelay = 1 * time.Nanosecond vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: ks,