Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multihost synchronization #562

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
71 changes: 69 additions & 2 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
"github.com/garethgeorge/backrest/internal/api"
syncengine "github.com/garethgeorge/backrest/internal/api/syncapi"
"github.com/garethgeorge/backrest/internal/auth"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/env"
Expand Down Expand Up @@ -124,18 +125,65 @@ func main() {
}()

// Create and serve the HTTP gateway
syncHandler := syncengine.NewBackrestSyncHandler(configStore, log)

var syncWg sync.WaitGroup
var syncCtxCancel context.CancelFunc

syncConfigHook := &configHookForSyncEngine{
store: configStore,
onChange: func(cfg *v1.Config) {
if syncCtxCancel != nil {
zap.L().Info("cancelling existing sync context due to config change, preparing to reinitialize sync engine")
syncCtxCancel()
}
syncWg.Wait()

// Start running a sync loop with this context for each distinct URI
reposByURI := make(map[string][]*v1.Repo)

for _, repo := range cfg.Repos {
if !syncengine.IsBackrestRemoteRepoURI(repo.Uri) {
continue
}
zap.L()
reposByURI[repo.Uri] = append(reposByURI[repo.Uri], repo)
}

if len(reposByURI) == 0 {
return
}

ctx, cancel := context.WithCancel(context.Background())
syncCtxCancel = cancel

for uri, repos := range reposByURI {
syncWg.Add(1)

zap.S().Infof("starting sync engine for URI %q serving %d repos", uri, len(repos))

go func(uri string, repos []*v1.Repo) {
defer syncWg.Done()
// Create a new sync engine for this URI
syncClient := syncengine.NewSyncClient(cfg.Instance, uri, log)
syncClient.RunSyncForRepos(ctx, repos)
}(uri, repos)
}
},
}

apiBackrestHandler := api.NewBackrestHandler(
configStore,
syncConfigHook,
orchestrator,
log,
logStore,
)

authenticator := auth.NewAuthenticator(getSecret(), configStore)
apiAuthenticationHandler := api.NewAuthenticationHandler(authenticator)

mux := http.NewServeMux()
mux.Handle(v1connect.NewAuthenticationHandler(apiAuthenticationHandler))
mux.Handle(v1connect.NewBackrestSyncServiceHandler(syncHandler))
backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator))
mux.Handle("/", webui.Handler())
Expand Down Expand Up @@ -294,3 +342,22 @@ func migrateBboltOplog(logstore oplog.OpStore) {
}
zap.S().Infof("migrated %d operations from old bbolt oplog to sqlite", count)
}

type configHookForSyncEngine struct {
store config.ConfigStore
onChange func(*v1.Config)
}

var _ config.ConfigStore = &configHookForSyncEngine{}

func (c *configHookForSyncEngine) Get() (*v1.Config, error) {
return c.store.Get()
}

func (c *configHookForSyncEngine) Update(cfg *v1.Config) error {
err := c.store.Update(cfg)
if err == nil {
c.onChange(cfg)
}
return err
}
Loading
Loading