From d21d93358a4e543e72e963c066c09ab07d2d54b8 Mon Sep 17 00:00:00 2001
From: jzunigax2 <125698953+jzunigax2@users.noreply.github.com>
Date: Fri, 6 Feb 2026 16:17:31 -0600
Subject: [PATCH] internxt: implement re-login under refresh logic, improve retry logic

---
 backend/internxt/auth.go     | 102 ++++++++++++++++++++++-
 backend/internxt/internxt.go | 155 +++++++++++++++++++++++------------
 2 files changed, 203 insertions(+), 54 deletions(-)

diff --git a/backend/internxt/auth.go b/backend/internxt/auth.go
index 4bcac9ec4697f..63bdb5246e65d 100644
--- a/backend/internxt/auth.go
+++ b/backend/internxt/auth.go
@@ -13,8 +13,12 @@ import (
     "github.com/golang-jwt/jwt/v5"
     internxtauth "github.com/internxt/rclone-adapter/auth"
     internxtconfig "github.com/internxt/rclone-adapter/config"
+    sdkerrors "github.com/internxt/rclone-adapter/errors"
     "github.com/rclone/rclone/fs"
     "github.com/rclone/rclone/fs/config/configmap"
+    "github.com/rclone/rclone/fs/config/obscure"
+    "github.com/rclone/rclone/fs/fserrors"
+    "github.com/rclone/rclone/fs/fshttp"
     "github.com/rclone/rclone/lib/oauthutil"
     "golang.org/x/oauth2"
 )
@@ -101,7 +105,6 @@ func jwtToOAuth2Token(jwtString string) (*oauth2.Token, error) {
 }
 
 // computeBasicAuthHeader creates the BasicAuthHeader for bucket operations
-// Following the pattern from SDK's auth/access.go:96-102
 func computeBasicAuthHeader(bridgeUser, userID string) string {
     sum := sha256.Sum256([]byte(userID))
     hexPass := hex.EncodeToString(sum[:])
@@ -144,3 +147,100 @@ func refreshJWTToken(ctx context.Context, name string, m configmap.Mapper) error
     fs.Debugf(name, "Token refreshed successfully, new expiry: %v", token.Expiry)
     return nil
 }
