From fe751453a2eaf344cd21276296688435cacc82c5 Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 6 Aug 2024 16:15:59 -0400 Subject: [PATCH] Fail immediately for partial journal events Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 5c6da742c3b..607da919f22 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -739,7 +739,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha vs.lastError.Record(err) - if vs.lastError.ShouldRetry() { + if shouldFailNow(err) { + log.Errorf("VStream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) + return err + } else if vs.lastError.ShouldRetry() { log.Infof("Retrying tablet, count: %d, alias: %v, hostname: %s", backoffIndex, tablet.GetAlias(), tablet.GetHostname()) retryDelay := vs.backoffStrategy.Backoff(backoffIndex) backoffIndex++ @@ -755,6 +758,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +func shouldFailNow(err error) bool { + errCode := vterrors.Code(err) + if errCode == vtrpcpb.Code_UNKNOWN && strings.Contains(err.Error(), "not all journaling participants are in the stream") { + return true + } + return false +} + // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock()