Skip to content

Commit

Permalink
[PSL-1231] remove storage-challenge & self-healing stale data
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Jul 25, 2024
1 parent e9fb880 commit e5114e1
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 3 deletions.
18 changes: 18 additions & 0 deletions common/storage/queries/self_healing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type SelfHealingQueries interface {
GetLastNSHChallenges(ctx context.Context, n int) (types.SelfHealingReports, error)
GetSHChallengeReport(ctx context.Context, challengeID string) (types.SelfHealingReports, error)
GetSHExecutionMetrics(ctx context.Context, from time.Time) (metrics.SHExecutionMetrics, error)
RemoveSelfHealingStaleData(ctx context.Context, threshold string) error
}

var (
Expand Down Expand Up @@ -642,3 +643,20 @@ func (s *SQLiteStore) CleanupSelfHealingChallenges() (err error) {
_, err = s.db.Exec(delQuery)
return err
}

func (s *SQLiteStore) RemoveSelfHealingStaleData(ctx context.Context, threshold string) error {
queries := []string{
"DELETE FROM self_healing_execution_metrics WHERE created_at < $1",
"DELETE FROM self_healing_generation_metrics WHERE created_at < $1",
"DELETE from self_healing_challenge_events where is_processed = true and created_at < $1",
}

for _, query := range queries {
if _, err := s.db.ExecContext(ctx, query, threshold); err != nil {
return fmt.Errorf("failed to delete old metrics: %v", err)
}
}

fmt.Println("Old metrics deleted successfully.")
return nil
}
17 changes: 17 additions & 0 deletions common/storage/queries/storage_challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type StorageChallengeQueries interface {
GetDistinctChallengeIDsCountForScoreAggregation(after, before time.Time) (int, error)
GetDistinctChallengeIDs(after, before time.Time, batchNumber int) ([]string, error)
BatchInsertScoreAggregationChallenges(challengeIDs []string, isAggregated bool) error
RemoveStorageChallengeStaleData(ctx context.Context, threshold string) error
}

// InsertStorageChallengeMessage inserts failed storage challenge to db
Expand Down Expand Up @@ -490,3 +491,19 @@ func (s *SQLiteStore) BatchInsertScoreAggregationChallenges(challengeIDs []strin
// Commit the transaction
return tx.Commit()
}

func (s *SQLiteStore) RemoveStorageChallengeStaleData(ctx context.Context, threshold string) error {

queries := []string{
"DELETE FROM storage_challenge_metrics WHERE created_at < $1",
"DELETE FROM storage_challenge_messages WHERE created_at < $1",
}

for _, query := range queries {
if _, err := s.db.Exec(query, threshold); err != nil {
return fmt.Errorf("failed to delete old metrics: %v", err)
}
}

return nil
}
9 changes: 8 additions & 1 deletion hermes/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"github.com/pastelnetwork/gonode/common/storage/queries"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -149,14 +150,20 @@ func runApp(ctx context.Context, conf *config.Config) error {

nodeClient := grpc.NewSNClient(pastelClient, secInfo)

hDB, err := queries.OpenHistoryDB()
if err != nil {
log.WithContext(ctx).WithError(err).Error("error connecting history db")
}
defer hDB.CloseHistoryDB(ctx)

hermesConfig := hermes.NewConfig()
hermesConfig.SNHost = conf.SNHost
hermesConfig.SNPort = conf.SNPort
hermesConfig.CreatorPastelID = conf.PastelID
hermesConfig.CreatorPastelIDPassphrase = conf.PassPhrase
hermesConfig.SetWorkDir(conf.DdWorkDir)

service, err := hermes.NewService(hermesConfig, pastelClient, nodeClient)
service, err := hermes.NewService(hermesConfig, pastelClient, nodeClient, hDB)
if err != nil {
return fmt.Errorf("start hermes: %w", err)
}
Expand Down
66 changes: 66 additions & 0 deletions hermes/service/hermes/metricscleanup/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package metricscleanup

import (
"context"
"time"

"github.com/pastelnetwork/gonode/common/errgroup"
"github.com/pastelnetwork/gonode/common/errors"
"github.com/pastelnetwork/gonode/common/log"
)

const (
runTaskInterval = 24 * time.Hour
)

var (
metricsCleanupThreshold = time.Now().UTC().AddDate(0, 0, -7).Format("2006-01-02")
)

// Run cleans up inactive tickets

// Run stores the latest block hash and height to DB if not stored already
func (s *metricsCleanupService) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return errors.Errorf("context done: %w", ctx.Err())
case <-time.After(runTaskInterval):
group, gctx := errgroup.WithContext(ctx)
group.Go(func() error {
err := s.run(gctx)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error executing metrics cleanup service")
}

return nil
})

if err := group.Wait(); err != nil {
log.WithContext(gctx).WithError(err).Errorf("run task failed")
}
}
}

}