+
+// reLogin performs a full re-login using stored email+password credentials.
+// Returns the AccessResponse on success, or an error if 2FA is required or login fails.
+func (f *Fs) reLogin(ctx context.Context) (*internxtauth.AccessResponse, error) {
+    password, err := obscure.Reveal(f.opt.Pass)
+    if err != nil {
+        return nil, fmt.Errorf("couldn't decrypt password: %w", err)
+    }
+
+    cfg := internxtconfig.NewDefaultToken("")
+    cfg.HTTPClient = fshttp.NewClient(ctx)
+
+    loginResp, err := internxtauth.Login(ctx, cfg, f.opt.Email)
+    if err != nil {
+        return nil, fmt.Errorf("re-login check failed: %w", err)
+    }
+
+    if loginResp.TFA {
+        return nil, errors.New("account requires 2FA - please run: rclone config reconnect " + f.name + ":")
+    }
+
+    resp, err := internxtauth.DoLogin(ctx, cfg, f.opt.Email, password, "")
+    if err != nil {
+        return nil, fmt.Errorf("re-login failed: %w", err)
+    }
+
+    return resp, nil
+}
+
+// refreshOrReLogin tries to refresh the JWT token first; if that fails with 401,
+// it falls back to a full re-login using stored credentials.
+func (f *Fs) refreshOrReLogin(ctx context.Context) error {
+    refreshErr := refreshJWTToken(ctx, f.name, f.m)
+    if refreshErr == nil {
+        newToken, err := oauthutil.GetToken(f.name, f.m)
+        if err != nil {
+            return fmt.Errorf("failed to get refreshed token: %w", err)
+        }
+        f.cfg.Token = newToken.AccessToken
+        f.cfg.BasicAuthHeader = computeBasicAuthHeader(f.bridgeUser, f.userID)
+        fs.Debugf(f, "Token refresh succeeded")
+        return nil
+    }
+
+    var httpErr *sdkerrors.HTTPError
+    if !errors.As(refreshErr, &httpErr) || httpErr.StatusCode() != 401 {
+        if fserrors.ShouldRetry(refreshErr) {
+            return refreshErr
+        }
+        return refreshErr
+    }
+
+    fs.Debugf(f, "Token refresh returned 401, attempting re-login with stored credentials")
+
+    resp, err := f.reLogin(ctx)
+    if err != nil {
+        return fmt.Errorf("re-login fallback failed: %w", err)
+    }
+
+    oauthToken, err := jwtToOAuth2Token(resp.NewToken)
+    if err != nil {
+        return fmt.Errorf("failed to parse re-login token: %w", err)
+    }
+    err = oauthutil.PutToken(f.name, f.m, oauthToken, true)
+    if err != nil {
+        return fmt.Errorf("failed to save re-login token: %w", err)
+    }
+
+    f.cfg.Token = oauthToken.AccessToken
+    f.bridgeUser = resp.User.BridgeUser
+    f.userID = resp.User.UserID
+    f.cfg.BasicAuthHeader = computeBasicAuthHeader(f.bridgeUser, f.userID)
+    f.cfg.Bucket = resp.User.Bucket
+    f.cfg.RootFolderID = resp.User.RootFolderID
+
+    fs.Debugf(f, "Re-login succeeded, new token expiry: %v", oauthToken.Expiry)
+    return nil
+}
+
+// reAuthorize is called after getting 401 from the server.
+// It serializes re-auth attempts and uses a circuit-breaker to avoid infinite loops.
+func (f *Fs) reAuthorize(ctx context.Context) error {
+    f.authMu.Lock()
+    defer f.authMu.Unlock()
+
+    if f.authFailed {
+        return errors.New("re-authorization permanently failed")
+    }
+
+    err := f.refreshOrReLogin(ctx)
+    if err != nil {
+        f.authFailed = true
+        return err
+    }
+
+    return nil
+}
diff --git a/backend/internxt/internxt.go b/backend/internxt/internxt.go
index 0bf6f78a47139..3efeb5aac100c 100644
--- a/backend/internxt/internxt.go
+++ b/backend/internxt/internxt.go
@@ -11,6 +11,7 @@ import (
     "path"
     "path/filepath"
     "strings"
+    "sync"
     "time"
 
     "github.com/internxt/rclone-adapter/auth"
@@ -41,16 +42,34 @@ const (
     decayConstant = 2 // bigger for slower decay, exponential
 )
 
-// shouldRetry determines if an error should be retried
-func shouldRetry(ctx context.Context, err error) (bool, error) {
+// shouldRetry determines if an error should be retried.
+// On 401, it attempts to re-authorize before retrying.
+// On 429, it honours the server's rate limit retry delay.
+func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) {
     if fserrors.ContextError(ctx, &err) {
         return false, err
     }
     var httpErr *sdkerrors.HTTPError
-    if errors.As(err, &httpErr) && httpErr.StatusCode() == 401 {
-        return true, err
+    if errors.As(err, &httpErr) {
+        switch httpErr.StatusCode() {
+        case 401:
+            if !f.authFailed {
+                authErr := f.reAuthorize(ctx)
+                if authErr != nil {
+                    fs.Debugf(f, "Re-authorization failed: %v", authErr)
+                    return false, err
+                }
+                return true, err
+            }
+            return false, err
+        case 429:
+            delay := httpErr.RetryAfter()
+            if delay <= 0 {
+                delay = time.Second
+            }
+            return true, pacer.RetryAfterError(err, delay)
+        }
     }
     return fserrors.ShouldRetry(err), err
 }
@@ -184,6 +203,7 @@ type Fs struct {
     name     string
     root     string
     opt      Options
+    m        configmap.Mapper
     dirCache *dircache.DirCache
     cfg      *config.Config
     features *fs.Features
@@ -191,6 +211,8 @@ type Fs struct {
     tokenRenewer *oauthutil.Renew
     bridgeUser   string
     userID       string
+    authMu       sync.Mutex
+    authFailed   bool
 }
 
 // Object holds the data for a remote file object
@@ -263,25 +285,52 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
     cfg.SkipHashValidation = opt.SkipHashValidation
     cfg.HTTPClient = fshttp.NewClient(ctx)
 
-    userInfo, err := getUserInfo(ctx, &userInfoConfig{Token: cfg.Token})
-    if err != nil {
-        return nil, fmt.Errorf("failed to fetch user info: %w", err)
+    f := &Fs{
+        name: name,
+        root: strings.Trim(root, "/"),
+        opt:  *opt,
+        m:    m,
+        cfg:  cfg,
     }
-    cfg.RootFolderID = userInfo.RootFolderID
-    cfg.Bucket = userInfo.Bucket
-    cfg.BasicAuthHeader = computeBasicAuthHeader(userInfo.BridgeUser, userInfo.UserID)
+    f.pacer = fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
 
-    f := &Fs{
-        name:       name,
-        root:       strings.Trim(root, "/"),
-        opt:        *opt,
-        cfg:        cfg,
-        bridgeUser: userInfo.BridgeUser,
-        userID:     userInfo.UserID,
+    var userInfo *userInfo
+    const maxRetries = 3
+    for attempt := 1; attempt <= maxRetries; attempt++ {
+        userInfo, err = getUserInfo(ctx, &userInfoConfig{Token: f.cfg.Token})
+        if err == nil {
+            break
+        }
+
+        var httpErr *sdkerrors.HTTPError
+        if errors.As(err, &httpErr) && httpErr.StatusCode() == 401 {
+            fs.Debugf(f, "getUserInfo returned 401, attempting re-auth")
+            authErr := f.refreshOrReLogin(ctx)
+            if authErr != nil {
+                return nil, fmt.Errorf("failed to fetch user info (re-auth failed): %w", err)
+            }
+            userInfo, err = getUserInfo(ctx, &userInfoConfig{Token: f.cfg.Token})
+            if err == nil {
+                break
+            }
+            return nil, fmt.Errorf("failed to fetch user info after re-auth: %w", err)
+        }
+
+        if fserrors.ShouldRetry(err) && attempt < maxRetries {
+            fs.Debugf(f, "getUserInfo transient error (attempt %d/%d): %v", attempt, maxRetries, err)
+            time.Sleep(time.Duration(attempt) * time.Second)
+            continue
+        }
+
+        return nil, fmt.Errorf("failed to fetch user info: %w", err)
     }
 
-    f.pacer = fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant)))
+    f.cfg.RootFolderID = userInfo.RootFolderID
+    f.cfg.Bucket = userInfo.Bucket
+    f.cfg.BasicAuthHeader = computeBasicAuthHeader(userInfo.BridgeUser, userInfo.UserID)
+    f.bridgeUser = userInfo.BridgeUser
+    f.userID = userInfo.UserID
 
     f.features = (&fs.Features{
         CanHaveEmptyDirectories: true,
@@ -289,19 +338,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
     if ts != nil {
         f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
-            err := refreshJWTToken(ctx, name, m)
-            if err != nil {
-                return err
-            }
-
-            newToken, err := oauthutil.GetToken(name, m)
-            if err != nil {
-                return fmt.Errorf("failed to get refreshed token: %w", err)
-            }
-            f.cfg.Token = newToken.AccessToken
-            f.cfg.BasicAuthHeader = computeBasicAuthHeader(f.bridgeUser, f.userID)
-
-            return nil
+            f.authMu.Lock()
+            defer f.authMu.Unlock()
+            return f.refreshOrReLogin(ctx)
         })
         f.tokenRenewer.Start()
     }
 
@@ -312,9 +351,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
     if err != nil {
         // Assume it might be a file
         newRoot, remote := dircache.SplitPath(f.root)
-        tempF := *f
-        tempF.dirCache = dircache.New(newRoot, f.cfg.RootFolderID, &tempF)
-        tempF.root = newRoot
+        tempF := &Fs{
+            name:         f.name,
+            root:         newRoot,
+            opt:          f.opt,
+            m:            f.m,
+            cfg:          f.cfg,
+            features:     f.features,
+            pacer:        f.pacer,
+            tokenRenewer: f.tokenRenewer,
+            bridgeUser:   f.bridgeUser,
+            userID:       f.userID,
+        }
+        tempF.dirCache = dircache.New(newRoot, f.cfg.RootFolderID, tempF)
 
         err = tempF.dirCache.FindRoot(ctx, false)
         if err != nil {
@@ -367,7 +416,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
     err = f.pacer.Call(func() (bool, error) {
         var err error
         childFolders, err = folders.ListAllFolders(ctx, f.cfg, id)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return err
     }
@@ -380,7 +429,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
     err = f.pacer.Call(func() (bool, error) {
         var err error
         childFiles, err = folders.ListAllFiles(ctx, f.cfg, id)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return err
     }
@@ -395,7 +444,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
         if err != nil && strings.Contains(err.Error(), "404") {
             return false, fs.ErrorDirNotFound
         }
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return err
     }
@@ -412,7 +461,7 @@ func (f *Fs) FindLeaf(ctx context.Context, pathID, leaf string) (string, bool, e
     err := f.pacer.Call(func() (bool, error) {
         var err error
         entries, err = folders.ListAllFolders(ctx, f.cfg, pathID)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return "", false, err
     }
@@ -437,7 +486,7 @@ func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (string, error)
     err := f.pacer.CallNoRetry(func() (bool, error) {
         var err error
         resp, err = folders.CreateFolder(ctx, f.cfg, request)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         // If folder already exists (409 conflict), try to find it
@@ -525,7 +574,7 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
     err = f.pacer.Call(func() (bool, error) {
         var err error
         foldersList, err = folders.ListAllFolders(ctx, f.cfg, dirID)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return nil, err
     }
@@ -538,7 +587,7 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
     err = f.pacer.Call(func() (bool, error) {
         var err error
         filesList, err = folders.ListAllFiles(ctx, f.cfg, dirID)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return nil, err
     }
@@ -616,7 +665,7 @@ func (f *Fs) Remove(ctx context.Context, remote string) error {
     }
     err = f.pacer.Call(func() (bool, error) {
         err := folders.DeleteFolder(ctx, f.cfg, dirID)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return err
     }
@@ -642,7 +691,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
     err = f.pacer.Call(func() (bool, error) {
         var err error
         files, err = folders.ListAllFiles(ctx, f.cfg, dirID)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return nil, err
     }
@@ -720,7 +769,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
     err := f.pacer.Call(func() (bool, error) {
         var err error
         internxtLimit, err = users.GetLimit(ctx, f.cfg)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return nil, err
     }
@@ -730,7 +779,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
     err = f.pacer.Call(func() (bool, error) {
         var err error
         internxtUsage, err = users.GetUsage(ctx, f.cfg)
-        return shouldRetry(ctx, err)
+        return f.shouldRetry(ctx, err)
     })
     if err != nil {
         return nil, err
     }
@@ -776,7 +825,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
     err := o.f.pacer.Call(func() (bool, error) {
         var err error
         stream, err = buckets.DownloadFileStream(ctx, o.f.cfg, o.id, rangeValue)
-        return shouldRetry(ctx, err)
+        return o.f.shouldRetry(ctx, err)
     })
     if err != nil {
         return nil, err
     }
@@ -826,7 +875,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
                 return false, nil
             }
         }
-        return shouldRetry(ctx, err)
+        return o.f.shouldRetry(ctx, err)
     })
     if err != nil {
         return fmt.Errorf("failed to rename existing file to backup: %w", err)
     }
@@ -847,7 +896,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
             src.Size(),
             src.ModTime(ctx),
         )
-        return shouldRetry(ctx, err)
+        return o.f.shouldRetry(ctx, err)
     })
 
     if err != nil && isEmptyFileLimitError(err) {
@@ -885,7 +934,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
                 }
             }
         }
-        return shouldRetry(ctx, err)
+        return o.f.shouldRetry(ctx, err)
     })
     if err != nil {
         fs.Errorf(o.f, "Failed to delete backup file %s.%s (UUID: %s): %v. This may leave an orphaned backup file.",
@@ -939,7 +988,7 @@ func (o *Object) recoverFromTimeoutConflict(ctx context.Context, uploadErr error
     checkErr := o.f.pacer.Call(func() (bool, error) {
         existingFile, err := o.f.preUploadCheck(ctx, encodedName, dirID)
         if err != nil {
-            return shouldRetry(ctx, err)
+            return o.f.shouldRetry(ctx, err)
         }
         if existingFile != nil {
             name := strings.TrimSuffix(baseName, filepath.Ext(baseName))
@@ -978,7 +1027,7 @@ func (o *Object) restoreBackupFile(ctx context.Context, backupUUID, origName, or
     _ = o.f.pacer.Call(func() (bool, error) {
         err := files.RenameFile(ctx, o.f.cfg, backupUUID,
             o.f.opt.Encoding.FromStandardName(origName), origType)
-        return shouldRetry(ctx, err)
+        return o.f.shouldRetry(ctx, err)
     })
 }
@@ -986,6 +1035,6 @@ func (o *Object) restoreBackupFile(ctx context.Context, backupUUID, origName, or
 func (o *Object) Remove(ctx context.Context) error {
     return o.f.pacer.Call(func() (bool, error) {
         err := files.DeleteFile(ctx, o.f.cfg, o.uuid)
-        return shouldRetry(ctx, err)
+        return o.f.shouldRetry(ctx, err)
     })
 }