diff --git a/bridge/go.mod b/bridge/go.mod index 36df4c94f..a7e7f4ac8 100644 --- a/bridge/go.mod +++ b/bridge/go.mod @@ -62,7 +62,7 @@ require ( github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect golang.org/x/crypto v0.11.0 // indirect - golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect + golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/term v0.12.0 // indirect diff --git a/common/storage/local/sqlite.go b/common/storage/local/sqlite.go index 48cf8d39d..9a9849650 100644 --- a/common/storage/local/sqlite.go +++ b/common/storage/local/sqlite.go @@ -86,6 +86,10 @@ const createPingHistory string = ` updated_at DATETIME NOT NULL );` +const createPingHistoryUniqueIndex string = ` +CREATE UNIQUE INDEX IF NOT EXISTS ping_history_unique ON ping_history(supernode_id, ip_address); +` + const ( historyDBName = "history.db" emptyString = "" @@ -188,8 +192,8 @@ func (s *SQLiteStore) UpsertPingHistory(pingInfo types.PingInfo) error { is_online = excluded.is_online, is_on_watchlist = excluded.is_on_watchlist, is_adjusted = excluded.is_adjusted, - last_seen = excluded.last_seen - cumulative_response_time = excluded.cumulative_response_time + last_seen = excluded.last_seen, + cumulative_response_time = excluded.cumulative_response_time, updated_at = excluded.updated_at;` _, err := s.db.Exec(upsertQuery, @@ -207,7 +211,7 @@ func (s *SQLiteStore) UpsertPingHistory(pingInfo types.PingInfo) error { func (s *SQLiteStore) GetPingInfoBySupernodeID(supernodeID string) (*types.PingInfo, error) { const selectQuery = ` SELECT id, supernode_id, ip_address, total_pings, total_successful_pings, - avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, + avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, cumulative_response_time, created_at, updated_at FROM ping_history WHERE supernode_id = ?;` @@ -219,7 +223,7 @@ func (s *SQLiteStore) GetPingInfoBySupernodeID(supernodeID string) (*types.PingI err := row.Scan( &pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings, &pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime, - &pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, + &pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, &pingInfo.CumulativeResponseTime, &pingInfo.CreatedAt, &pingInfo.UpdatedAt, ) @@ -234,7 +238,7 @@ func (s *SQLiteStore) GetPingInfoBySupernodeID(supernodeID string) (*types.PingI func (s *SQLiteStore) GetWatchlistPingInfo() ([]types.PingInfo, error) { const selectQuery = ` SELECT id, supernode_id, ip_address, total_pings, total_successful_pings, - avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, + avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, cumulative_response_time, created_at, updated_at FROM ping_history WHERE is_on_watchlist = true AND is_adjusted = false;` @@ -251,7 +255,7 @@ func (s *SQLiteStore) GetWatchlistPingInfo() ([]types.PingInfo, error) { if err := rows.Scan( &pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings, &pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime, - &pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, + &pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, &pingInfo.CumulativeResponseTime, &pingInfo.CreatedAt, &pingInfo.UpdatedAt, ); err != nil { return nil, err @@ -362,7 +366,7 @@ func (s *SQLiteStore) InsertSelfHealingChallenge(challenge types.SelfHealingChal func (s *SQLiteStore) GetAllPingInfos() (types.PingInfos, error) { const selectQuery = ` SELECT id, supernode_id, ip_address, total_pings, total_successful_pings, - avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, + avg_ping_response_time, is_online, is_on_watchlist, is_adjusted, last_seen, cumulative_response_time, created_at, updated_at FROM ping_history ` @@ -379,7 +383,7 @@ func (s *SQLiteStore) GetAllPingInfos() (types.PingInfos, error) { if err := rows.Scan( &pingInfo.ID, &pingInfo.SupernodeID, &pingInfo.IPAddress, &pingInfo.TotalPings, &pingInfo.TotalSuccessfulPings, &pingInfo.AvgPingResponseTime, - &pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, + &pingInfo.IsOnline, &pingInfo.IsOnWatchlist, &pingInfo.IsAdjusted, &pingInfo.LastSeen, &pingInfo.CumulativeResponseTime, &pingInfo.CreatedAt, &pingInfo.UpdatedAt, ); err != nil { return nil, err @@ -433,6 +437,10 @@ func OpenHistoryDB() (storage.LocalStoreInterface, error) { return nil, fmt.Errorf("cannot create table(s): %w", err) } + if _, err := db.Exec(createPingHistoryUniqueIndex); err != nil { + return nil, fmt.Errorf("cannot create table(s): %w", err) + } + _, _ = db.Exec(alterTaskHistory) pragmas := []string{ diff --git a/common/types/self_healing.go b/common/types/self_healing.go index 323a894cd..1aa4fcede 100644 --- a/common/types/self_healing.go +++ b/common/types/self_healing.go @@ -44,7 +44,7 @@ type PingInfo struct { LastSeen sql.NullTime `db:"last_seen"` CreatedAt time.Time `db:"created_at"` UpdatedAt time.Time `db:"updated_at"` - LastResponseTime float64 + LastResponseTime float64 `db:"-"` } // PingInfos represents array of ping info diff --git a/mixins/go.mod b/mixins/go.mod index 7c288f5d5..56bbb5614 100644 --- a/mixins/go.mod +++ b/mixins/go.mod @@ -41,7 +41,7 @@ require ( github.com/stretchr/testify v1.8.4 // indirect github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect golang.org/x/crypto v0.11.0 // indirect - golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect + golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/term v0.12.0 // indirect diff --git a/supernode/node/grpc/server/services/supernode/self_healing.go b/supernode/node/grpc/server/services/supernode/self_healing.go index 321e7eb9b..5acde9b79 100644 --- a/supernode/node/grpc/server/services/supernode/self_healing.go +++ b/supernode/node/grpc/server/services/supernode/self_healing.go @@ -93,8 +93,6 @@ func (service *SelfHealingChallengeGRPC) Desc() *grpc.ServiceDesc { // Ping is the server side of self-healing challenge processing GRPC comms func (service *SelfHealingChallengeGRPC) Ping(ctx context.Context, pingReq *pb.PingRequest) (*pb.PingResponse, error) { - log.WithContext(ctx).WithField("req", pingReq).Info("ping request received at the server side") - task := service.NewSHTask() res, err := task.Ping(ctx, pingReq) if err != nil { diff --git a/supernode/services/common/reg_task_helper.go b/supernode/services/common/reg_task_helper.go index 6966eeb16..88b9ba048 100644 --- a/supernode/services/common/reg_task_helper.go +++ b/supernode/services/common/reg_task_helper.go @@ -248,9 +248,9 @@ func (h *RegTaskHelper) verifyTxn(ctx context.Context, txn *pastel.GetRawTransactionVerbose1Result, totalAmt float64, percent float64) error { inRange := func(val float64, reqVal float64, slackPercent float64) bool { lower := reqVal - (reqVal * slackPercent / 100) - upper := reqVal + (reqVal * slackPercent / 100) + //upper := reqVal + (reqVal * slackPercent / 100) - return val >= lower && val <= upper + return val >= lower } log.WithContext(ctx).Debug("Verifying Burn Txn") diff --git a/supernode/services/selfhealing/generate_self_healing.go b/supernode/services/selfhealing/generate_self_healing.go index ff7cd6f90..f945e3e31 100644 --- a/supernode/services/selfhealing/generate_self_healing.go +++ b/supernode/services/selfhealing/generate_self_healing.go @@ -73,7 +73,7 @@ func (task *SHTask) GenerateSelfHealingChallenge(ctx context.Context) error { selfHealingTicketsMap := task.identifySelfHealingTickets(ctx, watchlistPingInfos, mapOfClosestNodesAgainstKeys, symbolFileKeyMap) log.WithContext(ctx).WithField("self_healing_tickets", selfHealingTicketsMap).Info("self-healing tickets have been identified") - challengeRecipientMap, err := task.identifyChallengeRecipients(ctx, selfHealingTicketsMap) + challengeRecipientMap, err := task.identifyChallengeRecipients(ctx, selfHealingTicketsMap, watchlistPingInfos) if err != nil { log.WithContext(ctx).WithError(err).Error("error identifying challenge recipients") } @@ -287,7 +287,7 @@ func (task *SHTask) isOnWatchlist(nodeID string, watchlistPingInfos types.PingIn return false } -func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealingTicketsMap map[string]SymbolFileKeyDetails) (map[string][]SymbolFileKeyDetails, error) { +func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealingTicketsMap map[string]SymbolFileKeyDetails, watchlist types.PingInfos) (map[string][]SymbolFileKeyDetails, error) { challengeRecipientMap := make(map[string][]SymbolFileKeyDetails) for txID, ticketDetails := range selfHealingTicketsMap { logger := log.WithContext(ctx).WithField("TxID", txID) @@ -309,7 +309,7 @@ func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealing logger.WithError(err).Error("unable to retrieve list of supernodes") } - challengeRecipients := task.SHService.GetNClosestSupernodeIDsToComparisonString(ctx, 1, string(dataHash), listOfSupernodes) + challengeRecipients := task.SHService.GetNClosestSupernodeIDsToComparisonString(ctx, 1, string(dataHash), task.filterWatchlistAndCurrentNode(watchlist, listOfSupernodes)) if len(challengeRecipients) < 1 { log.WithContext(ctx).WithField("file_hash", dataHash).Info("no closest nodes have found against the file") continue @@ -621,3 +621,21 @@ func (service *SHService) getListOfSupernode(ctx context.Context) ([]string, err return ret, nil } + +func (task *SHTask) filterWatchlistAndCurrentNode(watchList types.PingInfos, listOfSupernodes []string) []string { + var filteredList []string + + for _, nodeID := range listOfSupernodes { + if task.isOnWatchlist(nodeID, watchList) { + continue + } + + if task.nodeID == nodeID { + continue + } + + filteredList = append(filteredList, nodeID) + } + + return filteredList +} diff --git a/supernode/services/selfhealing/ping_nodes.go b/supernode/services/selfhealing/ping_nodes.go index c0e65fb56..1ce0f7416 100644 --- a/supernode/services/selfhealing/ping_nodes.go +++ b/supernode/services/selfhealing/ping_nodes.go @@ -48,8 +48,11 @@ func (task *SHTask) getNodesAddressesToConnect(ctx context.Context) ([]pastel.Ma for _, mn := range supernodes { if mn.ExtAddress == "" || mn.ExtKey == "" { log.WithContext(ctx).WithField("method", "FetchAndMaintainPingInfo"). - WithField("node_id", mn.ExtKey).Warn("node address or node id is empty") + WithField("node_id", mn.ExtKey).Debug("node address or node id is empty") + continue + } + if mn.ExtKey == task.nodeID { continue } @@ -81,7 +84,7 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode res, err := task.ping(ctx, req, node.ExtAddress) if err != nil { log.WithContext(ctx).WithField("sn_address", node.ExtAddress).WithError(err). - Error("error pinging sn") + Debug("error pinging sn") pi := types.PingInfo{ SupernodeID: node.ExtKey, @@ -91,27 +94,29 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode } if err := task.StorePingInfo(ctx, pi); err != nil { - log.WithContext(ctx).WithError(err).Error("error storing ping info") + log.WithContext(ctx).WithField("supernode_id", pi.SupernodeID). + WithError(err). + Error("error storing ping info") } return } - respondedAt, err := time.Parse(UTCTimeLayout, res.RespondedAt) - if err != nil { - log.WithContext(ctx).WithError(err) - } + lastSeen := time.Now().UTC() + responseTime := lastSeen.Sub(timeBeforePing).Abs().Seconds() pi := types.PingInfo{ SupernodeID: node.ExtKey, IPAddress: node.ExtAddress, IsOnline: res.IsOnline, - LastResponseTime: respondedAt.Sub(timeBeforePing).Seconds(), - LastSeen: sql.NullTime{Time: respondedAt, Valid: true}, + LastSeen: sql.NullTime{Time: lastSeen, Valid: true}, + LastResponseTime: responseTime, } if err := task.StorePingInfo(ctx, pi); err != nil { - log.WithContext(ctx).WithError(err).Error("error storing ping info") + log.WithContext(ctx).WithField("supernode_id", pi.SupernodeID). + WithError(err). + Error("error storing ping info") } }() } @@ -123,7 +128,7 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode // ping just pings the given node address func (task *SHTask) ping(ctx context.Context, req *pb.PingRequest, supernodeAddr string) (*pb.PingResponse, error) { - log.WithContext(ctx).Info("pinging supernode: " + supernodeAddr) + log.WithContext(ctx).Debug("pinging supernode: " + supernodeAddr) // Create a context with timeout pingCtx, cancel := context.WithTimeout(ctx, timeoutDuration) @@ -133,7 +138,6 @@ func (task *SHTask) ping(ctx context.Context, req *pb.PingRequest, supernodeAddr nodeClientConn, err := task.nodeClient.Connect(pingCtx, supernodeAddr) if err != nil { err = fmt.Errorf("could not use node client to connect to: %s, error: %v", supernodeAddr, err) - log.WithContext(ctx).Warn(err.Error()) return nil, err } defer nodeClientConn.Close() @@ -212,6 +216,10 @@ func (task *SHTask) GetPingInfoFromDB(ctx context.Context, supernodeID string) ( return nil, err } + + if info == nil { + return &types.PingInfo{}, nil + } } return info, nil @@ -225,7 +233,7 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo { info.TotalPings = existedInfo.TotalPings + 1 - if !existedInfo.LastSeen.Valid { //for the first row + if existedInfo.LastSeen.Time.IsZero() || !(existedInfo.LastSeen.Valid) { //for the first row info.LastSeen = sql.NullTime{Time: time.Now().UTC(), Valid: true} } @@ -237,9 +245,14 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo { if !info.IsOnline { info.TotalSuccessfulPings = existedInfo.TotalSuccessfulPings - info.LastSeen = existedInfo.LastSeen info.IsOnWatchlist = existedInfo.IsOnWatchlist info.IsAdjusted = existedInfo.IsAdjusted + + if existedInfo.LastSeen.Time.IsZero() || !(existedInfo.LastSeen.Valid) { //for the first row + info.LastSeen = sql.NullTime{Time: time.Now().UTC(), Valid: true} + } else { + info.LastSeen = existedInfo.LastSeen + } } info.CumulativeResponseTime = existedInfo.CumulativeResponseTime + info.LastResponseTime diff --git a/supernode/services/selfhealing/ping_nodes_test.go b/supernode/services/selfhealing/ping_nodes_test.go index 62a10c805..125cbadda 100644 --- a/supernode/services/selfhealing/ping_nodes_test.go +++ b/supernode/services/selfhealing/ping_nodes_test.go @@ -22,9 +22,9 @@ func TestGetInfoToInsert(t *testing.T) { }{ { testCase: "when node is offline, should increment total ping only", - existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2}, + existedInfo: &types.PingInfo{TotalPings: 2, TotalSuccessfulPings: 2, LastSeen: sql.NullTime{Time: now, Valid: true}}, receivedInfo: &types.PingInfo{TotalPings: 0, IsOnline: false}, - expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 2, IsOnline: false}, + expectedInfo: &types.PingInfo{TotalPings: 3, TotalSuccessfulPings: 2, IsOnline: false, LastSeen: sql.NullTime{Time: now, Valid: true}}, }, { testCase: "when node is online, should increment total ping & total successful pings", diff --git a/supernode/services/selfhealing/process_ping.go b/supernode/services/selfhealing/process_ping.go index 216e9129e..6f582a322 100644 --- a/supernode/services/selfhealing/process_ping.go +++ b/supernode/services/selfhealing/process_ping.go @@ -10,7 +10,7 @@ import ( // Ping acknowledges the received message and return with the timestamp func (task *SHTask) Ping(ctx context.Context, _ *pb.PingRequest) (*pb.PingResponse, error) { - log.WithContext(ctx).WithField("node_id", task.nodeID).Println("ping request received at the server") + log.WithContext(ctx).WithField("node_id", task.nodeID).Debug("ping request received at the server") return &pb.PingResponse{ ReceiverId: task.nodeID, diff --git a/supernode/services/selfhealing/service.go b/supernode/services/selfhealing/service.go index bca2a1abd..15f3e4917 100644 --- a/supernode/services/selfhealing/service.go +++ b/supernode/services/selfhealing/service.go @@ -101,9 +101,9 @@ func (service *SHService) Run(ctx context.Context) error { for { select { case <-time.After(defaultTimerBlockCheckDuration): - newCtx := context.Background() - task := service.NewSHTask() - task.GenerateSelfHealingChallenge(newCtx) + //newCtx := context.Background() + //task := service.NewSHTask() + //task.GenerateSelfHealingChallenge(newCtx) log.WithContext(ctx).Debug("Would normally invoke a self-healing worker") case <-ctx.Done(): diff --git a/supernode/services/selfhealing/update_watchlist.go b/supernode/services/selfhealing/update_watchlist.go index a0801b45e..d40677b91 100644 --- a/supernode/services/selfhealing/update_watchlist.go +++ b/supernode/services/selfhealing/update_watchlist.go @@ -25,6 +25,7 @@ func (task *SHTask) UpdateWatchlist(ctx context.Context) error { info.IsOnWatchlist = true info.IsAdjusted = false + log.WithContext(ctx).WithField("supernode_id", info.SupernodeID).Info("updating watchlist flag") if err := task.UpsertPingHistory(ctx, info); err != nil { log.WithContext(ctx). WithField("supernode_id", info.SupernodeID).