Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backend/cmd/server/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func provideCleanup(
opsScheduledReport *service.OpsScheduledReportService,
schedulerSnapshot *service.SchedulerSnapshotService,
tokenRefresh *service.TokenRefreshService,
oauthProbe *service.OAuthProbeService,
accountExpiry *service.AccountExpiryService,
usageCleanup *service.UsageCleanupService,
pricing *service.PricingService,
Expand Down Expand Up @@ -134,6 +135,12 @@ func provideCleanup(
tokenRefresh.Stop()
return nil
}},
{"OAuthProbeService", func() error {
if oauthProbe != nil {
oauthProbe.Stop()
}
return nil
}},
{"AccountExpiryService", func() error {
accountExpiry.Stop()
return nil
Expand Down
16 changes: 12 additions & 4 deletions backend/cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions backend/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Config struct {
UsageCleanup UsageCleanupConfig `mapstructure:"usage_cleanup"`
Concurrency ConcurrencyConfig `mapstructure:"concurrency"`
TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"`
OAuthProbe OAuthProbeConfig `mapstructure:"oauth_probe"`
RunMode string `mapstructure:"run_mode" yaml:"run_mode"`
Timezone string `mapstructure:"timezone"` // e.g. "Asia/Shanghai", "UTC"
Gemini GeminiConfig `mapstructure:"gemini"`
Expand Down Expand Up @@ -127,6 +128,24 @@ type TokenRefreshConfig struct {
RetryBackoffSeconds int `mapstructure:"retry_backoff_seconds"`
}

// OAuthProbeConfig OAuth2 账号探活/配额同步配置
// 通过“测试连接”式的最小请求,定期更新 OAuth 账号可用性与配额信息(OpenAI/Gemini/Claude)。
type OAuthProbeConfig struct {
// 是否启用探活任务(默认关闭,避免对配额产生额外消耗)
Enabled bool `mapstructure:"enabled"`
// 检查间隔(分钟)
CheckIntervalMinutes int `mapstructure:"check_interval_minutes"`
// 闲置阈值(分钟):仅当账号在该时长内未被使用,才会触发探活(兜底机制)。
// <=0 表示不按闲置过滤(每轮都会探活符合条件的账号)。
IdleThresholdMinutes int `mapstructure:"idle_threshold_minutes"`
// 单次请求超时(秒)
RequestTimeoutSeconds int `mapstructure:"request_timeout_seconds"`
// 并发探活数(>0)
MaxConcurrency int `mapstructure:"max_concurrency"`
// 每轮最多探活账号数(<=0 表示不限制)
MaxAccountsPerCycle int `mapstructure:"max_accounts_per_cycle"`
}

type PricingConfig struct {
// 价格数据远程URL(默认使用LiteLLM镜像)
RemoteURL string `mapstructure:"remote_url"`
Expand Down Expand Up @@ -862,6 +881,14 @@ func setDefaults() {
viper.SetDefault("token_refresh.max_retries", 3) // 最多重试3次
viper.SetDefault("token_refresh.retry_backoff_seconds", 2) // 重试退避基础2秒

// OAuthProbe (disabled by default to avoid extra quota consumption)
viper.SetDefault("oauth_probe.enabled", false)
viper.SetDefault("oauth_probe.check_interval_minutes", 15)
viper.SetDefault("oauth_probe.idle_threshold_minutes", 15)
viper.SetDefault("oauth_probe.request_timeout_seconds", 20)
viper.SetDefault("oauth_probe.max_concurrency", 2)
viper.SetDefault("oauth_probe.max_accounts_per_cycle", 0)

// Gemini OAuth - configure via environment variables or config file
// GEMINI_OAUTH_CLIENT_ID and GEMINI_OAUTH_CLIENT_SECRET
// Default: uses Gemini CLI public credentials (set via environment)
Expand Down
102 changes: 89 additions & 13 deletions backend/internal/service/account_test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"regexp"
"strings"
"time"

"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
Expand Down Expand Up @@ -46,7 +47,9 @@ type TestEvent struct {
type AccountTestService struct {
accountRepo AccountRepository
geminiTokenProvider *GeminiTokenProvider
openAITokenProvider *OpenAITokenProvider
antigravityGatewayService *AntigravityGatewayService
accountUsageService *AccountUsageService
httpUpstream HTTPUpstream
cfg *config.Config
}
Expand All @@ -55,14 +58,18 @@ type AccountTestService struct {
func NewAccountTestService(
accountRepo AccountRepository,
geminiTokenProvider *GeminiTokenProvider,
openAITokenProvider *OpenAITokenProvider,
antigravityGatewayService *AntigravityGatewayService,
accountUsageService *AccountUsageService,
httpUpstream HTTPUpstream,
cfg *config.Config,
) *AccountTestService {
return &AccountTestService{
accountRepo: accountRepo,
geminiTokenProvider: geminiTokenProvider,
openAITokenProvider: openAITokenProvider,
antigravityGatewayService: antigravityGatewayService,
accountUsageService: accountUsageService,
httpUpstream: httpUpstream,
cfg: cfg,
}
Expand Down Expand Up @@ -97,13 +104,19 @@ func generateSessionString() (string, error) {
return fmt.Sprintf("user_%s_account__session_%s", hex64, sessionUUID), nil
}

// createTestPayload creates a Claude Code style test request payload
func createTestPayload(modelID string) (map[string]any, error) {
// createTestPayload creates a Claude Code style test request payload.
// nonce is optional and can be used to avoid upstream caching when probing periodically.
func createTestPayload(modelID string, nonce string) (map[string]any, error) {
sessionID, err := generateSessionString()
if err != nil {
return nil, err
}

text := "hi"
if strings.TrimSpace(nonce) != "" {
text = fmt.Sprintf("hi-%s", strings.TrimSpace(nonce))
}

return map[string]any{
"model": modelID,
"messages": []map[string]any{
Expand All @@ -112,7 +125,7 @@ func createTestPayload(modelID string) (map[string]any, error) {
"content": []map[string]any{
{
"type": "text",
"text": "hi",
"text": text,
"cache_control": map[string]string{
"type": "ephemeral",
},
Expand Down Expand Up @@ -175,6 +188,7 @@ func (s *AccountTestService) testClaudeAccountConnection(c *gin.Context, account
if testModelID == "" {
testModelID = claude.DefaultTestModel
}
nonce, _ := randomHexString(8)

// For API Key accounts with model mapping, map the model
if account.Type == "apikey" {
Expand Down Expand Up @@ -228,7 +242,7 @@ func (s *AccountTestService) testClaudeAccountConnection(c *gin.Context, account
c.Writer.Flush()

// Create Claude Code style payload (same for all account types)
payload, err := createTestPayload(testModelID)
payload, err := createTestPayload(testModelID, nonce)
if err != nil {
return s.sendErrorAndEnd(c, "Failed to create test payload")
}
Expand Down Expand Up @@ -271,6 +285,16 @@ func (s *AccountTestService) testClaudeAccountConnection(c *gin.Context, account
}
defer func() { _ = resp.Body.Close() }()

// For Claude OAuth / Setup-Token accounts, sync usage snapshot (5h/7d windows) to DB extra.
// Do this regardless of status code so users can observe quota + reset times even when rate-limited.
if account.IsOAuth() && s.accountUsageService != nil {
go func(a *Account) {
updateCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = s.accountUsageService.SyncUsageSnapshotToExtra(updateCtx, a, usageSnapshotSourceTest)
}(account)
}

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body)))
Expand All @@ -290,6 +314,8 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
testModelID = openai.DefaultTestModel
}

nonce, _ := randomHexString(8)

// For API Key accounts with model mapping, map the model
if account.Type == "apikey" {
mapping := account.GetModelMapping()
Expand All @@ -309,8 +335,16 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
if account.IsOAuth() {
isOAuth = true
// OAuth - use Bearer token with ChatGPT internal API
authToken = account.GetOpenAIAccessToken()
if authToken == "" {
if s.openAITokenProvider != nil && account.Platform == PlatformOpenAI && account.Type == AccountTypeOAuth {
token, err := s.openAITokenProvider.GetAccessToken(ctx, account)
if err != nil {
return s.sendErrorAndEnd(c, fmt.Sprintf("Failed to get access token: %s", err.Error()))
}
authToken = token
} else {
authToken = account.GetOpenAIAccessToken()
}
if strings.TrimSpace(authToken) == "" {
return s.sendErrorAndEnd(c, "No access token available")
}

Expand Down Expand Up @@ -345,7 +379,7 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
c.Writer.Flush()

// Create OpenAI Responses API payload
payload := createOpenAITestPayload(testModelID, isOAuth)
payload := createOpenAITestPayload(testModelID, isOAuth, nonce)
payloadBytes, _ := json.Marshal(payload)

// Send test_start event
Expand Down Expand Up @@ -381,6 +415,23 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account
}
defer func() { _ = resp.Body.Close() }()

// For OpenAI OAuth (ChatGPT Codex), extract and persist quota windows from response headers.
// Do this even when status != 200 so that users can see 5h/7d usage + reset times.
if isOAuth {
if snapshot := extractCodexUsageHeaders(resp.Header); snapshot != nil {
if derived := deriveCodexUsageSnapshot(snapshot); derived != nil && len(derived.updates) > 0 {
go func(d *codexDerivedUsageSnapshot) {
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = s.accountRepo.UpdateExtra(updateCtx, account.ID, d.updates)
if resetAt := codexRateLimitResetAt(d); resetAt != nil && resetAt.After(time.Now()) {
_ = s.accountRepo.SetRateLimited(updateCtx, account.ID, *resetAt)
}
}(derived)
}
}
}

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body)))
Expand Down Expand Up @@ -418,7 +469,8 @@ func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account
c.Writer.Flush()

// Create test payload (Gemini format)
payload := createGeminiTestPayload()
nonce, _ := randomHexString(8)
payload := createGeminiTestPayload(nonce)

// Build request based on account type
var req *http.Request
Expand Down Expand Up @@ -452,6 +504,16 @@ func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account
}
defer func() { _ = resp.Body.Close() }()

// Persist (simulated) Gemini quota snapshot to DB extra so account list can show it without per-row API calls.
// This is local computation (usage_logs + quota policy), not an upstream quota API.
if s.accountUsageService != nil {
go func(a *Account) {
updateCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = s.accountUsageService.SyncUsageSnapshotToExtra(updateCtx, a, usageSnapshotSourceTest)
}(account)
}

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body)))
Expand Down Expand Up @@ -602,14 +664,19 @@ func (s *AccountTestService) buildCodeAssistRequest(ctx context.Context, accessT
return req, nil
}

// createGeminiTestPayload creates a minimal test payload for Gemini API
func createGeminiTestPayload() []byte {
// createGeminiTestPayload creates a minimal test payload for Gemini API.
// nonce is optional and can be used to avoid upstream caching when probing periodically.
func createGeminiTestPayload(nonce string) []byte {
text := "hi"
if strings.TrimSpace(nonce) != "" {
text = fmt.Sprintf("hi-%s", strings.TrimSpace(nonce))
}
payload := map[string]any{
"contents": []map[string]any{
{
"role": "user",
"parts": []map[string]any{
{"text": "hi"},
{"text": text},
},
},
},
Expand Down Expand Up @@ -694,7 +761,12 @@ func (s *AccountTestService) processGeminiStream(c *gin.Context, body io.Reader)
}

// createOpenAITestPayload creates a test payload for OpenAI Responses API
func createOpenAITestPayload(modelID string, isOAuth bool) map[string]any {
func createOpenAITestPayload(modelID string, isOAuth bool, nonce string) map[string]any {
text := "hi"
if strings.TrimSpace(nonce) != "" {
text = fmt.Sprintf("hi-%s", nonce)
}

payload := map[string]any{
"model": modelID,
"input": []map[string]any{
Expand All @@ -703,7 +775,7 @@ func createOpenAITestPayload(modelID string, isOAuth bool) map[string]any {
"content": []map[string]any{
{
"type": "input_text",
"text": "hi",
"text": text,
},
},
},
Expand All @@ -714,6 +786,10 @@ func createOpenAITestPayload(modelID string, isOAuth bool) map[string]any {
// OAuth accounts using ChatGPT internal API require store: false
if isOAuth {
payload["store"] = false
// Randomize prompt_cache_key to avoid upstream caching between probes/tests.
if strings.TrimSpace(nonce) != "" {
payload["prompt_cache_key"] = "sub2api_test_" + strings.TrimSpace(nonce)
}
}

// All accounts require instructions for Responses API
Expand Down
1 change: 1 addition & 0 deletions backend/internal/service/account_usage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type AccountUsageService struct {
antigravityQuotaFetcher *AntigravityQuotaFetcher
cache *UsageCache
identityCache IdentityCache
usageSnapshotSyncCache sync.Map // accountID -> time.Time (best-effort throttle)
}

// NewAccountUsageService 创建AccountUsageService实例
Expand Down
Loading
Loading