diff --git a/cluster.go b/cluster.go deleted file mode 100644 index 592b7ed..0000000 --- a/cluster.go +++ /dev/null @@ -1,467 +0,0 @@ -/** - * OpenBmclAPI (Golang Edition) - * Copyright (C) 2023 Kevin Z - * All rights reserved - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package main - -import ( - "context" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "net" - "net/http" - "os" - "path/filepath" - "runtime" - "sync" - "sync/atomic" - "time" - - "github.com/LiterMC/socket.io" - "github.com/LiterMC/socket.io/engine.io" - "github.com/gorilla/websocket" - "github.com/gregjones/httpcache" - - gocache "github.com/LiterMC/go-openbmclapi/cache" - "github.com/LiterMC/go-openbmclapi/database" - "github.com/LiterMC/go-openbmclapi/internal/build" - "github.com/LiterMC/go-openbmclapi/limited" - "github.com/LiterMC/go-openbmclapi/log" - "github.com/LiterMC/go-openbmclapi/notify" - "github.com/LiterMC/go-openbmclapi/notify/email" - "github.com/LiterMC/go-openbmclapi/notify/webpush" - "github.com/LiterMC/go-openbmclapi/storage" - "github.com/LiterMC/go-openbmclapi/utils" -) - -type Cluster struct { - host string // not the public access host, but maybe a public IP, or a host that will be resolved to the IP - publicHosts []string // should not contains port, can be nil - publicPort uint16 - clusterId string - clusterSecret string - prefix string - byoc bool - jwtIssuer string - - dataDir string - maxConn int - storageOpts []storage.StorageOption - storages []storage.Storage - storageWeights []uint - storageTotalWeight uint - cache gocache.Cache - apiHmacKey []byte - hijackProxy *HjProxy - - stats notify.Stats - lastHits, statOnlyHits atomic.Int32 - lastHbts, statOnlyHbts atomic.Int64 - issync atomic.Bool - syncProg atomic.Int64 - syncTotal atomic.Int64 - - mux sync.RWMutex - enabled atomic.Bool - disabled chan struct{} - waitEnable []chan struct{} - shouldEnable atomic.Bool - reconnectCount int - socket *socket.Socket - cancelKeepalive context.CancelFunc - downloadMux sync.RWMutex - downloading map[string]*downloadingItem - filesetMux sync.RWMutex - fileset map[string]int64 - authTokenMux sync.RWMutex - authToken *ClusterToken - - client *http.Client - cachedCli *http.Client - bufSlots *limited.BufSlots - database database.DB - notifyManager *notify.Manager - webpushKeyB64 string - updateChecker *time.Ticker - apiRateLimiter *limited.APIRateMiddleWare - - wsUpgrader *websocket.Upgrader - handlerAPIv0 http.Handler - handlerAPIv1 http.Handler - hijackHandler http.Handler -} - -func NewCluster( - ctx context.Context, - prefix string, - baseDir string, - host string, publicPort uint16, - clusterId string, clusterSecret string, - byoc bool, dialer *net.Dialer, - storageOpts []storage.StorageOption, - cache gocache.Cache, -) (cr *Cluster) { - transport := http.DefaultTransport - if dialer != nil { - transport = &http.Transport{ - DialContext: dialer.DialContext, - } - } - - cachedTransport := transport - if cache != gocache.NoCache { - cachedTransport = &httpcache.Transport{ - Transport: transport, - Cache: gocache.WrapToHTTPCache(gocache.NewCacheWithNamespace(cache, "http@")), - } - } - - cr = &Cluster{ - host: host, - publicPort: publicPort, - clusterId: clusterId, - clusterSecret: clusterSecret, - prefix: prefix, - byoc: byoc, - jwtIssuer: jwtIssuerPrefix + "#" + clusterId, - - dataDir: filepath.Join(baseDir, "data"), - maxConn: config.DownloadMaxConn, - storageOpts: storageOpts, - cache: cache, - - disabled: make(chan struct{}, 0), - fileset: make(map[string]int64, 0), - - downloading: make(map[string]*downloadingItem), - - client: &http.Client{ - Transport: transport, - }, - cachedCli: &http.Client{ - Transport: cachedTransport, - }, - - wsUpgrader: &websocket.Upgrader{ - HandshakeTimeout: time.Minute, - }, - } - close(cr.disabled) - - if cr.maxConn <= 0 { - panic("download-max-conn must be a positive integer") - } - cr.bufSlots = limited.NewBufSlots(cr.maxConn) - - { - var ( - n uint = 0 - wgs = make([]uint, len(storageOpts)) - sts = make([]storage.Storage, len(storageOpts)) - ) - for i, s := range storageOpts { - sts[i] = storage.NewStorage(s) - wgs[i] = s.Weight - n += s.Weight - } - cr.storages = sts - cr.storageWeights = wgs - cr.storageTotalWeight = n - } - return -} - -func (cr *Cluster) Init(ctx context.Context) (err error) { - // create data folder - os.MkdirAll(cr.dataDir, 0755) - - if config.Database.Driver == "memory" { - cr.database = database.NewMemoryDB() - } else if cr.database, err = database.NewSqlDB(config.Database.Driver, config.Database.DSN); err != nil { - return - } - - if config.Hijack.Enable { - cr.hijackProxy = NewHjProxy(cr.client, cr.database, cr.handleDownload) - if config.Hijack.EnableLocalCache { - os.MkdirAll(config.Hijack.LocalCachePath, 0755) - } - } - - // Init notification manager - cr.notifyManager = notify.NewManager(cr.dataDir, cr.database, cr.client, config.Dashboard.NotifySubject) - // Add notification plugins - webpushPlg := new(webpush.Plugin) - cr.notifyManager.AddPlugin(webpushPlg) - if config.Notification.EnableEmail { - emailPlg, err := email.NewSMTP( - config.Notification.EmailSMTP, config.Notification.EmailSMTPEncryption, - config.Notification.EmailSender, config.Notification.EmailSenderPassword, - ) - if err != nil { - return err - } - cr.notifyManager.AddPlugin(emailPlg) - } - - if err = cr.notifyManager.Init(ctx); err != nil { - return - } - cr.webpushKeyB64 = base64.RawURLEncoding.EncodeToString(webpushPlg.GetPublicKey()) - - // Init storages - vctx := context.WithValue(ctx, storage.ClusterCacheCtxKey, cr.cache) - for _, s := range cr.storages { - s.Init(vctx) - } - - // read old stats - if err := cr.stats.Load(cr.dataDir); err != nil { - log.Errorf("Could not load stats: %v", err) - } - if cr.apiHmacKey, err = utils.LoadOrCreateHmacKey(cr.dataDir); err != nil { - return fmt.Errorf("Cannot load hmac key: %w", err) - } - - cr.updateChecker = time.NewTicker(time.Hour) - - go func(ticker *time.Ticker) { - defer log.RecoverPanic(nil) - defer ticker.Stop() - - if err := cr.checkUpdate(); err != nil { - log.TrErrorf("error.update.check.failed", err) - } - for range ticker.C { - if err := cr.checkUpdate(); err != nil { - log.TrErrorf("error.update.check.failed", err) - } - } - }(cr.updateChecker) - return -} - -func (cr *Cluster) Destroy(ctx context.Context) { - if cr.database != nil { - cr.database.Cleanup() - } - cr.updateChecker.Stop() - if cr.apiRateLimiter != nil { - cr.apiRateLimiter.Destroy() - } -} - -func (cr *Cluster) allocBuf(ctx context.Context) (slotId int, buf []byte, free func()) { - return cr.bufSlots.Alloc(ctx) -} - -func (cr *Cluster) Connect(ctx context.Context) bool { - cr.mux.Lock() - defer cr.mux.Unlock() - - if cr.socket != nil { - log.Debug("Extra connect") - return true - } - - _, err := cr.GetAuthToken(ctx) - if err != nil { - log.TrErrorf("error.cluster.auth.failed", err) - osExit(CodeClientOrServerError) - } - - engio, err := engine.NewSocket(engine.Options{ - Host: cr.prefix, - Path: "/socket.io/", - ExtraHeaders: http.Header{ - "Origin": {cr.prefix}, - "User-Agent": {build.ClusterUserAgent}, - }, - DialTimeout: time.Minute * 6, - }) - if err != nil { - log.Errorf("Could not parse Engine.IO options: %v; exit.", err) - osExit(CodeClientUnexpectedError) - } - - cr.reconnectCount = 0 - connected := false - - if config.Advanced.SocketIOLog { - engio.OnRecv(func(_ *engine.Socket, data []byte) { - log.Debugf("Engine.IO recv: %q", (string)(data)) - }) - engio.OnSend(func(_ *engine.Socket, data []byte) { - log.Debugf("Engine.IO sending: %q", (string)(data)) - }) - } - engio.OnConnect(func(*engine.Socket) { - log.Info("Engine.IO connected") - }) - engio.OnDisconnect(func(_ *engine.Socket, err error) { - if ctx.Err() != nil { - // Ignore if the error is because context cancelled - return - } - if err != nil { - log.Warnf("Engine.IO disconnected: %v", err) - } - if config.MaxReconnectCount == 0 { - if cr.shouldEnable.Load() { - log.Errorf("Cluster disconnected from remote; exit.") - osExit(CodeServerOrEnvionmentError) - } - } - if !connected { - cr.reconnectCount++ - if config.MaxReconnectCount > 0 && cr.reconnectCount >= config.MaxReconnectCount { - if cr.shouldEnable.Load() { - log.TrErrorf("error.cluster.connect.failed.toomuch") - osExit(CodeServerOrEnvionmentError) - } - } - } - connected = false - go cr.disconnected() - }) - engio.OnDialError(func(_ *engine.Socket, err error) { - cr.reconnectCount++ - log.TrErrorf("error.cluster.connect.failed", cr.reconnectCount, config.MaxReconnectCount, err) - if config.MaxReconnectCount >= 0 && cr.reconnectCount >= config.MaxReconnectCount { - if cr.shouldEnable.Load() { - log.TrErrorf("error.cluster.connect.failed.toomuch") - osExit(CodeServerOrEnvionmentError) - } - } - }) - - cr.socket = socket.NewSocket(engio, socket.WithAuthTokenFn(func() string { - token, err := cr.GetAuthToken(ctx) - if err != nil { - log.TrErrorf("error.cluster.auth.failed", err) - osExit(CodeServerOrEnvionmentError) - } - return token - })) - cr.socket.OnBeforeConnect(func(*socket.Socket) { - log.Infof(Tr("info.cluster.connect.prepare"), cr.reconnectCount, config.MaxReconnectCount) - }) - cr.socket.OnConnect(func(*socket.Socket, string) { - connected = true - log.Debugf("shouldEnable is %v", cr.shouldEnable.Load()) - if cr.shouldEnable.Load() { - if err := cr.Enable(ctx); err != nil { - log.TrErrorf("error.cluster.enable.failed", err) - osExit(CodeClientOrEnvionmentError) - } - } - }) - cr.socket.OnDisconnect(func(*socket.Socket, string) { - go cr.disconnected() - }) - cr.socket.OnError(func(_ *socket.Socket, err error) { - if ctx.Err() != nil { - // Ignore if the error is because context cancelled - return - } - log.Errorf("Socket.IO error: %v", err) - }) - cr.socket.OnMessage(func(event string, data []any) { - if event == "message" { - log.Infof("[remote]: %v", data[0]) - } - }) - log.Infof("Dialing %s", engio.URL().String()) - if err := engio.Dial(ctx); err != nil { - log.Errorf("Dial error: %v", err) - return false - } - log.Info("Connecting to socket.io namespace") - if err := cr.socket.Connect(""); err != nil { - log.Errorf("Open namespace error: %v", err) - return false - } - return true -} - -func (cr *Cluster) disconnected() bool { - cr.mux.Lock() - defer cr.mux.Unlock() - - if cr.enabled.CompareAndSwap(true, false) { - return false - } - if cr.cancelKeepalive != nil { - cr.cancelKeepalive() - cr.cancelKeepalive = nil - } - cr.notifyManager.OnDisabled() - return true -} - -func (cr *Cluster) disable(ctx context.Context) (ok bool) { - cr.mux.Lock() - defer cr.mux.Unlock() - - if !cr.enabled.Load() { - log.Debug("Extra disable") - return false - } - - defer cr.notifyManager.OnDisabled() - - if cr.cancelKeepalive != nil { - cr.cancelKeepalive() - cr.cancelKeepalive = nil - } - if cr.socket == nil { - return false - } - log.Info(Tr("info.cluster.disabling")) - resCh, err := cr.socket.EmitWithAck("disable", nil) - if err == nil { - tctx, cancel := context.WithTimeout(ctx, time.Second*(time.Duration)(config.Advanced.KeepaliveTimeout)) - select { - case <-tctx.Done(): - cancel() - err = tctx.Err() - case data := <-resCh: - cancel() - log.Debug("disable ack:", data) - if ero := data[0]; ero != nil { - log.Errorf("Disable failed: %v", ero) - } else if !data[1].(bool) { - log.Error("Disable failed: acked non true value") - } else { - ok = true - } - } - } - if err != nil { - log.Errorf(Tr("error.cluster.disable.failed"), err) - } - - cr.enabled.Store(false) - go cr.socket.Close() - cr.socket = nil - close(cr.disabled) - log.Warn(Tr("warn.cluster.disabled")) - return -} diff --git a/cluster/cluster.go b/cluster/cluster.go index 67f93bc..835b5ff 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -21,7 +21,9 @@ package cluster import ( "context" + "errors" "fmt" + "net/http" "regexp" "runtime" "sync" @@ -66,6 +68,10 @@ type Cluster struct { mux sync.RWMutex status atomic.Int32 socket *socket.Socket + client *http.Client + + authTokenMux sync.RWMutex + authToken *ClusterToken } func NewCluster( @@ -78,6 +84,8 @@ func NewCluster( storageManager: storageManager, storages: storages, + + client: &http.Client{}, } return } @@ -87,6 +95,11 @@ func (cr *Cluster) ID() string { return cr.opts.Id } +// Secret returns the cluster secret +func (cr *Cluster) Secret() string { + return cr.opts.Secret +} + // Host returns the cluster public host func (cr *Cluster) Host() string { return cr.gcfg.Host @@ -181,7 +194,7 @@ func (cr *Cluster) enable(ctx context.Context) error { if msg, ok := ero["message"].(string); ok { if hashMismatch := reFileHashMismatchError.FindStringSubmatch(msg); hashMismatch != nil { hash := hashMismatch[1] - log.Warnf(Tr("warn.cluster.detected.hash.mismatch"), hash) + log.TrWarnf("warn.cluster.detected.hash.mismatch", hash) cr.storageManager.RemoveForAll(hash) } return fmt.Errorf("Enable failed: %v", msg) @@ -239,6 +252,7 @@ func (cr *Cluster) reEnable(disableSignal <-chan struct{}) { select { case <-ctx.Done(): case <-disableSignal: + timer.Stop() cancel() } }() diff --git a/cluster/config.go b/cluster/config.go index 7613cb1..2eebc61 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -86,7 +86,7 @@ func (cr *Cluster) fetchToken(ctx context.Context) (token *ClusterToken, err err } }() req, err := cr.makeReq(ctx, http.MethodGet, "/openbmclapi-agent/challenge", url.Values{ - "clusterId": {cr.clusterId}, + "clusterId": {cr.ID()}, }) if err != nil { return @@ -110,7 +110,7 @@ func (cr *Cluster) fetchToken(ctx context.Context) (token *ClusterToken, err err } var buf [32]byte - hs := hmac.New(crypto.SHA256.New, ([]byte)(cr.clusterSecret)) + hs := hmac.New(crypto.SHA256.New, ([]byte)(cr.Secret())) hs.Write(([]byte)(res1.Challenge)) signature := hex.EncodeToString(hs.Sum(buf[:0])) @@ -119,7 +119,7 @@ func (cr *Cluster) fetchToken(ctx context.Context) (token *ClusterToken, err err Challenge string `json:"challenge"` Signature string `json:"signature"` }{ - ClusterId: cr.clusterId, + ClusterId: cr.ID(), Challenge: res1.Challenge, Signature: signature, }) @@ -159,7 +159,7 @@ func (cr *Cluster) refreshToken(ctx context.Context, oldToken string) (token *Cl ClusterId string `json:"clusterId"` Token string `json:"token"` }{ - ClusterId: cr.clusterId, + ClusterId: cr.ID(), Token: oldToken, }) if err != nil { diff --git a/cluster/handler.go b/cluster/handler.go index 9f47f31..52c1ba1 100644 --- a/cluster/handler.go +++ b/cluster/handler.go @@ -21,29 +21,30 @@ package cluster import ( "net/http" + + "github.com/LiterMC/go-openbmclapi/log" + "github.com/LiterMC/go-openbmclapi/storage" ) -func (cr *Cluster) HandleFile(req *http.Request, rw http.ResponseWriter, hash string) { +func (cr *Cluster) HandleFile(req *http.Request, rw http.ResponseWriter, hash string, size int64) { + defer log.RecoverPanic(nil) + var err error if cr.storageManager.ForEachFromRandom(cr.storages, func(s storage.Storage) bool { - log.Debugf("[handler]: Checking %s on storage [%d] %s ...", hash, i, sto.String()) + opts := s.Options() + log.Debugf("[handler]: Checking %s on storage %s ...", hash, opts.Id) - sz, er := sto.ServeDownload(rw, req, hash, size) + sz, er := s.ServeDownload(rw, req, hash, size) if er != nil { - log.Debugf("[handler]: File %s failed on storage [%d] %s: %v", hash, i, sto.String(), er) + log.Debugf("[handler]: File %s failed on storage %s: %v", hash, opts.Id, er) err = er return false } if sz >= 0 { - opts := cr.storageOpts[i] - cr.AddHits(1, sz, s.Options().Id) - if !keepaliveRec { - cr.statOnlyHits.Add(1) - cr.statOnlyHbts.Add(sz) - } + cr.AddHits(1, sz, opts.Id) } return true }) { return } - http.Error(http.StatusInternation) + http.Error(rw, err.Error(), http.StatusInternalServerError) } diff --git a/cluster/http.go b/cluster/http.go new file mode 100644 index 0000000..d83245f --- /dev/null +++ b/cluster/http.go @@ -0,0 +1,25 @@ +/** + * OpenBmclAPI (Golang Edition) + * Copyright (C) 2024 Kevin Z + * All rights reserved + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package cluster + +import ( + "net/http" + "net/url" +) diff --git a/go.mod b/go.mod index 18d282b..9d665f9 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/LiterMC/go-openbmclapi -go 1.21.6 +go 1.22.0 require ( github.com/LiterMC/socket.io v0.2.4 diff --git a/log/tr.go b/log/tr.go index 0470801..13e1500 100644 --- a/log/tr.go +++ b/log/tr.go @@ -38,7 +38,3 @@ func TrWarnf(key string, vals ...any) { func TrErrorf(key string, vals ...any) { Errorf(lang.Tr(key), vals...) } - -func TrPanicf(key string, vals ...any) { - Panicf(lang.Tr(key), vals...) -} diff --git a/storage/manager.go b/storage/manager.go index 8747fef..bc348d2 100644 --- a/storage/manager.go +++ b/storage/manager.go @@ -20,6 +20,8 @@ package storage import ( + "errors" + "github.com/LiterMC/go-openbmclapi/log" "github.com/LiterMC/go-openbmclapi/utils" ) @@ -146,3 +148,17 @@ func forEachFromRandomIndexWithPossibility(poss []uint, total uint, cb func(i in } return false } + +func (m *Manager) RemoveForAll(hash string) error { + errCh := make(chan error, 0) + for _, s := range m.Storages { + go func(s Storage) { + errCh <- s.Remove(hash) + }(s) + } + errs := make([]error, len(m.Storages)) + for i := range len(m.Storages) { + errs[i] = <-errCh + } + return errors.Join(errs...) +}