Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ARCO-291): Ordered callbacks #755

Merged
merged 18 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 21 additions & 31 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/callbacker"
"github.com/bitcoin-sv/arc/internal/callbacker/store"
"github.com/bitcoin-sv/arc/internal/callbacker/send_manager"
"github.com/bitcoin-sv/arc/internal/callbacker/store/postgresql"
"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_jetstream"
Expand All @@ -48,7 +48,6 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
callbackerStore *postgresql.PostgreSQL
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher
workers *callbacker.BackgroundWorkers
server *callbacker.Server
healthServer *grpc_opts.GrpcServer
mqClient callbacker.MessageQueueClient
Expand All @@ -58,7 +57,7 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),

stopFn := func() {
logger.Info("Shutting down callbacker")
dispose(logger, server, workers, dispatcher, sender, callbackerStore, healthServer, processor, mqClient)
dispose(logger, server, dispatcher, sender, callbackerStore, healthServer, processor, mqClient)
logger.Info("Shutdown complete")
}

Expand All @@ -73,24 +72,18 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
return nil, fmt.Errorf("failed to create callback sender: %v", err)
}

sendConfig := callbacker.SendConfig{
Expiration: cfg.Expiration,
Delay: cfg.Delay,
DelayDuration: cfg.DelayDuration,
PauseAfterSingleModeSuccessfulSend: cfg.Pause,
BatchSendInterval: cfg.BatchSendInterval,
}
runNewManager := func(url string) callbacker.SendManagerI {
manager := send_manager.New(url, sender, callbackerStore, logger,
send_manager.WithQueueProcessInterval(cfg.Pause),
send_manager.WithBatchSendInterval(cfg.BatchSendInterval),
send_manager.WithExpiration(cfg.Expiration),
)
manager.Start()

dispatcher = callbacker.NewCallbackDispatcher(sender, callbackerStore, logger, &sendConfig)
workers = callbacker.NewBackgroundWorkers(callbackerStore, dispatcher, logger)
err = workers.DispatchPersistedCallbacks()
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to dispatch previously persisted callbacks: %v", err)
return manager
}

workers.StartCallbackStoreCleanup(cfg.PruneInterval, cfg.PruneOlderThan)
workers.StartFailedCallbacksDispatch(cfg.FailedCallbackCheckInterval)
dispatcher = callbacker.NewCallbackDispatcher(sender, runNewManager)

natsConnection, err := nats_connection.New(arcConfig.MessageQueue.URL, logger)
if err != nil {
Expand Down Expand Up @@ -119,6 +112,9 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
return nil, err
}

processor.StartCallbackStoreCleanup(cfg.PruneInterval, cfg.PruneOlderThan)
processor.DispatchPersistedCallbacks()

err = processor.Start()
if err != nil {
stopFn()
Expand Down Expand Up @@ -168,37 +164,31 @@ func newStore(dbConfig *config.DbConfig) (s *postgresql.PostgreSQL, err error) {
return s, err
}

func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.BackgroundWorkers,
func dispose(l *slog.Logger, server *callbacker.Server,
dispatcher *callbacker.CallbackDispatcher, sender *callbacker.CallbackSender,
store store.CallbackerStore, healthServer *grpc_opts.GrpcServer, processor *callbacker.Processor, mqClient callbacker.MessageQueueClient) {
store *postgresql.PostgreSQL, healthServer *grpc_opts.GrpcServer, processor *callbacker.Processor, mqClient callbacker.MessageQueueClient) {
// dispose the dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. background workers - ensure no callbacks from background will be accepted
// 3. dispatcher - ensure all already accepted callbacks are proccessed
// 2. dispatcher - ensure all already accepted callbacks are processed
// 3. processor - remove all URL mappings
// 4. sender - finally, stop the sender as there are no callbacks left to send
// 5. store

if server != nil {
server.GracefulStop()
}
if workers != nil {
workers.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if processor != nil {
processor.GracefulStop()
}

if sender != nil {
sender.GracefulStop()
}
if mqClient != nil {
mqClient.Shutdown()
}

if store != nil {
err := store.Close()
if err != nil {
Expand Down
21 changes: 9 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,13 @@ type K8sWatcherConfig struct {
}

type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Delay time.Duration `mapstructure:"delay"`
Pause time.Duration `mapstructure:"pause"`
BatchSendInterval time.Duration `mapstructure:"batchSendInterval"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
DelayDuration time.Duration `mapstructure:"delayDuration"`
FailedCallbackCheckInterval time.Duration `mapstructure:"failedCallbackCheckInterval"`
Expiration time.Duration `mapstructure:"expiration"`
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
BatchSendInterval time.Duration `mapstructure:"batchSendInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
Expiration time.Duration `mapstructure:"expiration"`
Db *DbConfig `mapstructure:"db"`
}
14 changes: 6 additions & 8 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,12 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Delay: 0,
Pause: 0,
BatchSendInterval: time.Duration(5 * time.Second),
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
FailedCallbackCheckInterval: time.Minute,
Expiration: 24 * time.Hour,
Pause: 0,
BatchSendInterval: 5 * time.Second,
PruneOlderThan: 14 * 24 * time.Hour,
PruneInterval: 24 * time.Hour,
Expiration: 24 * time.Hour,
Db: getDbConfig("callbacker"),
}
}

Expand Down
11 changes: 4 additions & 7 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ callbacker:
dialAddr: localhost:8021 # address for other services to dial callbacker service
health:
serverDialAddr: localhost:8025 # address at which the grpc health server is exposed
delay: 0s # delay before the callback (or batch of callbacks) is actually sent
pause: 0s # pause between sending next callback to the same receiver
pause: 1s # pause between sending next callback to the same receiver - must be greater 0s
batchSendInterval: 5s # interval at witch batched callbacks are send (default 5s)
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
expiration: 24h # maximum time a callback can remain unsent before it's put as permanently failed
db:
mode: postgres # db mode indicates which db to use. At the moment only postgres is offered
postgres: # postgres db configuration in case that mode: postgres
Expand All @@ -194,8 +196,3 @@ callbacker:
maxIdleConns: 10 # maximum idle connections
maxOpenConns: 80 # maximum open connections
sslMode: disable
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
failedCallbackCheckInterval: 1m # interval at which the store is checked for failed callbacks to be re-sent
delayDuration: 5s # we try callbacks a few times with this delay after which if it fails consistently we store them in db
expiration: 24h # maximum time a callback can remain unsent before it's put as permanently failed
3 changes: 1 addition & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ services:
condition: service_healthy
migrate-blocktx:
condition: service_completed_successfully

healthcheck:
test: ["CMD", "/bin/grpc_health_probe", "-addr=:8006", "-service=liveness", "-rpc-timeout=5s"]
interval: 10s
Expand All @@ -204,7 +203,7 @@ services:
migrate-callbacker:
condition: service_completed_successfully
healthcheck:
test: ["CMD", "/bin/grpc_health_probe", "-addr=:8022", "-service=liveness", "-rpc-timeout=5s"]
test: ["CMD", "/bin/grpc_health_probe", "-addr=:8025", "-service=liveness", "-rpc-timeout=5s"]
interval: 10s
timeout: 5s
retries: 3
Expand Down
160 changes: 0 additions & 160 deletions internal/callbacker/background_workers.go

This file was deleted.

Loading
Loading