Skip to content

Commit

Permalink
[PSL-1100] resolved issues on ping nodes and update watchlist worker
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Dec 18, 2023
1 parent d987ef3 commit 0efab65
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 38 deletions.
2 changes: 1 addition & 1 deletion bridge/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
Expand Down
24 changes: 16 additions & 8 deletions common/storage/local/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ const createPingHistory string = `
updated_at DATETIME NOT NULL
);`

// createPingHistoryUniqueIndex is the SQL statement that adds a unique index
// named ping_history_unique over the (supernode_id, ip_address) columns of the
// ping_history table. IF NOT EXISTS makes the statement safe to run each time
// the database is opened.
// NOTE(review): presumably this index is the conflict target the ping-history
// upsert relies on — confirm against the upsert query's ON CONFLICT clause.
const createPingHistoryUniqueIndex string = `
CREATE UNIQUE INDEX IF NOT EXISTS ping_history_unique ON ping_history(supernode_id, ip_address);
`

const (
historyDBName = "history.db"
emptyString = ""
Expand Down Expand Up @@ -188,8 +192,8 @@ func (s *SQLiteStore) UpsertPingHistory(pingInfo types.PingInfo) error {
is_online = excluded.is_online,
is_on_watchlist = excluded.is_on_watchlist,
is_adjusted = excluded.is_adjusted,
last_seen = excluded.last_seen
cumulative_response_time = excluded.cumulative_response_time
last_seen = excluded.last_seen,
cumulative_response_time = excluded.cumulative_response_time,
updated_at = excluded.updated_at;`

_, err := s.db.Exec(upsertQuery,
Expand All @@ -207,7 +211,7 @@ func (s *SQLiteStore) UpsertPingHistory(pingInfo types.PingInfo) error {
func (s *SQLiteStore) GetPingInfoBySupernodeID(supernodeID string) (*types.PingInfo, error) {
const selectQuery = `
SELECT id, supernode_id, ip_address, total_pings, total_successful_pings,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, cumulative_response_time,
created_at, updated_at
FROM ping_history
WHERE supernode_id = ?;`
Expand All @@ -219,7 +223,7 @@ func (s *SQLiteStore) GetPingInfoBySupernodeID(supernodeID string) (*types.PingI
err := row.Scan(
&pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings,
&pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, &pingInfo.CumulativeResponseTime,
&pingInfo.CreatedAt, &pingInfo.UpdatedAt,
)

Expand All @@ -234,7 +238,7 @@ func (s *SQLiteStore) GetPingInfoBySupernodeID(supernodeID string) (*types.PingI
func (s *SQLiteStore) GetWatchlistPingInfo() ([]types.PingInfo, error) {
const selectQuery = `
SELECT id, supernode_id, ip_address, total_pings, total_successful_pings,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, cumulative_response_time,
created_at, updated_at
FROM ping_history
WHERE is_on_watchlist = true AND is_adjusted = false;`
Expand All @@ -251,7 +255,7 @@ func (s *SQLiteStore) GetWatchlistPingInfo() ([]types.PingInfo, error) {
if err := rows.Scan(
&pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings,
&pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, &pingInfo.CumulativeResponseTime,
&pingInfo.CreatedAt, &pingInfo.UpdatedAt,
); err != nil {
return nil, err
Expand Down Expand Up @@ -362,7 +366,7 @@ func (s *SQLiteStore) InsertSelfHealingChallenge(challenge types.SelfHealingChal
func (s *SQLiteStore) GetAllPingInfos() (types.PingInfos, error) {
const selectQuery = `
SELECT id, supernode_id, ip_address, total_pings, total_successful_pings,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen,
avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, cumulative_response_time,
created_at, updated_at
FROM ping_history
`
Expand All @@ -379,7 +383,7 @@ func (s *SQLiteStore) GetAllPingInfos() (types.PingInfos, error) {
if err := rows.Scan(
&pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings,
&pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen,
&pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, &pingInfo.CumulativeResponseTime,
&pingInfo.CreatedAt, &pingInfo.UpdatedAt,
); err != nil {
return nil, err
Expand Down Expand Up @@ -433,6 +437,10 @@ func OpenHistoryDB() (storage.LocalStoreInterface, error) {
return nil, fmt.Errorf("cannot create table(s): %w", err)
}

if _, err := db.Exec(createPingHistoryUniqueIndex); err != nil {
return nil, fmt.Errorf("cannot create table(s): %w", err)
}

_, _ = db.Exec(alterTaskHistory)

pragmas := []string{
Expand Down
2 changes: 1 addition & 1 deletion common/types/self_healing.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type PingInfo struct {
LastSeen sql.NullTime `db:"last_seen"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
LastResponseTime float64
LastResponseTime float64 `db:"-"`
}

// PingInfos represents array of ping info
Expand Down
2 changes: 1 addition & 1 deletion mixins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/stretchr/testify v1.8.4 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions supernode/node/grpc/server/services/supernode/self_healing.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ func (service *SelfHealingChallengeGRPC) Desc() *grpc.ServiceDesc {

// Ping is the server side of self-healing challenge processing GRPC comms
func (service *SelfHealingChallengeGRPC) Ping(ctx context.Context, pingReq *pb.PingRequest) (*pb.PingResponse, error) {
log.WithContext(ctx).WithField("req", pingReq).Info("ping request received at the server side")

task := service.NewSHTask()
res, err := task.Ping(ctx, pingReq)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions supernode/services/common/reg_task_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ func (h *RegTaskHelper) verifyTxn(ctx context.Context,
txn *pastel.GetRawTransactionVerbose1Result, totalAmt float64, percent float64) error {
inRange := func(val float64, reqVal float64, slackPercent float64) bool {
lower := reqVal - (reqVal * slackPercent / 100)
upper := reqVal + (reqVal * slackPercent / 100)
//upper := reqVal + (reqVal * slackPercent / 100)

return val >= lower && val <= upper
return val >= lower
}

log.WithContext(ctx).Debug("Verifying Burn Txn")
Expand Down
24 changes: 21 additions & 3 deletions supernode/services/selfhealing/generate_self_healing.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (task *SHTask) GenerateSelfHealingChallenge(ctx context.Context) error {
selfHealingTicketsMap := task.identifySelfHealingTickets(ctx, watchlistPingInfos, mapOfClosestNodesAgainstKeys, symbolFileKeyMap)
log.WithContext(ctx).WithField("self_healing_tickets", selfHealingTicketsMap).Info("self-healing tickets have been identified")

challengeRecipientMap, err := task.identifyChallengeRecipients(ctx, selfHealingTicketsMap)
challengeRecipientMap, err := task.identifyChallengeRecipients(ctx, selfHealingTicketsMap, watchlistPingInfos)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error identifying challenge recipients")
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func (task *SHTask) isOnWatchlist(nodeID string, watchlistPingInfos types.PingIn
return false
}

func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealingTicketsMap map[string]SymbolFileKeyDetails) (map[string][]SymbolFileKeyDetails, error) {
func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealingTicketsMap map[string]SymbolFileKeyDetails, watchlist types.PingInfos) (map[string][]SymbolFileKeyDetails, error) {
challengeRecipientMap := make(map[string][]SymbolFileKeyDetails)
for txID, ticketDetails := range selfHealingTicketsMap {
logger := log.WithContext(ctx).WithField("TxID", txID)
Expand All @@ -309,7 +309,7 @@ func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealing
logger.WithError(err).Error("unable to retrieve list of supernodes")
}

challengeRecipients := task.SHService.GetNClosestSupernodeIDsToComparisonString(ctx, 1, string(dataHash), listOfSupernodes)
challengeRecipients := task.SHService.GetNClosestSupernodeIDsToComparisonString(ctx, 1, string(dataHash), task.filterWatchlistAndCurrentNode(watchlist, listOfSupernodes))
if len(challengeRecipients) < 1 {
log.WithContext(ctx).WithField("file_hash", dataHash).Info("no closest nodes have found against the file")
continue
Expand Down Expand Up @@ -621,3 +621,21 @@ func (service *SHService) getListOfSupernode(ctx context.Context) ([]string, err

return ret, nil
}

// filterWatchlistAndCurrentNode returns the entries of listOfSupernodes, in
// their original order, after dropping every ID for which task.isOnWatchlist
// reports true against watchList and every ID equal to task.nodeID.
// The result is nil when no entry survives the filter.
func (task *SHTask) filterWatchlistAndCurrentNode(watchList types.PingInfos, listOfSupernodes []string) []string {
	var eligible []string

	for i := range listOfSupernodes {
		candidate := listOfSupernodes[i]

		// Keep the candidate only when it is neither watchlisted nor this node itself.
		if !task.isOnWatchlist(candidate, watchList) && candidate != task.nodeID {
			eligible = append(eligible, candidate)
		}
	}

	return eligible
}
41 changes: 27 additions & 14 deletions supernode/services/selfhealing/ping_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ func (task *SHTask) getNodesAddressesToConnect(ctx context.Context) ([]pastel.Ma
for _, mn := range supernodes {
if mn.ExtAddress == "" || mn.ExtKey == "" {
log.WithContext(ctx).WithField("method", "FetchAndMaintainPingInfo").
WithField("node_id", mn.ExtKey).Warn("node address or node id is empty")
WithField("node_id", mn.ExtKey).Debug("node address or node id is empty")
continue
}

if mn.ExtKey == task.nodeID {
continue
}

Expand Down Expand Up @@ -81,7 +84,7 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode
res, err := task.ping(ctx, req, node.ExtAddress)
if err != nil {
log.WithContext(ctx).WithField("sn_address", node.ExtAddress).WithError(err).
Error("error pinging sn")
Debug("error pinging sn")

pi := types.PingInfo{
SupernodeID: node.ExtKey,
Expand All @@ -91,27 +94,29 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode
}

if err := task.StorePingInfo(ctx, pi); err != nil {
log.WithContext(ctx).WithError(err).Error("error storing ping info")
log.WithContext(ctx).WithField("supernode_id", pi.SupernodeID).
WithError(err).
Error("error storing ping info")
}

return
}

respondedAt, err := time.Parse(UTCTimeLayout, res.RespondedAt)
if err != nil {
log.WithContext(ctx).WithError(err)
}
lastSeen := time.Now().UTC()
responseTime := lastSeen.Sub(timeBeforePing).Abs().Seconds()

pi := types.PingInfo{
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: res.IsOnline,
LastResponseTime: respondedAt.Sub(timeBeforePing).Seconds(),
LastSeen: sql.NullTime{Time: respondedAt, Valid: true},
LastSeen: sql.NullTime{Time: lastSeen, Valid: true},
LastResponseTime: responseTime,
}

if err := task.StorePingInfo(ctx, pi); err != nil {
log.WithContext(ctx).WithError(err).Error("error storing ping info")
log.WithContext(ctx).WithField("supernode_id", pi.SupernodeID).
WithError(err).
Error("error storing ping info")
}
}()
}
Expand All @@ -123,7 +128,7 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode

// ping just pings the given node address
func (task *SHTask) ping(ctx context.Context, req *pb.PingRequest, supernodeAddr string) (*pb.PingResponse, error) {
log.WithContext(ctx).Info("pinging supernode: " + supernodeAddr)
log.WithContext(ctx).Debug("pinging supernode: " + supernodeAddr)

// Create a context with timeout
pingCtx, cancel := context.WithTimeout(ctx, timeoutDuration)
Expand All @@ -133,7 +138,6 @@ func (task *SHTask) ping(ctx context.Context, req *pb.PingRequest, supernodeAddr
nodeClientConn, err := task.nodeClient.Connect(pingCtx, supernodeAddr)
if err != nil {
err = fmt.Errorf("could not use node client to connect to: %s, error: %v", supernodeAddr, err)
log.WithContext(ctx).Warn(err.Error())
return nil, err
}
defer nodeClientConn.Close()
Expand Down Expand Up @@ -212,6 +216,10 @@ func (task *SHTask) GetPingInfoFromDB(ctx context.Context, supernodeID string) (

return nil, err
}

if info == nil {
return &types.PingInfo{}, nil
}
}

return info, nil
Expand All @@ -225,7 +233,7 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo {

info.TotalPings = existedInfo.TotalPings + 1

if !existedInfo.LastSeen.Valid { //for the first row
if existedInfo.LastSeen.Time.IsZero() || !(existedInfo.LastSeen.Valid) { //for the first row
info.LastSeen = sql.NullTime{Time: time.Now().UTC(), Valid: true}
}

Expand All @@ -237,9 +245,14 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo {

if !info.IsOnline {
info.TotalSuccessfulPings = existedInfo.TotalSuccessfulPings
info.LastSeen = existedInfo.LastSeen
info.IsOnWatchlist = existedInfo.IsOnWatchlist
info.IsAdjusted = existedInfo.IsAdjusted

if existedInfo.LastSeen.Time.IsZero() || !(existedInfo.LastSeen.Valid) { //for the first row
info.LastSeen = sql.NullTime{Time: time.Now().UTC(), Valid: true}
} else {
info.LastSeen = existedInfo.LastSeen
}
}

info.CumulativeResponseTime = existedInfo.CumulativeResponseTime + info.LastResponseTime
Expand Down
4 changes: 2 additions & 2 deletions supernode/services/selfhealing/ping_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestGetInfoToInsert(t *testing.T) {
}{
{
testCase: "when node is offline, should increment total ping only",
existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2},
existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2, LastSeen: sql.NullTime{Time: now, Valid: true}},
receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: false},
expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 2, IsOnline: false},
expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 2, IsOnline: false, LastSeen: sql.NullTime{Time: now, Valid: true}},
},
{
testCase: "when node is online, should increment total ping & total successful pings",
Expand Down
2 changes: 1 addition & 1 deletion supernode/services/selfhealing/process_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Ping acknowledges the received message and return with the timestamp
func (task *SHTask) Ping(ctx context.Context, _ *pb.PingRequest) (*pb.PingResponse, error) {
log.WithContext(ctx).WithField("node_id", task.nodeID).Println("ping request received at the server")
log.WithContext(ctx).WithField("node_id", task.nodeID).Debug("ping request received at the server")

return &pb.PingResponse{
ReceiverId: task.nodeID,
Expand Down
6 changes: 3 additions & 3 deletions supernode/services/selfhealing/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (service *SHService) Run(ctx context.Context) error {
for {
select {
case <-time.After(defaultTimerBlockCheckDuration):
newCtx := context.Background()
task := service.NewSHTask()
task.GenerateSelfHealingChallenge(newCtx)
//newCtx := context.Background()
//task := service.NewSHTask()
//task.GenerateSelfHealingChallenge(newCtx)

log.WithContext(ctx).Debug("Would normally invoke a self-healing worker")
case <-ctx.Done():
Expand Down
1 change: 1 addition & 0 deletions supernode/services/selfhealing/update_watchlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (task *SHTask) UpdateWatchlist(ctx context.Context) error {
info.IsOnWatchlist = true
info.IsAdjusted = false

log.WithContext(ctx).WithField("supernode_id", info.SupernodeID).Info("updating watchlist flag")
if err := task.UpsertPingHistory(ctx, info); err != nil {
log.WithContext(ctx).
WithField("supernode_id", info.SupernodeID).
Expand Down

0 comments on commit 0efab65

Please sign in to comment.