func (s metricsCleanupService) Stats(_ context.Context) (map[string]interface{}, error) {
//cleanup service stats can be implemented here
return nil, nil
}

func (s *metricsCleanupService) run(ctx context.Context) error {
err := s.historyDB.RemoveStorageChallengeStaleData(ctx, metricsCleanupThreshold)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error removing storage challenge stale data")
}
log.WithContext(ctx).Info("storage challenge stale data has been removed")

err = s.historyDB.RemoveSelfHealingStaleData(ctx, metricsCleanupThreshold)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error removing self-healing stale data")
}
log.WithContext(ctx).Info("self-healing stale data has been removed")

return nil
}
20 changes: 20 additions & 0 deletions hermes/service/hermes/metricscleanup/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metricscleanup

import "github.com/pastelnetwork/gonode/hermes/service/node"

// Config is the config struct for cleaner-service
type Config struct {
snHost string
snPort int

sn node.SNClientInterface
}

// NewConfig sets the configurations for cleaner service
func NewConfig(host string, port int, SN node.SNClientInterface) Config {
return Config{
snHost: host,
snPort: port,
sn: SN,
}
}
17 changes: 17 additions & 0 deletions hermes/service/hermes/metricscleanup/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package metricscleanup

import (
"github.com/pastelnetwork/gonode/common/storage/queries"
"github.com/pastelnetwork/gonode/hermes/service"
)

type metricsCleanupService struct {
historyDB queries.LocalStoreInterface
}

// NewMetricsCleanupService returns a new metric cleanup service
func NewMetricsCleanupService(hDB queries.LocalStoreInterface) (service.SvcInterface, error) {
return &metricsCleanupService{
historyDB: hDB,
}, nil
}
13 changes: 11 additions & 2 deletions hermes/service/hermes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package hermes
import (
"context"
"fmt"
"github.com/pastelnetwork/gonode/common/storage/queries"
"github.com/pastelnetwork/gonode/hermes/service/hermes/logsrotator"
"github.com/pastelnetwork/gonode/hermes/service/hermes/metricscleanup"
"os"
"reflect"

Expand Down Expand Up @@ -31,6 +33,7 @@ type service struct {
p2p node.HermesP2PInterface
sn node.SNClientInterface
store *store.SQLiteStore
hDB queries.LocalStoreInterface
}

func (s *service) Run(ctx context.Context) error {
Expand Down Expand Up @@ -73,7 +76,12 @@ func (s *service) Run(ctx context.Context) error {
log.WithContext(ctx).WithError(err).Error("unable to initialize restart pastel-d service")
}

return runServices(ctx, chainReorgService, cleanerService, collectionService, fingerprintService, pastelBlockService, restartPastelDService, logRotationService)
metricsCleanupSerivce, err := metricscleanup.NewMetricsCleanupService(s.hDB)
if err != nil {
log.WithContext(ctx).WithError(err).Error("unable to initialize restart pastel-d service")
}

return runServices(ctx, chainReorgService, cleanerService, collectionService, fingerprintService, pastelBlockService, restartPastelDService, logRotationService, metricsCleanupSerivce)
}

func runServices(ctx context.Context, services ...service2.SvcInterface) error {
Expand Down Expand Up @@ -124,7 +132,7 @@ func (s *service) Stats(ctx context.Context) (map[string]interface{}, error) {
}

// NewService returns a new ddscan service
func NewService(config *Config, pastelClient pastel.Client, sn node.SNClientInterface) (service2.SvcInterface, error) {
func NewService(config *Config, pastelClient pastel.Client, sn node.SNClientInterface, hDB queries.LocalStoreInterface) (service2.SvcInterface, error) {
store, err := store.NewSQLiteStore(config.DataFile)
if err != nil {
return nil, fmt.Errorf("unable to initialise database: %w", err)
Expand All @@ -148,5 +156,6 @@ func NewService(config *Config, pastelClient pastel.Client, sn node.SNClientInte
store: store,
sn: sn,
p2p: p2p,
hDB: hDB,
}, nil
}

0 comments on commit e5114e1

Please sign in to comment.