Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import (

"github.com/block/cachew/internal/config"
"github.com/block/cachew/internal/httputil"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
)

var cli struct {
Config *os.File `hcl:"-" help:"Configuration file path." placeholder:"PATH" required:"" default:"cachew.hcl"`
Bind string `hcl:"bind" default:"127.0.0.1:8080" help:"Bind address for the server."`
LoggingConfig logging.Config `embed:"" prefix:"log-"`
Config *os.File `hcl:"-" help:"Configuration file path." placeholder:"PATH" required:"" default:"cachew.hcl"`
Bind string `hcl:"bind" default:"127.0.0.1:8080" help:"Bind address for the server."`
SchedulerConfig jobscheduler.Config `embed:"" prefix:"scheduler-"`
LoggingConfig logging.Config `embed:"" prefix:"log-"`
}

func main() {
Expand All @@ -30,7 +32,9 @@ func main() {

mux := http.NewServeMux()

err := config.Load(ctx, cli.Config, mux, parseEnvars())
scheduler := jobscheduler.New(ctx, cli.SchedulerConfig)

err := config.Load(ctx, cli.Config, scheduler, mux, parseEnvars())
kctx.FatalIfErrorf(err)

logger.InfoContext(ctx, "Starting cachewd", slog.String("bind", cli.Bind))
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/cache/cachetest"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy"
)
Expand All @@ -26,7 +27,7 @@ func TestRemoteClient(t *testing.T) {
t.Cleanup(func() { memCache.Close() })

mux := http.NewServeMux()
_, err = strategy.NewAPIV1(ctx, struct{}{}, memCache, mux)
_, err = strategy.NewAPIV1(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), struct{}{}, memCache, mux)
assert.NoError(t, err)
ts := httptest.NewServer(mux)
t.Cleanup(ts.Close)
Expand Down
5 changes: 3 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/alecthomas/hcl/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy"
_ "github.com/block/cachew/internal/strategy/git" // Register git strategy
Expand All @@ -35,7 +36,7 @@ func (l *loggingMux) HandleFunc(pattern string, handler func(http.ResponseWriter
var _ strategy.Mux = (*loggingMux)(nil)

// Load HCL configuration and uses that to construct the cache backend, and proxy strategies.
func Load(ctx context.Context, r io.Reader, mux *http.ServeMux, vars map[string]string) error {
func Load(ctx context.Context, r io.Reader, scheduler jobscheduler.Scheduler, mux *http.ServeMux, vars map[string]string) error {
logger := logging.FromContext(ctx)
ast, err := hcl.Parse(r)
if err != nil {
Expand Down Expand Up @@ -79,7 +80,7 @@ func Load(ctx context.Context, r io.Reader, mux *http.ServeMux, vars map[string]
for _, block := range strategyCandidates {
logger := logger.With("strategy", block.Name)
mlog := &loggingMux{logger: logger, mux: mux}
_, err := strategy.Create(ctx, block.Name, block, cache, mlog)
_, err := strategy.Create(ctx, scheduler.WithQueuePrefix(block.Name), block.Name, block, cache, mlog)
if err != nil {
return errors.Errorf("%s: %w", block.Pos, err)
}
Expand Down
50 changes: 30 additions & 20 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,47 @@ type queueJob struct {
func (j *queueJob) String() string { return fmt.Sprintf("job-%s-%s", j.queue, j.id) }
func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.run(ctx)) }

// Scheduler runs background jobs concurrently across multiple serialised queues.
//
// That is, each queue can have at most one job running at a time, but multiple queues can run concurrently.
//
// Its primary role is to rate limit concurrent background tasks so that we don't DoS the host when, for example,
// generating git snapshots, GCing git repos, etc.
type Scheduler interface {
// WithQueuePrefix creates a new Scheduler that prefixes all queue names with the given prefix.
//
// This is useful to avoid collisions across strategies.
WithQueuePrefix(prefix string) Scheduler
// Submit a job to the queue.
//
// Jobs run concurrently across queues, but never within a queue.
Submit(queue, id string, run func(ctx context.Context) error)
// SubmitPeriodicJob submits a job to the queue that runs immediately, and then periodically after the interval.
//
// Jobs run concurrently across queues, but never within a queue.
SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error)
}

type PrefixedScheduler struct {
type prefixedScheduler struct {
prefix string
scheduler Scheduler
}

func (ps *PrefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
ps.scheduler.Submit(ps.prefix+queue, id, run)
func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
p.scheduler.Submit(p.prefix+queue, id, run)
}

func (ps *PrefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
ps.scheduler.SubmitPeriodicJob(ps.prefix+queue, id, interval, run)
func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
p.scheduler.SubmitPeriodicJob(p.prefix+queue, id, interval, run)
}

func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
return &prefixedScheduler{
prefix: p.prefix + "-" + prefix,
scheduler: p.scheduler,
}
}

// RootScheduler runs jobs from multiple queues.
//
// Its primary role is to rate limit concurrent background tasks so that we don't DoS the host when, for example,
// generating git snapshots, GCing git repos, etc.
type RootScheduler struct {
workAvailable chan bool
lock sync.Mutex
Expand Down Expand Up @@ -75,29 +94,20 @@ func New(ctx context.Context, config Config) Scheduler {
return q
}

// WithQueuePrefix creates a new Scheduler that prefixes all queue names with the given prefix.
//
// This is useful to avoid collisions across strategies.
func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {
return &PrefixedScheduler{
prefix: prefix,
return &prefixedScheduler{
prefix: prefix + "-",
scheduler: q,
}
}

// Submit a job to the queue.
//
// Jobs run concurrently across queues, but never within a queue.
func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
q.lock.Lock()
defer q.lock.Unlock()
q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
q.workAvailable <- true
}

// SubmitPeriodicJob submits a job to the queue that runs immediately, and then periodically after the interval.
//
// Jobs run concurrently across queues, but never within a queue.
func (q *RootScheduler) SubmitPeriodicJob(queue, description string, interval time.Duration, run func(ctx context.Context) error) {
q.Submit(queue, description, func(ctx context.Context) error {
err := run(ctx)
Expand Down
20 changes: 14 additions & 6 deletions internal/strategy/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/alecthomas/hcl/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
)

// ErrNotFound is returned when a strategy is not found.
Expand All @@ -19,27 +20,34 @@ type Mux interface {
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}

var registry = map[string]func(ctx context.Context, config *hcl.Block, cache cache.Cache, mux Mux) (Strategy, error){}
var registry = map[string]func(ctx context.Context, scheduler jobscheduler.Scheduler, config *hcl.Block, cache cache.Cache, mux Mux) (Strategy, error){}

type Factory[Config any, S Strategy] func(ctx context.Context, config Config, cache cache.Cache, mux Mux) (S, error)
type Factory[Config any, S Strategy] func(ctx context.Context, scheduler jobscheduler.Scheduler, config Config, cache cache.Cache, mux Mux) (S, error)

// Register a new proxy strategy.
func Register[Config any, S Strategy](id string, factory Factory[Config, S]) {
registry[id] = func(ctx context.Context, config *hcl.Block, cache cache.Cache, mux Mux) (Strategy, error) {
registry[id] = func(ctx context.Context, scheduler jobscheduler.Scheduler, config *hcl.Block, cache cache.Cache, mux Mux) (Strategy, error) {
var cfg Config
if err := hcl.UnmarshalBlock(config, &cfg, hcl.AllowExtra(false)); err != nil {
return nil, errors.WithStack(err)
}
return factory(ctx, cfg, cache, mux)
return factory(ctx, scheduler, cfg, cache, mux)
}
}

// Create a new proxy strategy.
//
// Will return "ErrNotFound" if the strategy is not found.
func Create(ctx context.Context, name string, config *hcl.Block, cache cache.Cache, mux Mux) (Strategy, error) {
func Create(
ctx context.Context,
scheduler jobscheduler.Scheduler,
name string,
config *hcl.Block,
cache cache.Cache,
mux Mux,
) (Strategy, error) {
if factory, ok := registry[name]; ok {
return errors.WithStack2(factory(ctx, config, cache, mux))
return errors.WithStack2(factory(ctx, scheduler.WithQueuePrefix(name), config, cache, mux))
}
return nil, errors.Errorf("%s: %w", name, ErrNotFound)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/strategy/apiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
)

Expand All @@ -27,7 +28,7 @@ type APIV1 struct {
logger *slog.Logger
}

func NewAPIV1(ctx context.Context, _ struct{}, cache cache.Cache, mux Mux) (*APIV1, error) {
func NewAPIV1(ctx context.Context, _ jobscheduler.Scheduler, _ struct{}, cache cache.Cache, mux Mux) (*APIV1, error) {
s := &APIV1{
logger: logging.FromContext(ctx),
cache: cache,
Expand Down
3 changes: 2 additions & 1 deletion internal/strategy/artifactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy/handler"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ type Artifactory struct {

var _ Strategy = (*Artifactory)(nil)

func NewArtifactory(ctx context.Context, config ArtifactoryConfig, cache cache.Cache, mux Mux) (*Artifactory, error) {
func NewArtifactory(ctx context.Context, _ jobscheduler.Scheduler, config ArtifactoryConfig, cache cache.Cache, mux Mux) (*Artifactory, error) {
u, err := url.Parse(config.Target)
if err != nil {
return nil, fmt.Errorf("invalid target URL: %w", err)
Expand Down
7 changes: 4 additions & 3 deletions internal/strategy/artifactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func setupArtifactoryTest(t *testing.T, config strategy.ArtifactoryConfig) (*moc
t.Cleanup(func() { memCache.Close() })

mux := http.NewServeMux()
_, err = strategy.NewArtifactory(ctx, config, memCache, mux)
_, err = strategy.NewArtifactory(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), config, memCache, mux)
assert.NoError(t, err)

return mock, mux, ctx
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestArtifactoryString(t *testing.T) {
defer memCache.Close()

mux := http.NewServeMux()
artifactory, err := strategy.NewArtifactory(ctx, strategy.ArtifactoryConfig{
artifactory, err := strategy.NewArtifactory(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), strategy.ArtifactoryConfig{
Target: "https://ec2.example.jfrog.io",
}, memCache, mux)
assert.NoError(t, err)
Expand All @@ -225,7 +226,7 @@ func TestArtifactoryInvalidTargetURL(t *testing.T) {
defer memCache.Close()

mux := http.NewServeMux()
_, err = strategy.NewArtifactory(ctx, strategy.ArtifactoryConfig{
_, err = strategy.NewArtifactory(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), strategy.ArtifactoryConfig{
Target: "://invalid-url",
}, memCache, mux)
assert.Error(t, err)
Expand Down
5 changes: 3 additions & 2 deletions internal/strategy/git/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy/git"
)
Expand All @@ -22,7 +23,7 @@ func TestBundleHTTPEndpoint(t *testing.T) {
assert.NoError(t, err)
mux := newTestMux()

_, err = git.New(ctx, git.Config{
_, err = git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: tmpDir,
BundleInterval: 24 * time.Hour,
}, memCache, mux)
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestBundleInterval(t *testing.T) {
assert.NoError(t, err)
mux := newTestMux()

s, err := git.New(ctx, git.Config{
s, err := git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: tmpDir,
BundleInterval: tt.bundleInterval,
}, memCache, mux)
Expand Down
3 changes: 2 additions & 1 deletion internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/alecthomas/errors"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy"
)
Expand Down Expand Up @@ -62,7 +63,7 @@ type Strategy struct {
ctx context.Context
}

func New(ctx context.Context, config Config, cache cache.Cache, mux strategy.Mux) (*Strategy, error) {
func New(ctx context.Context, _ jobscheduler.Scheduler, config Config, cache cache.Cache, mux strategy.Mux) (*Strategy, error) {
logger := logging.FromContext(ctx)

if config.MirrorRoot == "" {
Expand Down
7 changes: 4 additions & 3 deletions internal/strategy/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy/git"
)
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mux := newTestMux()
s, err := git.New(ctx, tt.config, nil, mux)
s, err := git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), tt.config, nil, mux)
if tt.wantError != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.wantError)
Expand Down Expand Up @@ -144,7 +145,7 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) {
assert.NoError(t, err)

mux := newTestMux()
s, err := git.New(ctx, git.Config{
s, err := git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil, mux)
Expand All @@ -167,7 +168,7 @@ func TestIntegrationWithMockUpstream(t *testing.T) {

// Create strategy - it will register handlers
mux := newTestMux()
_, err := git.New(ctx, git.Config{
_, err := git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil, mux)
Expand Down
7 changes: 4 additions & 3 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy/git"
)
Expand Down Expand Up @@ -53,7 +54,7 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {

// Create the git strategy
mux := http.NewServeMux()
strategy, err := git.New(ctx, git.Config{
strategy, err := git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, nil, mux)
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) {
assert.NoError(t, err)

mux := http.NewServeMux()
_, err = git.New(ctx, git.Config{
_, err = git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, nil, mux)
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) {
defer upstreamServer.Close()

mux := http.NewServeMux()
_, err = git.New(ctx, git.Config{
_, err = git.New(ctx, jobscheduler.New(ctx, jobscheduler.Config{}), git.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, nil, mux)
Expand Down
Loading