diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 5c6da742c3b..607da919f22 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -739,7 +739,10 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha vs.lastError.Record(err) - if vs.lastError.ShouldRetry() { + if shouldFailNow(err) { + log.Errorf("VStream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) + return err + } else if vs.lastError.ShouldRetry() { log.Infof("Retrying tablet, count: %d, alias: %v, hostname: %s", backoffIndex, tablet.GetAlias(), tablet.GetHostname()) retryDelay := vs.backoffStrategy.Backoff(backoffIndex) backoffIndex++ @@ -755,6 +758,14 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } +func shouldFailNow(err error) bool { + errCode := vterrors.Code(err) + if errCode == vtrpcpb.Code_UNKNOWN && strings.Contains(err.Error(), "not all journaling participants are in the stream") { + return true + } + return false +} + // sendAll sends a group of events together while holding the lock. func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock()