Skip to content

Commit

Permalink
Merge pull request #742 from pastelnetwork/PSL-1091_updateWatchlistWo…
Browse files Browse the repository at this point in the history
…rker

[PSL-1091] implement update watchlist worker
  • Loading branch information
j-rafique authored Dec 11, 2023
2 parents 512457a + 4fdb522 commit 4c4416e
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 32 deletions.
1 change: 1 addition & 0 deletions common/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type LocalStoreInterface interface {
QuerySelfHealingChallenges() (challenges []types.SelfHealingChallenge, err error)
UpsertPingHistory(pingInfo types.PingInfo) error
GetPingInfoBySupernodeID(supernodeID string) (*types.PingInfo, error)
GetAllPingInfos() (types.PingInfos, error)
GetWatchlistPingInfo() ([]types.PingInfo, error)
UpdatePingInfo(supernodeID string) error
CloseHistoryDB(ctx context.Context)
Expand Down
36 changes: 36 additions & 0 deletions common/storage/local/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,42 @@ func (s *SQLiteStore) InsertSelfHealingChallenge(challenge types.SelfHealingChal
return int(id), nil
}

// GetAllPingInfos retrieves all ping infos
func (s *SQLiteStore) GetAllPingInfos() (types.PingInfos, error) {
const selectQuery = `
SELECT id, supernode_id, ip_address, total_pings, total_successful_pings,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen,
created_at, updated_at
FROM ping_history
`
rows, err := s.db.Query(selectQuery)
if err != nil {
return nil, err
}
defer rows.Close()

var pingInfos types.PingInfos
for rows.Next() {

var pingInfo types.PingInfo
if err := rows.Scan(
&pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings,
&pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen,
&pingInfo.CreatedAt, &pingInfo.UpdatedAt,
); err != nil {
return nil, err
}
pingInfos = append(pingInfos, pingInfo)
}

if err = rows.Err(); err != nil {
return nil, err
}

return pingInfos, nil
}

// CleanupSelfHealingChallenges cleans up self-healing challenges stored in DB for inspection
func (s *SQLiteStore) CleanupSelfHealingChallenges() (err error) {
const delQuery = "DELETE FROM self_healing_challenges"
Expand Down
3 changes: 2 additions & 1 deletion common/types/self_healing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type PingInfo struct {
IsOnline bool `db:"is_online"`
IsOnWatchlist bool `db:"is_on_watchlist"`
IsAdjusted bool `db:"is_adjusted"`
CumulativeResponseTime float64 `json:"cumulative_response_time"`
CumulativeResponseTime float64 `db:"cumulative_response_time"`
LastSeen sql.NullTime `db:"last_seen"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
LastResponseTime float64
}

// PingInfos represents array of ping info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode
Error("error pinging sn")

pi := types.PingInfo{
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: false,
IsAdjusted: false,
AvgPingResponseTime: 0.0,
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: false,
IsAdjusted: false,
LastResponseTime: 0.0,
}

if err := task.StorePingInfo(ctx, pi); err != nil {
Expand All @@ -104,12 +104,12 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode
}

pi := types.PingInfo{
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: res.IsOnline,
IsAdjusted: false,
AvgPingResponseTime: respondedAt.Sub(timeBeforePing).Seconds(),
LastSeen: sql.NullTime{Time: respondedAt, Valid: true},
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: res.IsOnline,
IsAdjusted: false,
LastResponseTime: respondedAt.Sub(timeBeforePing).Seconds(),
LastSeen: sql.NullTime{Time: respondedAt, Valid: true},
}

if err := task.StorePingInfo(ctx, pi); err != nil {
Expand Down Expand Up @@ -229,15 +229,6 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo {

if !existedInfo.LastSeen.Valid { //for the first row
info.LastSeen = sql.NullTime{Time: time.Now().UTC(), Valid: true}
} else {
twentyMinutesAgo := time.Now().UTC().Add(-20 * time.Minute)

// Check if the timestamp is before 20 minutes ago
if existedInfo.LastSeen.Time.Before(twentyMinutesAgo) {
info.IsOnWatchlist = true
} else {
info.IsOnWatchlist = false
}
}

if info.IsOnline {
Expand All @@ -249,7 +240,7 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo {
info.LastSeen = existedInfo.LastSeen
}

info.CumulativeResponseTime = existedInfo.CumulativeResponseTime + info.AvgPingResponseTime
info.CumulativeResponseTime = existedInfo.CumulativeResponseTime + info.LastResponseTime

var avgPingResponseTime float64
if info.TotalSuccessfulPings != 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestGetInfoToInsert(t *testing.T) {
{
testCase: "when node is offline, last seen should be the same as existed record",
existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2, LastSeen: sql.NullTime{Time: now, Valid: true}},
receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: false},
receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: false, LastResponseTime: 0},
expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 2, IsOnline: false, LastSeen: sql.NullTime{Time: now, Valid: true}},
},
{
Expand All @@ -53,14 +53,8 @@ func TestGetInfoToInsert(t *testing.T) {
{
testCase: "when node is online, cumulative response time should add the last ping time, and avg response time should be recalculated",
existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2, LastSeen: sql.NullTime{Time: now, Valid: true}, CumulativeResponseTime: 20 * time.Second.Seconds(), AvgPingResponseTime: 24},
receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: true, AvgPingResponseTime: 4.0, LastSeen: sql.NullTime{Time: now, Valid: true}},
expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 3, IsOnline: true, AvgPingResponseTime: 8, LastSeen: sql.NullTime{Time: now, Valid: true}, CumulativeResponseTime: 24 * time.Second.Seconds()},
},
{
testCase: "when node is offline for more than 20 minutes, is_on_watchlist should be true",
existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2, LastSeen: sql.NullTime{Time: now.Add(-21 * time.Minute), Valid: true}, CumulativeResponseTime: now.Sub(now.Add(-20 * time.Second)).Seconds(), AvgPingResponseTime: 24},
receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: false, AvgPingResponseTime: 0.0},
expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 2, IsOnline: false, AvgPingResponseTime: 10, LastSeen: sql.NullTime{Time: now.Add(-21 * time.Minute), Valid: true}, CumulativeResponseTime: now.Sub(now.Add(-20 * time.Second)).Seconds(), IsOnWatchlist: true},
receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: true, LastResponseTime: 4.0, LastSeen: sql.NullTime{Time: now, Valid: true}},
expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 3, IsOnline: true, AvgPingResponseTime: 8, LastSeen: sql.NullTime{Time: now, Valid: true}, CumulativeResponseTime: 24 * time.Second.Seconds(), LastResponseTime: 4},
},
}

Expand Down
21 changes: 20 additions & 1 deletion supernode/services/selfhealing/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
defaultTimerBlockCheckDuration = 5 * time.Minute
defaultFetchNodesPingInfoInterval = 60 * time.Second
defaultUpdateWatchlistInterval = 70 * time.Second
)

// SHService keeps track of the supernode's nodeID and passes this, the pastel client,
Expand Down Expand Up @@ -60,7 +61,23 @@ func (service *SHService) RunFetchNodesPingInfoWorker(ctx context.Context) {
task.FetchAndMaintainPingInfo(newCtx)

case <-ctx.Done():
log.Println("Context done being called in local keys fetch worker in service.go")
log.Println("Context done being called in fetch & maintain ping info worker")
return
}
}
}

// RunUpdateWatchlistWorker : This worker will periodically fetch and maintain the ping info and update watchlist field
func (service *SHService) RunUpdateWatchlistWorker(ctx context.Context) {
for {
select {
case <-time.After(defaultUpdateWatchlistInterval):
newCtx := context.Background()
task := service.NewSHTask()
task.UpdateWatchlist(newCtx)

case <-ctx.Done():
log.Println("Context done being called in update watchlist worker")
return
}
}
Expand All @@ -78,6 +95,8 @@ func (service *SHService) Run(ctx context.Context) error {

go service.RunFetchNodesPingInfoWorker(ctx)

go service.RunUpdateWatchlistWorker(ctx)

for {
select {
case <-time.After(defaultTimerBlockCheckDuration):
Expand Down
104 changes: 104 additions & 0 deletions supernode/services/selfhealing/update_watchlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package selfhealing

import (
"context"
"database/sql"
"github.com/pastelnetwork/gonode/common/errors"
"github.com/pastelnetwork/gonode/common/log"
"github.com/pastelnetwork/gonode/common/storage/local"
"github.com/pastelnetwork/gonode/common/types"
"time"
)

// UpdateWatchlist fetch and update the nodes on watchlist
func (task *SHTask) UpdateWatchlist(ctx context.Context) error {
log.WithContext(ctx).Infoln("Update Watchlist worker invoked")

pingInfos, err := task.GetAllPingInfo(ctx)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error retrieving ping infos")
}

for _, info := range pingInfos {

if task.shouldUpdateWatchlistField(info) {
info.IsOnWatchlist = true

if err := task.UpsertPingHistory(ctx, info); err != nil {
log.WithContext(ctx).
WithField("supernode_id", info.SupernodeID).
WithError(err).
Error("error upserting ping history")
}

}
}

return nil
}

// GetAllPingInfo get all the ping info from db
func (task *SHTask) GetAllPingInfo(ctx context.Context) (types.PingInfos, error) {
store, err := local.OpenHistoryDB()
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error Opening DB")
return nil, err
}

var infos types.PingInfos

if store != nil {
defer store.CloseHistoryDB(ctx)

infos, err = store.GetAllPingInfos()
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}

log.WithContext(ctx).
Error("error retrieving ping histories")

return nil, err
}
}

return infos, nil
}

// UpsertPingHistory upsert the ping info to db
func (task *SHTask) UpsertPingHistory(ctx context.Context, info types.PingInfo) error {
store, err := local.OpenHistoryDB()
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error Opening DB")
return err
}

if store != nil {
defer store.CloseHistoryDB(ctx)

err = store.UpsertPingHistory(info)
if err != nil {
log.WithContext(ctx).
WithField("supernode_id", info.SupernodeID).
Error("error upserting ping history")

return err
}

}

return nil
}

func (task *SHTask) shouldUpdateWatchlistField(info types.PingInfo) bool {
twentyMinutesAgo := time.Now().UTC().Add(-20 * time.Minute)

// Check if the timestamp is before 20 minutes ago
if info.LastSeen.Time.Before(twentyMinutesAgo) {
return true
}

return false

}

0 comments on commit 4c4416e

Please sign in to comment.