Skip to content

Commit

Permalink
Simplify
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 13, 2025
1 parent e1acdf5 commit 6f2a0cc
Showing 1 changed file with 10 additions and 29 deletions.
39 changes: 10 additions & 29 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,6 @@ const tabletPickerContextTimeout = 90 * time.Second
// ending the stream from the tablet.
const stopOnReshardDelay = 500 * time.Millisecond

const partialJournalingParticipantsMsg = "not all journaling participants are in the stream"

type partialJournalingParticipantsError struct {
details string
}

func (p *partialJournalingParticipantsError) Error() string {
return fmt.Sprintf("%s: %s", partialJournalingParticipantsMsg, p.details)
}

func NewPartialJournalingParticipantsError(format string, args ...any) error {
return &partialJournalingParticipantsError{
details: fmt.Sprintf(format, args...),
}
}

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -816,12 +800,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
// Not having all journaling participants available is considered fatal and
// requires a new VStream.
if _, ok := vterrors.UnwrapAll(err).(*partialJournalingParticipantsError); ok {
return false, false
}

errCode := vterrors.Code(err)
// In this context, where we will run the tablet picker again on retry, these
// codes indicate that it's worth a retry as the error is likely a transient
Expand All @@ -840,13 +818,14 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
}
return false, false
}

// For anything else, if this is a recoverable/ephemeral error then retry.
if !vreplication.IsUnrecoverableError(err) {
return true, false
// Internal errors such as not having all journaling partipants require a new
// VStream.
if errCode == vtrpcpb.Code_INTERNAL {
return false, false
}

return false, false
// For anything else, if this is a recoverable/ephemeral error then retry.
return vreplication.IsUnrecoverableError(err), false
}

// sendAll sends a group of events together while holding the lock.
Expand Down Expand Up @@ -987,7 +966,8 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
mode = matchAll
je.participants[inner] = false
case matchNone:
return nil, NewPartialJournalingParticipantsError("journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not all journaling participants are in the stream: journal: %v, stream: %v",
journal.Participants, vs.vgtid.ShardGtids)
}
continue nextParticipant
}
Expand All @@ -996,7 +976,8 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
case undecided, matchNone:
mode = matchNone
case matchAll:
return nil, NewPartialJournalingParticipantsError("journal: %v, stream: %v", journal.Participants, vs.vgtid.ShardGtids)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "not all journaling participants are in the stream: journal: %v, stream: %v",
journal.Participants, vs.vgtid.ShardGtids)
}
}
if mode == matchNone {
Expand Down

0 comments on commit 6f2a0cc

Please sign in to comment.