Skip to content

Commit

Permalink
Merge branch 'dev' into cache-dns
Browse files Browse the repository at this point in the history
  • Loading branch information
chris124567 authored Aug 16, 2024
2 parents 6d42449 + 8118f9f commit ca1e530
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 202 deletions.
80 changes: 32 additions & 48 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"encoding/json"
"errors"
"fmt"
"math/big"
"net/http"
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/gateway"
rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
Expand Down Expand Up @@ -52,6 +54,18 @@ func NewClient(addr, password string) *Client {
}

type (
AccountManager interface {
Account(id rhpv3.Account, hostKey types.PublicKey) (api.Account, error)
Accounts() []api.Account
AddAmount(id rhpv3.Account, hk types.PublicKey, amt *big.Int)
LockAccount(ctx context.Context, id rhpv3.Account, hostKey types.PublicKey, exclusive bool, duration time.Duration) (api.Account, uint64)
ResetDrift(id rhpv3.Account) error
SetBalance(id rhpv3.Account, hk types.PublicKey, balance *big.Int)
ScheduleSync(id rhpv3.Account, hk types.PublicKey) error
Shutdown(context.Context) error
UnlockAccount(id rhpv3.Account, lockID uint64) error
}

AlertManager interface {
alerts.Alerter
RegisterWebhookBroadcaster(b webhooks.Broadcaster)
Expand Down Expand Up @@ -146,15 +160,24 @@ type (

// Store is a collection of stores used by the bus.
Store interface {
AccountStore
AutopilotStore
ChainStore
EphemeralAccountStore
HostStore
MetadataStore
MetricsStore
SettingStore
}

// AccountStore persists information about accounts. Since accounts
// are rapidly updated and can be recovered, they are only loaded upon
// startup and persisted upon shutdown.
AccountStore interface {
Accounts(context.Context) ([]api.Account, error)
SaveAccounts(context.Context, []api.Account) error
SetUncleanShutdown(context.Context) error
}

// An AutopilotStore stores autopilots.
AutopilotStore interface {
Autopilot(ctx context.Context, id string) (api.Autopilot, error)
Expand All @@ -168,15 +191,6 @@ type (
ProcessChainUpdate(ctx context.Context, applyFn func(sql.ChainUpdateTx) error) error
}

// EphemeralAccountStore persists information about accounts. Since accounts
// are rapidly updated and can be recovered, they are only loaded upon
// startup and persisted upon shutdown.
EphemeralAccountStore interface {
Accounts(context.Context) ([]api.Account, error)
SaveAccounts(context.Context, []api.Account) error
SetUncleanShutdown(context.Context) error
}

// A HostStore stores information about hosts.
HostStore interface {
Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error)
Expand Down Expand Up @@ -284,6 +298,7 @@ type (
type Bus struct {
startTime time.Time

accountsMgr AccountManager
alerts alerts.Alerter
alertMgr AlertManager
pinMgr PinManager
Expand All @@ -294,21 +309,19 @@ type Bus struct {
w Wallet

as AutopilotStore
eas EphemeralAccountStore
hs HostStore
ms MetadataStore
mtrcs MetricsStore
ss SettingStore

accounts *accounts
contractLocker ContractLocker
sectors UploadingSectorsCache

logger *zap.SugaredLogger
}

// New returns a new Bus
func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, announcementMaxAge time.Duration, l *zap.Logger) (*Bus, error) {
func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManager, s Syncer, w Wallet, store Store, announcementMaxAge time.Duration, l *zap.Logger) (_ *Bus, err error) {
l = l.Named("bus")

b := &Bus{
Expand All @@ -320,7 +333,6 @@ func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManag
ms: store,
mtrcs: store,
ss: store,
eas: store,

alerts: alerts.WithOrigin(am, "bus"),
alertMgr: am,
Expand All @@ -330,13 +342,14 @@ func New(ctx context.Context, am AlertManager, wm WebhooksManager, cm ChainManag
startTime: time.Now(),
}

// init accounts
if err := b.initAccounts(ctx); err != nil {
// init settings
if err := b.initSettings(ctx); err != nil {
return nil, err
}

// init settings
if err := b.initSettings(ctx); err != nil {
// create account manager
b.accountsMgr, err = ibus.NewAccountManager(ctx, store, l)
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -501,30 +514,13 @@ func (b *Bus) Handler() http.Handler {
// Shutdown shuts down the bus.
func (b *Bus) Shutdown(ctx context.Context) error {
return errors.Join(
b.saveAccounts(ctx),
b.accountsMgr.Shutdown(ctx),
b.webhooksMgr.Shutdown(ctx),
b.pinMgr.Shutdown(ctx),
b.cs.Shutdown(ctx),
)
}

// initAccounts loads the accounts into memory
func (b *Bus) initAccounts(ctx context.Context) error {
accounts, err := b.eas.Accounts(ctx)
if err != nil {
return err
}
b.accounts = newAccounts(accounts, b.logger)

// mark the shutdown as unclean, this will be overwritten when/if the
// accounts are saved on shutdown
if err := b.eas.SetUncleanShutdown(ctx); err != nil {
return fmt.Errorf("failed to mark account shutdown as unclean: %w", err)
}

return nil
}

// initSettings loads the default settings if the setting is not already set and
// ensures the settings are valid
func (b *Bus) initSettings(ctx context.Context) error {
Expand Down Expand Up @@ -602,15 +598,3 @@ func (b *Bus) initSettings(ctx context.Context) error {

return nil
}

// saveAccounts saves the accounts to the db when the bus is stopped
func (b *Bus) saveAccounts(ctx context.Context) error {
accounts := b.accounts.ToPersist()
if err := b.eas.SaveAccounts(ctx, accounts); err != nil {
b.logger.Errorf("failed to save %v accounts: %v", len(accounts), err)
return err
}

b.logger.Infof("successfully saved %v accounts", len(accounts))
return nil
}
20 changes: 10 additions & 10 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,7 @@ func (b *Bus) handlePOSTAlertsRegister(jc jape.Context) {
}

func (b *Bus) accountsHandlerGET(jc jape.Context) {
jc.Encode(b.accounts.Accounts())
jc.Encode(b.accountsMgr.Accounts())
}

func (b *Bus) accountHandlerGET(jc jape.Context) {
Expand All @@ -1731,7 +1731,7 @@ func (b *Bus) accountHandlerGET(jc jape.Context) {
if jc.Decode(&req) != nil {
return
}
acc, err := b.accounts.Account(id, req.HostKey)
acc, err := b.accountsMgr.Account(id, req.HostKey)
if jc.Check("failed to fetch account", err) != nil {
return
}
Expand All @@ -1755,16 +1755,16 @@ func (b *Bus) accountsAddHandlerPOST(jc jape.Context) {
jc.Error(errors.New("host needs to be set"), http.StatusBadRequest)
return
}
b.accounts.AddAmount(id, req.HostKey, req.Amount)
b.accountsMgr.AddAmount(id, req.HostKey, req.Amount)
}

func (b *Bus) accountsResetDriftHandlerPOST(jc jape.Context) {
var id rhpv3.Account
if jc.DecodeParam("id", &id) != nil {
return
}
err := b.accounts.ResetDrift(id)
if errors.Is(err, errAccountsNotFound) {
err := b.accountsMgr.ResetDrift(id)
if errors.Is(err, ibus.ErrAccountNotFound) {
jc.Error(err, http.StatusNotFound)
return
}
Expand All @@ -1790,7 +1790,7 @@ func (b *Bus) accountsUpdateHandlerPOST(jc jape.Context) {
jc.Error(errors.New("host needs to be set"), http.StatusBadRequest)
return
}
b.accounts.SetBalance(id, req.HostKey, req.Amount)
b.accountsMgr.SetBalance(id, req.HostKey, req.Amount)
}

func (b *Bus) accountsRequiresSyncHandlerPOST(jc jape.Context) {
Expand All @@ -1810,8 +1810,8 @@ func (b *Bus) accountsRequiresSyncHandlerPOST(jc jape.Context) {
jc.Error(errors.New("host needs to be set"), http.StatusBadRequest)
return
}
err := b.accounts.ScheduleSync(id, req.HostKey)
if errors.Is(err, errAccountsNotFound) {
err := b.accountsMgr.ScheduleSync(id, req.HostKey)
if errors.Is(err, ibus.ErrAccountNotFound) {
jc.Error(err, http.StatusNotFound)
return
}
Expand All @@ -1830,7 +1830,7 @@ func (b *Bus) accountsLockHandlerPOST(jc jape.Context) {
return
}

acc, lockID := b.accounts.LockAccount(jc.Request.Context(), id, req.HostKey, req.Exclusive, time.Duration(req.Duration))
acc, lockID := b.accountsMgr.LockAccount(jc.Request.Context(), id, req.HostKey, req.Exclusive, time.Duration(req.Duration))
jc.Encode(api.AccountsLockHandlerResponse{
Account: acc,
LockID: lockID,
Expand All @@ -1847,7 +1847,7 @@ func (b *Bus) accountsUnlockHandlerPOST(jc jape.Context) {
return
}

err := b.accounts.UnlockAccount(id, req.LockID)
err := b.accountsMgr.UnlockAccount(id, req.LockID)
if jc.Check("failed to unlock account", err) != nil {
return
}
Expand Down
Loading

0 comments on commit ca1e530

Please sign in to comment.