diff --git a/api/events.go b/api/events.go index f533bc5bf..dbfa68a3f 100644 --- a/api/events.go +++ b/api/events.go @@ -62,53 +62,61 @@ type ( } ) -func (e EventConsensusUpdate) Event() webhooks.Event { - return webhooks.Event{ - Module: ModuleConsensus, - Event: EventUpdate, - Payload: e, +var ( + WebhookConsensusUpdate = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventUpdate, + Headers: headers, + Module: ModuleConsensus, + URL: url, + } } -} -func (e EventContractArchive) Event() webhooks.Event { - return webhooks.Event{ - Module: ModuleContract, - Event: EventArchive, - Payload: e, + WebhookContractArchive = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventArchive, + Headers: headers, + Module: ModuleContract, + URL: url, + } } -} -func (e EventContractRenew) Event() webhooks.Event { - return webhooks.Event{ - Module: ModuleContract, - Event: EventRenew, - Payload: e, + WebhookContractRenew = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventRenew, + Headers: headers, + Module: ModuleContract, + URL: url, + } } -} -func (e EventContractSetUpdate) Event() webhooks.Event { - return webhooks.Event{ - Module: ModuleContractSet, - Event: EventUpdate, - Payload: e, + WebhookContractSetUpdate = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventUpdate, + Headers: headers, + Module: ModuleContractSet, + URL: url, + } } -} -func (e EventSettingUpdate) Event() webhooks.Event { - return webhooks.Event{ - Module: ModuleSetting, - Event: EventUpdate, - Payload: e, + WebhookSettingUpdate = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventUpdate, + Headers: headers, + Module: ModuleSetting, + URL: url, + } } -} -func (e EventSettingDelete) Event() webhooks.Event { - return webhooks.Event{ - Module: ModuleSetting, - Event: EventDelete, - Payload: e, + WebhookSettingDelete = func(url string, headers map[string]string) webhooks.Webhook { + return webhooks.Webhook{ + Event: EventDelete, + Headers: headers, + Module: ModuleSetting, + URL: url, + } } -} +) func ParseEventWebhook(event webhooks.Event) (interface{}, error) { bytes, err := json.Marshal(event.Payload) diff --git a/api/setting.go b/api/setting.go index 923863e58..ff93550ef 100644 --- a/api/setting.go +++ b/api/setting.go @@ -12,6 +12,7 @@ import ( const ( SettingContractSet = "contractset" SettingGouging = "gouging" + SettingPricePinning = "pricepinning" SettingRedundancy = "redundancy" SettingS3Authentication = "s3authentication" SettingUploadPacking = "uploadpacking" @@ -80,6 +81,55 @@ type ( MigrationSurchargeMultiplier uint64 `json:"migrationSurchargeMultiplier"` } + // PricePinSettings holds the configuration for pinning certain settings to + // a specific currency (e.g., USD). It uses a Forex API to fetch the current + // exchange rate, allowing users to set prices in USD instead of SC. + PricePinSettings struct { + // Enabled can be used to either enable or temporarily disable price + // pinning. If enabled, both the currency and the Forex endpoint URL + // must be valid. + Enabled bool `json:"enabled"` + + // Currency is the external three-letter currency code. + Currency string `json:"currency"` + + // ForexEndpointURL is the endpoint that returns the exchange rate for + // Siacoin against the underlying currency. + ForexEndpointURL string `json:"forexEndpointURL"` + + // Threshold is a percentage between 0 and 1 that determines when the + // pinned settings are updated based on the exchange rate at the time. + Threshold float64 `json:"threshold"` + + // Autopilots contains the pinned settings for every autopilot. + Autopilots map[string]AutopilotPins `json:"autopilots,omitempty"` + + // GougingSettingsPins contains the pinned settings for the gouging + // settings. + GougingSettingsPins GougingSettingsPins `json:"gougingSettingsPins,omitempty"` + } + + // AutopilotPins contains the available autopilot settings that can be + // pinned. + AutopilotPins struct { + Allowance Pin `json:"allowance"` + } + + // GougingSettingsPins contains the available gouging settings that can be + // pinned. + GougingSettingsPins struct { + MaxDownload Pin `json:"maxDownload"` + MaxRPCPrice Pin `json:"maxRPCPrice"` + MaxStorage Pin `json:"maxStorage"` + MaxUpload Pin `json:"maxUpload"` + } + + // A Pin is a pinned price in an external currency. + Pin struct { + Pinned bool `json:"pinned"` + Value float64 `json:"value"` + } + // RedundancySettings contain settings that dictate an object's redundancy. RedundancySettings struct { MinShards int `json:"minShards"` @@ -98,6 +148,28 @@ type ( } ) +// IsPinned returns true if the pin is enabled and the value is greater than 0. +func (p Pin) IsPinned() bool { + return p.Pinned && p.Value > 0 +} + +// Validate returns an error if the price pin settings are not considered valid. +func (pps PricePinSettings) Validate() error { + if !pps.Enabled { + return nil + } + if pps.ForexEndpointURL == "" { + return fmt.Errorf("price pin settings must have a forex endpoint URL") + } + if pps.Currency == "" { + return fmt.Errorf("price pin settings must have a currency") + } + if pps.Threshold <= 0 || pps.Threshold >= 1 { + return fmt.Errorf("price pin settings must have a threshold between 0 and 1") + } + return nil +} + // Validate returns an error if the gouging settings are not considered valid. func (gs GougingSettings) Validate() error { if gs.HostBlockHeightLeeway < 3 { diff --git a/build/env_default.go b/build/env_default.go index 83003de60..3730fd5b2 100644 --- a/build/env_default.go +++ b/build/env_default.go @@ -34,6 +34,13 @@ var ( MigrationSurchargeMultiplier: 10, // 10x } + // DefaultPricePinSettings define the default price pin settings the bus is + // configured with on startup. These values can be adjusted using the + // settings API. + DefaultPricePinSettings = api.PricePinSettings{ + Enabled: false, + } + // DefaultUploadPackingSettings define the default upload packing settings // the bus is configured with on startup. DefaultUploadPackingSettings = api.UploadPackingSettings{ diff --git a/build/env_testnet.go b/build/env_testnet.go index 0bdef28f2..5ccf6f24f 100644 --- a/build/env_testnet.go +++ b/build/env_testnet.go @@ -36,6 +36,13 @@ var ( MigrationSurchargeMultiplier: 10, // 10x } + // DefaultPricePinSettings define the default price pin settings the bus is + // configured with on startup. These values can be adjusted using the + // settings API. + DefaultPricePinSettings = api.PricePinSettings{ + Enabled: false, + } + // DefaultUploadPackingSettings define the default upload packing settings // the bus is configured with on startup. DefaultUploadPackingSettings = api.UploadPackingSettings{ diff --git a/bus/bus.go b/bus/bus.go index 5a9a983b1..1b70a7d87 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -31,6 +31,11 @@ import ( "go.uber.org/zap" ) +const ( + defaultPinUpdateInterval = 5 * time.Minute + defaultPinRateWindow = 6 * time.Hour +) + // Client re-exports the client from the client package. type Client struct { *client.Client @@ -227,11 +232,11 @@ type bus struct { contractLocks *contractLocks uploadingSectors *uploadingSectorsCache - alerts alerts.Alerter - alertMgr *alerts.Manager - events ibus.EventBroadcaster - hooks *webhooks.Manager - logger *zap.SugaredLogger + alerts alerts.Alerter + alertMgr *alerts.Manager + pinMgr ibus.PinManager + webhooksMgr *webhooks.Manager + logger *zap.SugaredLogger } // Handler returns an HTTP handler that serves the bus API. @@ -376,9 +381,14 @@ func (b *bus) Handler() http.Handler { }) } +// Setup starts the pin manager. +func (b *bus) Setup(ctx context.Context) error { + return b.pinMgr.Run(ctx) +} + // Shutdown shuts down the bus. func (b *bus) Shutdown(ctx context.Context) error { - b.hooks.Close() + b.webhooksMgr.Close() accounts := b.accounts.ToPersist() err := b.eas.SaveAccounts(ctx, accounts) if err != nil { @@ -386,7 +396,11 @@ func (b *bus) Shutdown(ctx context.Context) error { } else { b.logger.Infof("successfully saved %v accounts", len(accounts)) } - return err + + return errors.Join( + err, + b.pinMgr.Close(ctx), + ) } func (b *bus) fetchSetting(ctx context.Context, key string, value interface{}) error { @@ -956,10 +970,14 @@ func (b *bus) contractsArchiveHandlerPOST(jc jape.Context) { if jc.Check("failed to archive contracts", b.ms.ArchiveContracts(jc.Request.Context(), toArchive)) == nil { for fcid, reason := range toArchive { - b.events.BroadcastEvent(api.EventContractArchive{ - ContractID: fcid, - Reason: reason, - Timestamp: time.Now().UTC(), + b.broadcastAction(webhooks.Event{ + Module: api.ModuleContract, + Event: api.EventArchive, + Payload: api.EventContractArchive{ + ContractID: fcid, + Reason: reason, + Timestamp: time.Now().UTC(), + }, }) } } @@ -982,10 +1000,14 @@ func (b *bus) contractsSetHandlerPUT(jc jape.Context) { } else if jc.Check("could not add contracts to set", b.ms.SetContractSet(jc.Request.Context(), set, contractIds)) != nil { return } else { - b.events.BroadcastEvent(api.EventContractSetUpdate{ - Name: set, - ContractIDs: contractIds, - Timestamp: time.Now().UTC(), + b.broadcastAction(webhooks.Event{ + Module: api.ModuleContractSet, + Event: api.EventUpdate, + Payload: api.EventContractSetUpdate{ + Name: set, + ContractIDs: contractIds, + Timestamp: time.Now().UTC(), + }, }) } } @@ -1172,9 +1194,13 @@ func (b *bus) contractIDRenewedHandlerPOST(jc jape.Context) { } b.uploadingSectors.HandleRenewal(req.Contract.ID(), req.RenewedFrom) - b.events.BroadcastEvent(api.EventContractRenew{ - Renewal: r, - Timestamp: time.Now().UTC(), + b.broadcastAction(webhooks.Event{ + Module: api.ModuleContract, + Event: api.EventRenew, + Payload: api.EventContractRenew{ + Renewal: r, + Timestamp: time.Now().UTC(), + }, }) jc.Encode(r) @@ -1653,6 +1679,7 @@ func (b *bus) settingKeyHandlerPUT(jc jape.Context) { jc.Error(fmt.Errorf("couldn't update gouging settings, error: %v", err), http.StatusBadRequest) return } + b.pinMgr.TriggerUpdate() case api.SettingRedundancy: var rs api.RedundancySettings if err := json.Unmarshal(data, &rs); err != nil { @@ -1671,13 +1698,32 @@ func (b *bus) settingKeyHandlerPUT(jc jape.Context) { jc.Error(fmt.Errorf("couldn't update s3 authentication settings, error: %v", err), http.StatusBadRequest) return } + case api.SettingPricePinning: + var pps api.PricePinSettings + if err := json.Unmarshal(data, &pps); err != nil { + jc.Error(fmt.Errorf("couldn't update price pinning settings, invalid request body"), http.StatusBadRequest) + return + } else if err := pps.Validate(); err != nil { + jc.Error(fmt.Errorf("couldn't update price pinning settings, invalid settings, error: %v", err), http.StatusBadRequest) + return + } else if pps.Enabled { + if _, err := ibus.NewForexClient(pps.ForexEndpointURL).SiacoinExchangeRate(jc.Request.Context(), pps.Currency); err != nil { + jc.Error(fmt.Errorf("couldn't update price pinning settings, forex API unreachable,error: %v", err), http.StatusBadRequest) + return + } + } + b.pinMgr.TriggerUpdate() } if jc.Check("could not update setting", b.ss.UpdateSetting(jc.Request.Context(), key, string(data))) == nil { - b.events.BroadcastEvent(api.EventSettingUpdate{ - Key: key, - Update: value, - Timestamp: time.Now().UTC(), + b.broadcastAction(webhooks.Event{ + Module: api.ModuleSetting, + Event: api.EventUpdate, + Payload: api.EventSettingUpdate{ + Key: key, + Update: value, + Timestamp: time.Now().UTC(), + }, }) } } @@ -1690,9 +1736,13 @@ func (b *bus) settingKeyHandlerDELETE(jc jape.Context) { } if jc.Check("could not delete setting", b.ss.DeleteSetting(jc.Request.Context(), key)) == nil { - b.events.BroadcastEvent(api.EventSettingDelete{ - Key: key, - Timestamp: time.Now().UTC(), + b.broadcastAction(webhooks.Event{ + Module: api.ModuleSetting, + Event: api.EventDelete, + Payload: api.EventSettingDelete{ + Key: key, + Timestamp: time.Now().UTC(), + }, }) } } @@ -2012,7 +2062,9 @@ func (b *bus) autopilotsHandlerPUT(jc jape.Context) { return } - jc.Check("failed to update autopilot", b.as.UpdateAutopilot(jc.Request.Context(), ap)) + if jc.Check("failed to update autopilot", b.as.UpdateAutopilot(jc.Request.Context(), ap)) == nil { + b.pinMgr.TriggerUpdate() + } } func (b *bus) autopilotHostCheckHandlerPUT(jc jape.Context) { @@ -2038,6 +2090,16 @@ func (b *bus) autopilotHostCheckHandlerPUT(jc jape.Context) { } } +func (b *bus) broadcastAction(e webhooks.Event) { + log := b.logger.With("event", e.Event).With("module", e.Module) + err := b.webhooksMgr.BroadcastAction(context.Background(), e) + if err != nil { + log.With(zap.Error(err)).Error("failed to broadcast action") + } else { + log.Debug("successfully broadcast action") + } +} + func (b *bus) contractTaxHandlerGET(jc jape.Context) { var payout types.Currency if jc.DecodeParam("payout", (*api.ParamCurrency)(&payout)) != nil { @@ -2091,7 +2153,7 @@ func (b *bus) webhookActionHandlerPost(jc jape.Context) { if jc.Check("failed to decode action", jc.Decode(&action)) != nil { return } - b.hooks.BroadcastAction(jc.Request.Context(), action) + b.broadcastAction(action) } func (b *bus) webhookHandlerDelete(jc jape.Context) { @@ -2099,7 +2161,7 @@ func (b *bus) webhookHandlerDelete(jc jape.Context) { if jc.Decode(&wh) != nil { return } - err := b.hooks.Delete(jc.Request.Context(), wh) + err := b.webhooksMgr.Delete(jc.Request.Context(), wh) if errors.Is(err, webhooks.ErrWebhookNotFound) { jc.Error(fmt.Errorf("webhook for URL %v and event %v.%v not found", wh.URL, wh.Module, wh.Event), http.StatusNotFound) return @@ -2109,7 +2171,7 @@ func (b *bus) webhookHandlerDelete(jc jape.Context) { } func (b *bus) webhookHandlerGet(jc jape.Context) { - webhooks, queueInfos := b.hooks.Info() + webhooks, queueInfos := b.webhooksMgr.Info() jc.Encode(api.WebhookResponse{ Queues: queueInfos, Webhooks: webhooks, @@ -2122,7 +2184,7 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { return } - err := b.hooks.Register(jc.Request.Context(), webhooks.Webhook{ + err := b.webhooksMgr.Register(jc.Request.Context(), webhooks.Webhook{ Event: req.Event, Module: req.Module, URL: req.URL, @@ -2392,21 +2454,21 @@ func (b *bus) multipartHandlerListPartsPOST(jc jape.Context) { func (b *bus) ProcessConsensusChange(cc modules.ConsensusChange) { if cc.Synced { - b.events.BroadcastEvent(api.EventConsensusUpdate{ - ConsensusState: b.consensusState(), - TransactionFee: b.tp.RecommendedFee(), - Timestamp: time.Now().UTC(), + b.broadcastAction(webhooks.Event{ + Module: api.ModuleConsensus, + Event: api.EventUpdate, + Payload: api.EventConsensusUpdate{ + ConsensusState: b.consensusState(), + TransactionFee: b.tp.RecommendedFee(), + Timestamp: time.Now().UTC(), + }, }) } } // New returns a new Bus. -func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, mtrcs MetricsStore, l *zap.Logger) (*bus, error) { +func New(s Syncer, am *alerts.Manager, whm *webhooks.Manager, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, mtrcs MetricsStore, l *zap.Logger) (*bus, error) { b := &bus{ - alerts: alerts.WithOrigin(am, "bus"), - alertMgr: am, - events: ibus.NewEventBroadcaster(hm, l.Named("events").Sugar()), - hooks: hm, s: s, cm: cm, tp: tp, @@ -2419,11 +2481,17 @@ func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp eas: eas, contractLocks: newContractLocks(), uploadingSectors: newUploadingSectorsCache(), - logger: l.Sugar().Named("bus"), + + alerts: alerts.WithOrigin(am, "bus"), + alertMgr: am, + webhooksMgr: whm, + logger: l.Sugar().Named("bus"), startTime: time.Now(), } + b.pinMgr = ibus.NewPinManager(whm, as, ss, defaultPinUpdateInterval, defaultPinRateWindow, b.logger.Desugar()) + // ensure we don't hang indefinitely ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -2431,6 +2499,7 @@ func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp // load default settings if the setting is not already set for key, value := range map[string]interface{}{ api.SettingGouging: build.DefaultGougingSettings, + api.SettingPricePinning: build.DefaultPricePinSettings, api.SettingRedundancy: build.DefaultRedundancySettings, api.SettingUploadPacking: build.DefaultUploadPackingSettings, } { diff --git a/bus/client/client_test.go b/bus/client/client_test.go index 3803584f0..92a7fdcd2 100644 --- a/bus/client/client_test.go +++ b/bus/client/client_test.go @@ -70,7 +70,7 @@ func newTestClient(dir string) (*client.Client, func() error, func(context.Conte // create client client := client.New("http://"+l.Addr().String(), "test") - b, cleanup, err := node.NewBus(node.BusConfig{ + b, _, cleanup, err := node.NewBus(node.BusConfig{ Bus: config.Bus{ AnnouncementMaxAgeHours: 24 * 7 * 52, // 1 year Bootstrap: false, diff --git a/bus/client/webhooks.go b/bus/client/webhooks.go index 5993e0c5b..769d1cf57 100644 --- a/bus/client/webhooks.go +++ b/bus/client/webhooks.go @@ -23,13 +23,7 @@ func (c *Client) DeleteWebhook(ctx context.Context, url, module, event string) e } // RegisterWebhook registers a new webhook for the given URL. -func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error { - if webhook.Headers == nil { - webhook.Headers = make(map[string]string) - } - for _, opt := range opts { - opt(webhook.Headers) - } +func (c *Client) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { err := c.c.WithContext(ctx).POST("/webhooks", webhook, nil) return err } diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index d9d84a5fd..f803859ce 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -496,14 +496,16 @@ func main() { } busAddr, busPassword := cfg.Bus.RemoteAddr, cfg.Bus.RemotePassword + setupBusFn := node.NoopFn if cfg.Bus.RemoteAddr == "" { - b, fn, err := node.NewBus(busCfg, cfg.Directory, seed, logger) + b, setupFn, shutdownFn, err := node.NewBus(busCfg, cfg.Directory, seed, logger) if err != nil { logger.Fatal("failed to create bus, err: " + err.Error()) } + setupBusFn = setupFn shutdownFns = append(shutdownFns, shutdownFnEntry{ name: "Bus", - fn: fn, + fn: shutdownFn, }) mux.Sub["/api/bus"] = utils.TreeMux{Handler: auth(b)} @@ -591,6 +593,16 @@ func main() { // Start server. go srv.Serve(l) + // Finish bus setup. + if err := setupBusFn(context.Background()); err != nil { + logger.Fatal("failed to setup bus: " + err.Error()) + } + + // Finish worker setup. + if err := setupWorkerFn(context.Background()); err != nil { + logger.Fatal("failed to setup worker: " + err.Error()) + } + // Set initial S3 keys. if cfg.S3.Enabled && !cfg.S3.DisableAuth { as, err := bc.S3AuthenticationSettings(context.Background()) @@ -620,11 +632,6 @@ func main() { } } - // Finish worker setup. - if err := setupWorkerFn(context.Background()); err != nil { - logger.Fatal("failed to setup worker: " + err.Error()) - } - logger.Info("api: Listening on " + l.Addr().String()) if s3Srv != nil { diff --git a/internal/bus/events.go b/internal/bus/events.go deleted file mode 100644 index 507f33402..000000000 --- a/internal/bus/events.go +++ /dev/null @@ -1,31 +0,0 @@ -package bus - -import ( - "context" - "time" - - "go.sia.tech/renterd/webhooks" - "go.uber.org/zap" -) - -type ( - EventBroadcaster struct { - broadcaster webhooks.Broadcaster - logger *zap.SugaredLogger - } -) - -func NewEventBroadcaster(b webhooks.Broadcaster, l *zap.SugaredLogger) EventBroadcaster { - return EventBroadcaster{ - broadcaster: b, - logger: l, - } -} - -func (b EventBroadcaster) BroadcastEvent(e webhooks.EventWebhook) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - if err := b.broadcaster.BroadcastAction(ctx, e.Event()); err != nil { - b.logger.Errorw("failed to broadcast event", "event", e, "error", err) - } - cancel() -} diff --git a/internal/bus/forex.go b/internal/bus/forex.go new file mode 100644 index 000000000..b6544b911 --- /dev/null +++ b/internal/bus/forex.go @@ -0,0 +1,51 @@ +package bus + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" +) + +type ( + client struct { + url string + } +) + +func NewForexClient(url string) *client { + return &client{url: url} +} + +func (f *client) SiacoinExchangeRate(ctx context.Context, currency string) (rate float64, err error) { + // create request + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/%s", f.url, currency), http.NoBody) + if err != nil { + return 0, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Accept", "application/json") + + // create http client + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + + // check status code + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + var errorMessage string + if err := json.NewDecoder(io.LimitReader(resp.Body, 1024)).Decode(&errorMessage); err != nil { + return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + return 0, errors.New(errorMessage) + } + + // decode exchange rate + if err := json.NewDecoder(resp.Body).Decode(&rate); err != nil { + return 0, fmt.Errorf("failed to decode response: %w", err) + } + return +} diff --git a/internal/bus/pinmanager.go b/internal/bus/pinmanager.go new file mode 100644 index 000000000..02e4df79b --- /dev/null +++ b/internal/bus/pinmanager.go @@ -0,0 +1,438 @@ +package bus + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/montanaflynn/stats" + "github.com/shopspring/decimal" + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/webhooks" + "go.uber.org/zap" +) + +type ( + // An AutopilotStore stores autopilots. + AutopilotStore interface { + Autopilot(ctx context.Context, id string) (api.Autopilot, error) + UpdateAutopilot(ctx context.Context, ap api.Autopilot) error + } + + // PinManager is a service that manages price pinning. + PinManager interface { + Close(context.Context) error + Run(context.Context) error + TriggerUpdate() + } + + // A SettingStore stores settings. + SettingStore interface { + Setting(ctx context.Context, key string) (string, error) + UpdateSetting(ctx context.Context, key, value string) error + } +) + +type ( + pinManager struct { + as AutopilotStore + ss SettingStore + broadcaster webhooks.Broadcaster + + updateInterval time.Duration + rateWindow time.Duration + + triggerChan chan struct{} + closedChan chan struct{} + wg sync.WaitGroup + + logger *zap.SugaredLogger + + mu sync.Mutex + rates []float64 + ratesCurrency string + } +) + +func NewPinManager(broadcaster webhooks.Broadcaster, as AutopilotStore, ss SettingStore, updateInterval, rateWindow time.Duration, l *zap.Logger) *pinManager { + return &pinManager{ + as: as, + ss: ss, + broadcaster: broadcaster, + + logger: l.Sugar().Named("pricemanager"), + + updateInterval: updateInterval, + rateWindow: rateWindow, + + triggerChan: make(chan struct{}, 1), + closedChan: make(chan struct{}), + } +} + +func (pm *pinManager) Close(ctx context.Context) error { + close(pm.closedChan) + + doneChan := make(chan struct{}) + go func() { + pm.wg.Wait() + close(doneChan) + }() + + select { + case <-doneChan: + return nil + case <-ctx.Done(): + return context.Cause(ctx) + } +} + +func (pm *pinManager) Run(ctx context.Context) error { + // try to update prices + if err := pm.updatePrices(ctx, true); err != nil { + return err + } + + // start the update loop + pm.wg.Add(1) + go func() { + defer pm.wg.Done() + + t := time.NewTicker(pm.updateInterval) + defer t.Stop() + + var forced bool + for { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + err := pm.updatePrices(ctx, forced) + if err != nil { + pm.logger.Warn("failed to update prices", zap.Error(err)) + } + cancel() + + forced = false + select { + case <-pm.closedChan: + return + case <-pm.triggerChan: + forced = true + case <-t.C: + } + } + }() + + return nil +} + +func (pm *pinManager) TriggerUpdate() { + select { + case pm.triggerChan <- struct{}{}: + default: + } +} + +func (pm *pinManager) averageRate() decimal.Decimal { + pm.mu.Lock() + defer pm.mu.Unlock() + + median, _ := stats.Median(pm.rates) + return decimal.NewFromFloat(median) +} + +func (pm *pinManager) pinnedSettings(ctx context.Context) (api.PricePinSettings, error) { + var ps api.PricePinSettings + if pss, err := pm.ss.Setting(ctx, api.SettingPricePinning); err != nil { + return api.PricePinSettings{}, err + } else if err := json.Unmarshal([]byte(pss), &ps); err != nil { + pm.logger.Panicf("failed to unmarshal pinned settings '%s': %v", pss, err) + } + return ps, nil +} + +func (pm *pinManager) rateExceedsThreshold(threshold float64) bool { + pm.mu.Lock() + defer pm.mu.Unlock() + + // calculate mean + mean, err := stats.Mean(pm.rates) + if err != nil { + pm.logger.Warnw("failed to calculate average rate", zap.Error(err)) + return false + } + + // convert to decimals + avg := decimal.NewFromFloat(mean) + pct := decimal.NewFromFloat(threshold) + cur := decimal.NewFromFloat(pm.rates[len(pm.rates)-1]) + + // calculate whether the current rate exceeds the given threshold + delta := cur.Sub(avg).Abs() + exceeded := delta.GreaterThan(cur.Mul(pct)) + + // log the result + pm.logger.Debugw("rate exceeds threshold", + "last", cur, + "average", avg, + "percentage", threshold, + "delta", delta, + "threshold", cur.Mul(pct), + "exceeded", exceeded, + ) + return exceeded +} + +func (pm *pinManager) updateAutopilotSettings(ctx context.Context, autopilotID string, pins api.AutopilotPins, rate decimal.Decimal) error { + var updated bool + + ap, err := pm.as.Autopilot(ctx, autopilotID) + if err != nil { + return err + } + + // update allowance + if pins.Allowance.IsPinned() { + update, err := convertCurrencyToSC(decimal.NewFromFloat(pins.Allowance.Value), rate) + if err != nil { + pm.logger.Warnw("failed to convert allowance to currency", zap.Error(err)) + } else { + bkp := ap.Config.Contracts.Allowance + ap.Config.Contracts.Allowance = update + if err := ap.Config.Validate(); err != nil { + pm.logger.Warnw("failed to update autopilot setting, new allowance makes the setting invalid", zap.Error(err)) + ap.Config.Contracts.Allowance = bkp + } else { + pm.logger.Infow("updating autopilot allowance", "old", bkp, "new", ap.Config.Contracts.Allowance, "rate", rate, "autopilot", autopilotID) + updated = true + } + } + } + + // return early if no updates took place + if !updated { + pm.logger.Infow("autopilots did not require price update", "rate", rate) + return nil + } + + // validate config + err = ap.Config.Validate() + if err != nil { + pm.logger.Warnw("failed to update autopilot setting, new settings make the setting invalid", zap.Error(err)) + return err + } + + // update autopilto + return pm.as.UpdateAutopilot(ctx, ap) +} + +func (pm *pinManager) updateExchangeRates(currency string, rate float64) error { + pm.mu.Lock() + defer pm.mu.Unlock() + + // update last currency + if pm.ratesCurrency != currency { + pm.ratesCurrency = currency + pm.rates = nil + } + + // update last rate + pm.rates = append(pm.rates, rate) + if len(pm.rates) >= int(pm.rateWindow/pm.updateInterval) { + pm.rates = pm.rates[1:] + } + + return nil +} + +func (pm *pinManager) updateGougingSettings(ctx context.Context, pins api.GougingSettingsPins, rate decimal.Decimal) error { + var updated bool + + // fetch gouging settings + var gs api.GougingSettings + if gss, err := pm.ss.Setting(ctx, api.SettingGouging); err != nil { + return err + } else if err := json.Unmarshal([]byte(gss), &gs); err != nil { + pm.logger.Panicf("failed to unmarshal gouging settings '%s': %v", gss, err) + return err + } + + // update max download price + if pins.MaxDownload.IsPinned() { + update, err := convertCurrencyToSC(decimal.NewFromFloat(pins.MaxDownload.Value), rate) + if err != nil { + pm.logger.Warn("failed to convert max download price to currency") + } else if !gs.MaxDownloadPrice.Equals(update) { + bkp := gs.MaxDownloadPrice + gs.MaxDownloadPrice = update + if err := gs.Validate(); err != nil { + pm.logger.Warn("failed to update gouging setting, new download price makes the setting invalid", zap.Error(err)) + gs.MaxDownloadPrice = bkp + } else { + pm.logger.Infow("updating max download price", "old", bkp, "new", gs.MaxDownloadPrice, "rate", rate) + updated = true + } + } + } + + // update max RPC price + if pins.MaxRPCPrice.IsPinned() { + update, err := convertCurrencyToSC(decimal.NewFromFloat(pins.MaxRPCPrice.Value), rate) + if err != nil { + pm.logger.Warnw("failed to convert max RPC price to currency", zap.Error(err)) + } else if !gs.MaxRPCPrice.Equals(update) { + bkp := gs.MaxRPCPrice + gs.MaxRPCPrice = update + if err := gs.Validate(); err != nil { + pm.logger.Warnw("failed to update gouging setting, new RPC price makes the setting invalid", zap.Error(err)) + gs.MaxRPCPrice = bkp + } else { + pm.logger.Infow("updating max RPC price", "old", bkp, "new", gs.MaxRPCPrice, "rate", rate) + updated = true + } + } + } + + // update max storage price + if pins.MaxStorage.IsPinned() { + update, err := convertCurrencyToSC(decimal.NewFromFloat(pins.MaxStorage.Value), rate) + if err != nil { + pm.logger.Warnw("failed to convert max storage price to currency", zap.Error(err)) + } else if !gs.MaxStoragePrice.Equals(update) { + bkp := gs.MaxStoragePrice + gs.MaxStoragePrice = update + if err := gs.Validate(); err != nil { + pm.logger.Warnw("failed to update gouging setting, new storage price makes the setting invalid", zap.Error(err)) + gs.MaxStoragePrice = bkp + } else { + pm.logger.Infow("updating max storage price", "old", bkp, "new", gs.MaxStoragePrice, "rate", rate) + updated = true + } + } + } + + // update max upload price + if pins.MaxUpload.IsPinned() { + update, err := convertCurrencyToSC(decimal.NewFromFloat(pins.MaxUpload.Value), rate) + if err != nil { + pm.logger.Warnw("failed to convert max upload price to currency", zap.Error(err)) + } else if !gs.MaxUploadPrice.Equals(update) { + bkp := gs.MaxUploadPrice + gs.MaxUploadPrice = update + if err := gs.Validate(); err != nil { + pm.logger.Warnw("failed to update gouging setting, new upload price makes the setting invalid", zap.Error(err)) + gs.MaxUploadPrice = bkp + } else { + pm.logger.Infow("updating max upload price", "old", bkp, "new", gs.MaxUploadPrice, "rate", rate) + updated = true + } + } + } + + // return early if no updates took place + if !updated { + pm.logger.Infow("gouging prices did not require price update", "rate", rate) + return nil + } + + // validate settings + err := gs.Validate() + if err != nil { + pm.logger.Warnw("failed to update gouging setting, new settings make the setting invalid", zap.Error(err)) + return err + } + + // update settings + bytes, _ := json.Marshal(gs) + err = pm.ss.UpdateSetting(ctx, api.SettingGouging, string(bytes)) + + // broadcast event + if err == nil { + pm.broadcaster.BroadcastAction(ctx, webhooks.Event{ + Module: api.ModuleSetting, + Event: api.EventUpdate, + Payload: api.EventSettingUpdate{ + Key: api.SettingGouging, + Update: string(bytes), + Timestamp: time.Now().UTC(), + }, + }) + } + + return err +} + +func (pm *pinManager) updatePrices(ctx context.Context, forced bool) error { + pm.logger.Debugw("updating prices", zap.Bool("forced", forced)) + + // fetch pinned settings + settings, err := pm.pinnedSettings(ctx) + if errors.Is(err, api.ErrSettingNotFound) { + pm.logger.Debug("price pinning not configured, skipping price update") + return nil + } else if err != nil { + return fmt.Errorf("failed to fetch pinned settings: %w", err) + } else if !settings.Enabled { + pm.logger.Debug("price pinning is disabled, skipping price update") + return nil + } + + // fetch exchange rate + rate, err := NewForexClient(settings.ForexEndpointURL).SiacoinExchangeRate(ctx, settings.Currency) + if err != nil { + return fmt.Errorf("failed to fetch exchange rate for '%s': %w", settings.Currency, err) + } else if rate <= 0 { + return fmt.Errorf("exchange rate for '%s' must be positive: %f", settings.Currency, rate) + } + + // update exchange rates + err = pm.updateExchangeRates(settings.Currency, rate) + if err != nil { + return err + } + + // return early if the rate does not exceed the threshold + if !forced && !pm.rateExceedsThreshold(settings.Threshold) { + pm.logger.Debug( + "rate does not exceed threshold, skipping price update", + zap.Stringer("threshold", decimal.NewFromFloat(settings.Threshold)), + zap.Stringer("rate", decimal.NewFromFloat(rate)), + ) + return nil + } + + // update gouging settings + update := pm.averageRate() + err = pm.updateGougingSettings(ctx, settings.GougingSettingsPins, update) + if err != nil { + pm.logger.Warnw("failed to update gouging settings", zap.Error(err)) + } + + // update autopilot settings + for ap, pins := range settings.Autopilots { + err = pm.updateAutopilotSettings(ctx, ap, pins, update) + if err != nil { + pm.logger.Warnw("failed to update autopilot settings", zap.String("autopilot", ap), zap.Error(err)) + } + } + + return nil +} + +// convertCurrencyToSC converts a value in an external currency and an exchange +// rate to Siacoins. +func convertCurrencyToSC(target decimal.Decimal, rate decimal.Decimal) (types.Currency, error) { + if rate.IsZero() { + return types.Currency{}, nil + } + + i := target.Div(rate).Mul(decimal.New(1, 24)).BigInt() + if i.Sign() < 0 { + return types.Currency{}, errors.New("negative currency") + } else if i.BitLen() > 128 { + return types.Currency{}, errors.New("currency overflow") + } + return types.NewCurrency(i.Uint64(), i.Rsh(i, 64).Uint64()), nil +} diff --git a/internal/bus/pinmanager_test.go b/internal/bus/pinmanager_test.go new file mode 100644 index 000000000..a2af6e137 --- /dev/null +++ b/internal/bus/pinmanager_test.go @@ -0,0 +1,291 @@ +package bus + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "reflect" + "sync" + "testing" + "time" + + "github.com/shopspring/decimal" + "go.sia.tech/core/types" + "go.sia.tech/hostd/host/settings/pin" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/build" + "go.sia.tech/renterd/webhooks" + "go.uber.org/zap" +) + +const ( + testAutopilotID = "default" + testUpdateInterval = 100 * time.Millisecond +) + +type mockBroadcaster struct { + events []webhooks.Event +} + +func (meb *mockBroadcaster) BroadcastAction(ctx context.Context, e webhooks.Event) error { + meb.events = append(meb.events, e) + return nil +} + +type mockForexAPI struct { + s *httptest.Server + + mu sync.Mutex + rate float64 +} + +func newTestForexAPI() *mockForexAPI { + api := &mockForexAPI{rate: 1} + api.s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + api.mu.Lock() + defer api.mu.Unlock() + json.NewEncoder(w).Encode(api.rate) + })) + return api +} + +func (api *mockForexAPI) Close() { + api.s.Close() +} + +func (api *mockForexAPI) updateRate(rate float64) { + api.mu.Lock() + defer api.mu.Unlock() + api.rate = rate +} + +type mockStore struct { + mu sync.Mutex + settings map[string]string + autopilots map[string]api.Autopilot +} + +func newTestStore() *mockStore { + s := &mockStore{ + autopilots: make(map[string]api.Autopilot), + settings: make(map[string]string), + } + + // add default price pin - and gouging settings + b, _ := json.Marshal(build.DefaultPricePinSettings) + s.settings[api.SettingPricePinning] = string(b) + b, _ = json.Marshal(build.DefaultGougingSettings) + s.settings[api.SettingGouging] = string(b) + + // add default autopilot + s.autopilots[testAutopilotID] = api.Autopilot{ + ID: testAutopilotID, + Config: api.AutopilotConfig{ + Contracts: api.ContractsConfig{ + Allowance: types.Siacoins(1), + }, + }, + } + + return s +} + +func (ms *mockStore) gougingSettings() api.GougingSettings { + val, err := ms.Setting(context.Background(), api.SettingGouging) + if err != nil { + panic(err) + } + var gs api.GougingSettings + if err := json.Unmarshal([]byte(val), &gs); err != nil { + panic(err) + } + return gs +} + +func (ms *mockStore) updatPinnedSettings(pps api.PricePinSettings) { + b, _ := json.Marshal(pps) + ms.UpdateSetting(context.Background(), api.SettingPricePinning, string(b)) + time.Sleep(2 * testUpdateInterval) +} + +func (ms *mockStore) Setting(ctx context.Context, key string) (string, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + return ms.settings[key], nil +} + +func (ms *mockStore) UpdateSetting(ctx context.Context, key, value string) error { + ms.mu.Lock() + defer ms.mu.Unlock() + ms.settings[key] = value + return nil +} + +func (ms *mockStore) Autopilot(ctx context.Context, id string) (api.Autopilot, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + return ms.autopilots[id], nil +} + +func (ms *mockStore) UpdateAutopilot(ctx context.Context, autopilot api.Autopilot) error { + ms.mu.Lock() + defer ms.mu.Unlock() + ms.autopilots[autopilot.ID] = autopilot + return nil +} + +func TestPinManager(t *testing.T) { + // mock dependencies + ms := newTestStore() + eb := &mockBroadcaster{} + + // mock forex api + forex := newTestForexAPI() + defer forex.Close() + + // start a pinmanager + pm := NewPinManager(eb, ms, ms, testUpdateInterval, time.Minute, zap.NewNop()) + if err := pm.Run(context.Background()); err != nil { + t.Fatal(err) + } + defer func() { + if err := pm.Close(context.Background()); err != nil { + t.Fatal(err) + } + }() + + // define a small helper to fetch the price manager's rates + rates := func() []float64 { + t.Helper() + pm.mu.Lock() + defer pm.mu.Unlock() + return pm.rates + } + + // assert price manager is disabled by default + if cnt := len(rates()); cnt != 0 { + t.Fatalf("expected no rates, got %d", cnt) + } + + // enable price pinning + pps := build.DefaultPricePinSettings + pps.Enabled = true + pps.Currency = "usd" + pps.Threshold = 0.5 + pps.ForexEndpointURL = forex.s.URL + ms.updatPinnedSettings(pps) + + // assert price manager is running now + if cnt := len(rates()); cnt < 1 { + t.Fatal("expected at least one rate") + } + + // update exchange rate and fetch current gouging settings + forex.updateRate(2.5) + gs := ms.gougingSettings() + + // configure all pins but disable them for now + pps.GougingSettingsPins.MaxDownload = api.Pin{Value: 3, Pinned: false} + pps.GougingSettingsPins.MaxRPCPrice = api.Pin{Value: 3, Pinned: false} + pps.GougingSettingsPins.MaxStorage = api.Pin{Value: 3, Pinned: false} + pps.GougingSettingsPins.MaxUpload = api.Pin{Value: 3, Pinned: false} + ms.updatPinnedSettings(pps) + + // assert gouging settings are unchanged + if gss := ms.gougingSettings(); !reflect.DeepEqual(gs, gss) { + t.Fatalf("expected gouging settings to be the same, got %v", gss) + } + + // enable the max download pin, with the threshold at 0.5 it should remain unchanged + pps.GougingSettingsPins.MaxDownload.Pinned = true + ms.updatPinnedSettings(pps) + if gss := ms.gougingSettings(); !reflect.DeepEqual(gs, gss) { + t.Fatalf("expected gouging settings to be the same, got %v", gss) + } + + // lower the threshold, gouging settings should be updated + pps.Threshold = 0.05 + ms.updatPinnedSettings(pps) + if gss := ms.gougingSettings(); gss.MaxContractPrice.Equals(gs.MaxDownloadPrice) { + t.Fatalf("expected gouging settings to be updated, got %v = %v", gss.MaxDownloadPrice, gs.MaxDownloadPrice) + } + + // enable the rest of the pins + pps.GougingSettingsPins.MaxDownload.Pinned = true + pps.GougingSettingsPins.MaxRPCPrice.Pinned = true + pps.GougingSettingsPins.MaxStorage.Pinned = true + pps.GougingSettingsPins.MaxUpload.Pinned = true + ms.updatPinnedSettings(pps) + + // assert they're all updated + if gss := ms.gougingSettings(); gss.MaxDownloadPrice.Equals(gs.MaxDownloadPrice) || + gss.MaxRPCPrice.Equals(gs.MaxRPCPrice) || + gss.MaxStoragePrice.Equals(gs.MaxStoragePrice) || + gss.MaxUploadPrice.Equals(gs.MaxUploadPrice) { + t.Fatalf("expected gouging settings to be updated, got %v = %v", gss, gs) + } + + // increase rate so average isn't catching up to us + forex.updateRate(3) + + // fetch autopilot + ap, _ := ms.Autopilot(context.Background(), testAutopilotID) + + // add autopilot pin, but disable it + pins := api.AutopilotPins{ + Allowance: api.Pin{ + Pinned: false, + Value: 2, + }, + } + pps.Autopilots = map[string]api.AutopilotPins{testAutopilotID: pins} + ms.updatPinnedSettings(pps) + + // assert autopilot was not updated + if app, _ := ms.Autopilot(context.Background(), testAutopilotID); !app.Config.Contracts.Allowance.Equals(ap.Config.Contracts.Allowance) { + t.Fatalf("expected autopilot to not be updated, got %v = %v", app.Config.Contracts.Allowance, ap.Config.Contracts.Allowance) + } + + // enable the pin + pins.Allowance.Pinned = true + pps.Autopilots[testAutopilotID] = pins + ms.updatPinnedSettings(pps) + + // assert autopilot was updated + if app, _ := ms.Autopilot(context.Background(), testAutopilotID); app.Config.Contracts.Allowance.Equals(ap.Config.Contracts.Allowance) { + t.Fatalf("expected autopilot to be updated, got %v = %v", app.Config.Contracts.Allowance, ap.Config.Contracts.Allowance) + } +} + +// TestConvertConvertCurrencyToSC tests the conversion of a currency to Siacoins. +func TestConvertConvertCurrencyToSC(t *testing.T) { + tests := []struct { + target decimal.Decimal + rate decimal.Decimal + expected types.Currency + err error + }{ + {decimal.NewFromFloat(1), decimal.NewFromFloat(1), types.Siacoins(1), nil}, + {decimal.NewFromFloat(1), decimal.NewFromFloat(2), types.Siacoins(1).Div64(2), nil}, + {decimal.NewFromFloat(1), decimal.NewFromFloat(0.5), types.Siacoins(2), nil}, + {decimal.NewFromFloat(0.5), decimal.NewFromFloat(0.5), types.Siacoins(1), nil}, + {decimal.NewFromFloat(1), decimal.NewFromFloat(0.001), types.Siacoins(1000), nil}, + {decimal.NewFromFloat(1), decimal.NewFromFloat(0), types.Currency{}, nil}, + {decimal.NewFromFloat(1), decimal.NewFromFloat(-1), types.Currency{}, errors.New("negative currency")}, + {decimal.NewFromFloat(-1), decimal.NewFromFloat(1), types.Currency{}, errors.New("negative currency")}, + {decimal.New(1, 50), decimal.NewFromFloat(0.1), types.Currency{}, errors.New("currency overflow")}, + } + for i, test := range tests { + if result, err := pin.ConvertCurrencyToSC(test.target, test.rate); test.err != nil { + if err == nil { + t.Fatalf("%d: expected error, got nil", i) + } else if err.Error() != test.err.Error() { + t.Fatalf("%d: expected %v, got %v", i, test.err, err) + } + } else if !test.expected.Equals(result) { + t.Fatalf("%d: expected %d, got %d", i, test.expected, result) + } + } +} diff --git a/internal/node/node.go b/internal/node/node.go index f07980a23..760ef45a8 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -54,31 +54,32 @@ type AutopilotConfig struct { } type ( - RunFn = func() error - SetupFn = func(context.Context, string, string) error - ShutdownFn = func(context.Context) error + RunFn = func() error + BusSetupFn = func(context.Context) error + WorkerSetupFn = func(context.Context, string, string) error + ShutdownFn = func(context.Context) error ) var NoopFn = func(context.Context) error { return nil } -func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { +func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, BusSetupFn, ShutdownFn, error) { gatewayDir := filepath.Join(dir, "gateway") if err := os.MkdirAll(gatewayDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } g, err := gateway.New(cfg.GatewayAddr, cfg.Bootstrap, gatewayDir) if err != nil { - return nil, nil, err + return nil, nil, nil, err } consensusDir := filepath.Join(dir, "consensus") if err := os.MkdirAll(consensusDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } cs, errCh := mconsensus.New(g, cfg.Bootstrap, consensusDir) select { case err := <-errCh: if err != nil { - return nil, nil, err + return nil, nil, nil, err } default: go func() { @@ -89,11 +90,11 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht } tpoolDir := filepath.Join(dir, "transactionpool") if err := os.MkdirAll(tpoolDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } tp, err := transactionpool.New(cs, g, tpoolDir) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // create database connections @@ -116,7 +117,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht // create database directory dbDir := filepath.Join(dir, "db") if err := os.MkdirAll(dbDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } // create SQLite connections @@ -155,11 +156,11 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht LongTxDuration: cfg.DatabaseLog.SlowThreshold, }) if err != nil { - return nil, nil, err + return nil, nil, nil, err } hooksMgr, err := webhooks.NewManager(l.Named("webhooks").Sugar(), sqlStore) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Hook up webhooks to alerts. @@ -189,24 +190,24 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht w := wallet.NewSingleAddressWallet(seed, sqlStore, cfg.UsedUTXOExpiry, zap.NewNop().Sugar()) tp.TransactionPoolSubscribe(w) if err := cs.ConsensusSetSubscribe(w, modules.ConsensusChangeRecent, nil); err != nil { - return nil, nil, err + return nil, nil, nil, err } if m := cfg.Miner; m != nil { if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil { - return nil, nil, err + return nil, nil, nil, err } tp.TransactionPoolSubscribe(m) } cm, err := NewChainManager(cs, NewTransactionPool(tp), cfg.Network) if err != nil { - return nil, nil, err + return nil, nil, nil, err } b, err := bus.New(syncer{g, tp}, alertsMgr, hooksMgr, cm, NewTransactionPool(tp), w, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, l) if err != nil { - return nil, nil, err + return nil, nil, nil, err } shutdownFn := func(ctx context.Context) error { @@ -219,10 +220,10 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht sqlStore.Close(), ) } - return b.Handler(), shutdownFn, nil + return b.Handler(), b.Setup, shutdownFn, nil } -func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, SetupFn, ShutdownFn, error) { +func NewWorker(cfg config.Worker, s3Opts s3.Opts, b Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, http.Handler, WorkerSetupFn, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l) if err != nil { diff --git a/internal/test/config.go b/internal/test/config.go index 68a5fff5b..abf6caaac 100644 --- a/internal/test/config.go +++ b/internal/test/config.go @@ -52,6 +52,10 @@ var ( MinMaxEphemeralAccountBalance: types.Siacoins(1), // 1SC } + PricePinSettings = api.PricePinSettings{ + Enabled: false, + } + RedundancySettings = api.RedundancySettings{ MinShards: 2, TotalShards: 3, diff --git a/internal/test/e2e/cluster.go b/internal/test/e2e/cluster.go index 8d5f93ce5..6fd9f5673 100644 --- a/internal/test/e2e/cluster.go +++ b/internal/test/e2e/cluster.go @@ -313,7 +313,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { busCfg.Miner = node.NewMiner(busClient) // Create bus. - b, bStopFn, err := node.NewBus(busCfg, busDir, wk, logger) + b, bSetupFn, bShutdownFn, err := node.NewBus(busCfg, busDir, wk, logger) tt.OK(err) busAuth := jape.BasicAuth(busPassword) @@ -330,7 +330,7 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { var busShutdownFns []func(context.Context) error busShutdownFns = append(busShutdownFns, busServer.Shutdown) - busShutdownFns = append(busShutdownFns, bStopFn) + busShutdownFns = append(busShutdownFns, bShutdownFn) // Create worker. w, s3Handler, wSetupFn, wShutdownFn, err := node.NewWorker(workerCfg, s3.Opts{}, busClient, wk, logger) @@ -415,6 +415,16 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { }() } + // Finish bus setup. + if err := bSetupFn(ctx); err != nil { + tt.Fatalf("failed to setup bus, err: %v", err) + } + + // Finish worker setup. + if err := wSetupFn(ctx, workerAddr, workerPassword); err != nil { + tt.Fatalf("failed to setup worker, err: %v", err) + } + // Set the test contract set to make sure we can add objects at the // beginning of a test right away. tt.OK(busClient.SetContractSet(ctx, test.ContractSet, []types.FileContractID{})) @@ -429,8 +439,9 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { // Update the bus settings. tt.OK(busClient.UpdateSetting(ctx, api.SettingGouging, test.GougingSettings)) - tt.OK(busClient.UpdateSetting(ctx, api.SettingRedundancy, test.RedundancySettings)) tt.OK(busClient.UpdateSetting(ctx, api.SettingContractSet, test.ContractSetSettings)) + tt.OK(busClient.UpdateSetting(ctx, api.SettingPricePinning, test.PricePinSettings)) + tt.OK(busClient.UpdateSetting(ctx, api.SettingRedundancy, test.RedundancySettings)) tt.OK(busClient.UpdateSetting(ctx, api.SettingS3Authentication, api.S3AuthenticationSettings{ V4Keypairs: map[string]string{test.S3AccessKeyID: test.S3SecretAccessKey}, })) @@ -439,11 +450,6 @@ func newTestCluster(t *testing.T, opts testClusterOptions) *TestCluster { SlabBufferMaxSizeSoft: build.DefaultUploadPackingSettings.SlabBufferMaxSizeSoft, })) - // Register the worker - if err := wSetupFn(ctx, workerAddr, workerPassword); err != nil { - tt.Fatalf("failed to register worker, err: %v", err) - } - // Fund the bus. if funding { cluster.MineBlocks(latestHardforkHeight) diff --git a/internal/test/e2e/events_test.go b/internal/test/e2e/events_test.go index 8760a4e02..befa3194a 100644 --- a/internal/test/e2e/events_test.go +++ b/internal/test/e2e/events_test.go @@ -19,20 +19,21 @@ import ( // TestEvents is a test that verifies the bus sends webhooks for certain events, // providing an event webhook was registered. func TestEvents(t *testing.T) { - // list all events - allEvents := []webhooks.EventWebhook{ - api.EventConsensusUpdate{}, - api.EventContractArchive{}, - api.EventContractRenew{}, - api.EventContractSetUpdate{}, - api.EventSettingUpdate{}, - api.EventSettingDelete{}, + // list all webhooks + allEvents := []func(string, map[string]string) webhooks.Webhook{ + api.WebhookConsensusUpdate, + api.WebhookContractArchive, + api.WebhookContractRenew, + api.WebhookContractSetUpdate, + api.WebhookSettingDelete, + api.WebhookSettingUpdate, } // define helper to check if the event is known isKnownEvent := func(e webhooks.Event) bool { - for _, known := range allEvents { - if known.Event().Module == e.Module && known.Event().Event == e.Event { + for _, eFn := range allEvents { + known := eFn("", nil) + if known.Module == e.Module && known.Event == e.Event { return true } } @@ -83,7 +84,7 @@ func TestEvents(t *testing.T) { // register webhooks for _, e := range allEvents { - tt.OK(b.RegisterWebhook(context.Background(), webhooks.NewEventWebhook(server.URL, e))) + tt.OK(b.RegisterWebhook(context.Background(), e(server.URL, nil))) } // fetch our contract diff --git a/internal/worker/cache.go b/internal/worker/cache.go index 7841e9249..e223c82fe 100644 --- a/internal/worker/cache.go +++ b/internal/worker/cache.go @@ -75,7 +75,7 @@ type ( Bus interface { Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) GougingParams(ctx context.Context) (api.GougingParams, error) - RegisterWebhook(ctx context.Context, wh webhooks.Webhook, opts ...webhooks.HeaderOption) error + RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error } WorkerCache interface { @@ -196,13 +196,17 @@ func (c *cache) HandleEvent(event webhooks.Event) (err error) { func (c *cache) Initialize(ctx context.Context, workerAPI string, webhookOpts ...webhooks.HeaderOption) error { eventsURL := fmt.Sprintf("%s/events", workerAPI) + headers := make(map[string]string) + for _, opt := range webhookOpts { + opt(headers) + } for _, wh := range []webhooks.Webhook{ - webhooks.NewEventWebhook(eventsURL, api.EventConsensusUpdate{}), - webhooks.NewEventWebhook(eventsURL, api.EventContractArchive{}), - webhooks.NewEventWebhook(eventsURL, api.EventContractRenew{}), - webhooks.NewEventWebhook(eventsURL, api.EventSettingUpdate{}), + api.WebhookConsensusUpdate(eventsURL, headers), + api.WebhookContractArchive(eventsURL, headers), + api.WebhookContractRenew(eventsURL, headers), + api.WebhookSettingUpdate(eventsURL, headers), } { - if err := c.b.RegisterWebhook(ctx, wh, webhookOpts...); err != nil { + if err := c.b.RegisterWebhook(ctx, wh); err != nil { return fmt.Errorf("failed to register webhook '%s', err: %v", wh, err) } } diff --git a/internal/worker/cache_test.go b/internal/worker/cache_test.go index 0b419a29c..e696ed02c 100644 --- a/internal/worker/cache_test.go +++ b/internal/worker/cache_test.go @@ -26,7 +26,7 @@ func (m *mockBus) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api. func (m *mockBus) GougingParams(ctx context.Context) (api.GougingParams, error) { return m.gougingParams, nil } -func (m *mockBus) RegisterWebhook(ctx context.Context, wh webhooks.Webhook, opts ...webhooks.HeaderOption) error { +func (m *mockBus) RegisterWebhook(ctx context.Context, wh webhooks.Webhook) error { return nil } @@ -145,14 +145,14 @@ func TestWorkerCache(t *testing.T) { // assert the worker cache handles every event _ = observedLogs.TakeAll() // clear logs - for _, event := range []webhooks.EventWebhook{ - api.EventConsensusUpdate{}, - api.EventContractArchive{}, - api.EventContractRenew{}, - api.EventSettingUpdate{}, - api.EventSettingDelete{}, + for _, event := range []webhooks.Event{ + {Module: api.ModuleConsensus, Event: api.EventUpdate, Payload: nil}, + {Module: api.ModuleContract, Event: api.EventArchive, Payload: nil}, + {Module: api.ModuleContract, Event: api.EventRenew, Payload: nil}, + {Module: api.ModuleSetting, Event: api.EventUpdate, Payload: nil}, + {Module: api.ModuleSetting, Event: api.EventDelete, Payload: nil}, } { - if err := c.HandleEvent(event.Event()); err != nil { + if err := c.HandleEvent(event); err != nil { t.Fatal(err) } } diff --git a/stores/sql/main.go b/stores/sql/main.go index b91926750..268743261 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -1015,7 +1015,7 @@ func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error total_scans = total_scans + 1, second_to_last_scan_success = last_scan_success, last_scan_success = ?, - recent_downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN recent_downtime + ? - last_scan ELSE CASE WHEN ? THEN 0 ELSE recent_downtime END END, + recent_downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN recent_downtime + ? - last_scan ELSE CASE WHEN ? THEN 0 ELSE recent_downtime END END, recent_scan_failures = CASE WHEN ? THEN 0 ELSE recent_scan_failures + 1 END, downtime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN downtime + ? - last_scan ELSE downtime END, uptime = CASE WHEN ? AND last_scan > 0 AND last_scan < ? THEN uptime + ? - last_scan ELSE uptime END, diff --git a/webhooks/webhooks.go b/webhooks/webhooks.go index 9a4d290fa..20bf94381 100644 --- a/webhooks/webhooks.go +++ b/webhooks/webhooks.go @@ -66,20 +66,8 @@ type ( Event string `json:"event"` Payload interface{} `json:"payload,omitempty"` } - - EventWebhook interface { - Event() Event - } ) -func NewEventWebhook(url string, e EventWebhook) Webhook { - return Webhook{ - Module: e.Event().Module, - Event: e.Event().Event, - URL: url, - } -} - type Manager struct { logger *zap.SugaredLogger wg sync.WaitGroup diff --git a/worker/mocks_test.go b/worker/mocks_test.go index b67d80c52..192f4c169 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -694,6 +694,6 @@ var _ WebhookStore = (*webhookStoreMock)(nil) type webhookStoreMock struct{} -func (*webhookStoreMock) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error { +func (*webhookStoreMock) RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error { return nil } diff --git a/worker/worker.go b/worker/worker.go index 842e541f2..49aaac184 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -158,7 +158,7 @@ type ( } WebhookStore interface { - RegisterWebhook(ctx context.Context, webhook webhooks.Webhook, opts ...webhooks.HeaderOption) error + RegisterWebhook(ctx context.Context, webhook webhooks.Webhook) error } ConsensusState interface { @@ -1301,9 +1301,8 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush return nil, errors.New("uploadMaxMemory cannot be 0") } - cache := iworker.NewCache(b, l) - l = l.Named("worker").Named(id) + cache := iworker.NewCache(b, l) shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) w := &worker{ alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)),