diff --git a/internal/strategy/git/bundle.go b/internal/strategy/git/bundle.go index faae6d8..874d3ce 100644 --- a/internal/strategy/git/bundle.go +++ b/internal/strategy/git/bundle.go @@ -5,60 +5,13 @@ import ( "io" "log/slog" "net/textproto" - "os" "strings" "time" - "github.com/alecthomas/errors" - "github.com/block/cachew/internal/cache" "github.com/block/cachew/internal/logging" ) -func (s *Strategy) cloneBundleLoop(ctx context.Context, c *clone) { - logger := logging.FromContext(ctx) - - s.generateAndUploadBundleIfMissing(ctx, c) - - ticker := time.NewTicker(s.config.BundleInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - logger.DebugContext(ctx, "Bundle generator shutting down", - slog.String("upstream", c.upstreamURL)) - return - - case <-ticker.C: - s.generateAndUploadBundle(ctx, c) - } - } -} - -func (s *Strategy) generateAndUploadBundleIfMissing(ctx context.Context, c *clone) { - logger := logging.FromContext(ctx) - - cacheKey := cache.NewKey(c.upstreamURL + ".bundle") - - reader, _, err := s.cache.Open(ctx, cacheKey) - if err == nil { - _ = reader.Close() - logger.DebugContext(ctx, "Bundle already exists in cache, skipping generation", - slog.String("upstream", c.upstreamURL)) - return - } - - if !errors.Is(err, os.ErrNotExist) { - logger.ErrorContext(ctx, "Failed to check for existing bundle", - slog.String("upstream", c.upstreamURL), - slog.String("error", err.Error())) - return - } - - s.generateAndUploadBundle(ctx, c) -} - func (s *Strategy) generateAndUploadBundle(ctx context.Context, c *clone) { logger := logging.FromContext(ctx) diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 94c7a74..88f2203 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -61,9 +61,10 @@ type Strategy struct { httpClient *http.Client proxy *httputil.ReverseProxy ctx context.Context + scheduler jobscheduler.Scheduler } -func New(ctx context.Context, _ jobscheduler.Scheduler, config Config, cache cache.Cache, mux strategy.Mux) (*Strategy, error) { +func New(ctx context.Context, scheduler jobscheduler.Scheduler, config Config, cache cache.Cache, mux strategy.Mux) (*Strategy, error) { logger := logging.FromContext(ctx) if config.MirrorRoot == "" { @@ -88,6 +89,7 @@ func New(ctx context.Context, _ jobscheduler.Scheduler, config Config, cache cac clones: make(map[string]*clone), httpClient: http.DefaultClient, ctx: ctx, + scheduler: scheduler.WithQueuePrefix("git"), } if err := s.discoverExistingClones(ctx); err != nil { @@ -170,7 +172,7 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { slog.String("error", err.Error())) } } - s.maybeBackgroundFetch(ctx, c) + s.maybeBackgroundFetch(c) s.serveFromBackend(w, r, c) case stateCloning: @@ -179,7 +181,10 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { case stateEmpty: logger.DebugContext(ctx, "Starting background clone, forwarding to upstream") - go s.startClone(context.WithoutCancel(ctx), c) + s.scheduler.Submit(c.upstreamURL, "clone", func(ctx context.Context) error { + s.startClone(ctx, c) + return nil + }) s.forwardToUpstream(w, r, host, pathValue) } } @@ -268,7 +273,7 @@ func (s *Strategy) getOrCreateClone(ctx context.Context, upstreamURL string) *cl slog.String("path", clonePath)) if s.config.BundleInterval > 0 { - go s.cloneBundleLoop(s.ctx, c) + s.scheduleBundleJobs(c) } } @@ -349,7 +354,7 @@ func (s *Strategy) discoverExistingClones(ctx context.Context) error { slog.String("upstream", upstreamURL)) if s.config.BundleInterval > 0 { - go s.cloneBundleLoop(s.ctx, c) + s.scheduleBundleJobs(c) } return nil @@ -397,11 +402,11 @@ func (s *Strategy) startClone(ctx context.Context, c *clone) { slog.String("path", c.path)) if s.config.BundleInterval > 0 { - go s.cloneBundleLoop(context.WithoutCancel(ctx), c) + s.scheduleBundleJobs(c) } } -func (s *Strategy) maybeBackgroundFetch(ctx context.Context, c *clone) { +func (s *Strategy) maybeBackgroundFetch(c *clone) { c.mu.RLock() lastFetch := c.lastFetch c.mu.RUnlock() @@ -410,7 +415,10 @@ func (s *Strategy) maybeBackgroundFetch(ctx context.Context, c *clone) { return } - go s.backgroundFetch(context.WithoutCancel(ctx), c) + s.scheduler.Submit(c.upstreamURL, "fetch", func(ctx context.Context) error { + s.backgroundFetch(ctx, c) + return nil + }) } func (s *Strategy) backgroundFetch(ctx context.Context, c *clone) { @@ -434,3 +442,10 @@ func (s *Strategy) backgroundFetch(ctx context.Context, c *clone) { slog.String("error", err.Error())) } } + +func (s *Strategy) scheduleBundleJobs(c *clone) { + s.scheduler.SubmitPeriodicJob(c.upstreamURL, "bundle-periodic", s.config.BundleInterval, func(ctx context.Context) error { + s.generateAndUploadBundle(ctx, c) + return nil + }) +}