Skip to content

Commit

Permalink
Merge pull request #746 from pastelnetwork/PSL-1090_BugFixes
Browse files Browse the repository at this point in the history
[PSL-1090] fix bugs and edge cases handling
  • Loading branch information
j-rafique authored Dec 13, 2023
2 parents 1535f14 + 38a3b78 commit d987ef3
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 259 deletions.
2 changes: 1 addition & 1 deletion common/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ type LocalStoreInterface interface {
GetPingInfoBySupernodeID(supernodeID string) (*types.PingInfo, error)
GetAllPingInfos() (types.PingInfos, error)
GetWatchlistPingInfo() ([]types.PingInfo, error)
UpdatePingInfo(supernodeID string) error
UpdatePingInfo(supernodeID string, isOnWatchlist, isAdjusted bool) error
CloseHistoryDB(ctx context.Context)
}
6 changes: 3 additions & 3 deletions common/storage/local/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,15 @@ func (s *SQLiteStore) GetWatchlistPingInfo() ([]types.PingInfo, error) {
}

// UpdatePingInfo updates the ping info
func (s *SQLiteStore) UpdatePingInfo(supernodeID string) error {
func (s *SQLiteStore) UpdatePingInfo(supernodeID string, isOnWatchlist, isAdjusted bool) error {
// Update query
const updateQuery = `
UPDATE ping_history
SET is_adjusted = true, is_on_watchlist = false
SET is_adjusted = ?, is_on_watchlist = ?
WHERE supernode_id = ?;`

// Execute the update query
_, err := s.db.Exec(updateQuery, supernodeID)
_, err := s.db.Exec(updateQuery, isAdjusted, isOnWatchlist, supernodeID)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion supernode/services/download/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func (task *NftDownloadingTask) getSymbolIDsFromMetadataFile(ctx context.Context
return symbolIDs, nil
}

func (task *NftDownloadingTask) restoreFile(ctx context.Context, rqID []string, rqOti []byte, dataHash []byte, txid string) ([]byte, error) {
// RestoreFile restores the file using the available rq-ids
func (task *NftDownloadingTask) RestoreFile(ctx context.Context, rqID []string, rqOti []byte, dataHash []byte, txid string) ([]byte, error) {
var file []byte
var lastErr error
var err error
Expand Down
2 changes: 1 addition & 1 deletion supernode/services/download/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (task *NftDownloadingTask) Download(ctx context.Context, txid, timestamp, s
// Get the list of "symbols/chunks" from Kademlia by using symbol identifiers from file
// Pass all symbols/chunks to the raptorq service to decode (also passing encoder parameters: rq_oti)
// Validate hash of the restored image matches the image hash in the Art Reistration ticket (data_hash)
file, err = task.restoreFile(ctx, info.rqIDs, info.rqOti, info.dataHash, txid)
file, err = task.RestoreFile(ctx, info.rqIDs, info.rqOti, info.dataHash, txid)
if err != nil {
log.WithContext(ctx).WithField("txid", txid).Error("restore file failed")
err = errors.Errorf("restore file: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
)

const (
watchlistThreshold = 6
defaultClosestNodes = 6
minNodesOnWatchlistThreshold = 6
minTimeForWatchlistNodes = 30 //in minutes
defaultClosestNodes = 6
)

// TicketType will identify the type of ticket
Expand All @@ -38,8 +39,8 @@ type SymbolFileKeyDetails struct {
TicketType TicketType
}

// SelfHealingWorker checks the ping info and decide self-healing files
func (task *SHTask) SelfHealingWorker(ctx context.Context) error {
// GenerateSelfHealingChallenge worker checks the ping info and identify self-healing tickets and their recipients
func (task *SHTask) GenerateSelfHealingChallenge(ctx context.Context) error {
log.WithContext(ctx).Infoln("Self Healing Worker has been invoked")

watchlistPingInfos, err := task.retrieveWatchlistPingInfo(ctx)
Expand All @@ -49,11 +50,12 @@ func (task *SHTask) SelfHealingWorker(ctx context.Context) error {
}
log.WithContext(ctx).Info("watchlist ping history has been retrieved")

if len(watchlistPingInfos) < watchlistThreshold {
shouldTrigger, watchlistPingInfos := task.shouldTriggerSelfHealing(watchlistPingInfos)
if !shouldTrigger {
log.WithContext(ctx).WithField("no_of_nodes_on_watchlist", len(watchlistPingInfos)).Info("not enough nodes on the watchlist, skipping further processing")
return nil
}
log.WithContext(ctx).Info("watchlist threshold has been reached, proceeding forward")
log.WithContext(ctx).Info("self-healing has been triggered, proceeding with the identification of files & recipients")

keys, symbolFileKeyMap, err := task.ListSymbolFileKeysFromNFTAndActionTickets(ctx)
if err != nil {
Expand Down Expand Up @@ -117,6 +119,23 @@ func (task *SHTask) retrieveWatchlistPingInfo(ctx context.Context) (types.PingIn
return infos, nil
}

func (task *SHTask) shouldTriggerSelfHealing(infos types.PingInfos) (bool, types.PingInfos) {
var filteredPings types.PingInfos
for _, ping := range infos {
if ping.LastSeen.Valid {
if time.Since(ping.LastSeen.Time).Minutes() <= minTimeForWatchlistNodes {
filteredPings = append(filteredPings, ping)
}
}
}

if len(filteredPings) < minNodesOnWatchlistThreshold {
return false, filteredPings
}

return true, filteredPings
}

// ListSymbolFileKeysFromNFTAndActionTickets : Get an NFT and Action Ticket's associated raptor q ticket file id's.
func (service *SHService) ListSymbolFileKeysFromNFTAndActionTickets(ctx context.Context) ([]string, map[string]SymbolFileKeyDetails, error) {
var keys = make([]string, 0)
Expand Down Expand Up @@ -285,7 +304,12 @@ func (task *SHTask) identifyChallengeRecipients(ctx context.Context, selfHealing
continue
}

challengeRecipients := task.SHService.GetNClosestSupernodeIDsToComparisonString(ctx, 1, string(dataHash), nil)
listOfSupernodes, err := task.getListOfSupernode(ctx)
if err != nil {
logger.WithError(err).Error("unable to retrieve list of supernodes")
}

challengeRecipients := task.SHService.GetNClosestSupernodeIDsToComparisonString(ctx, 1, string(dataHash), listOfSupernodes)
if len(challengeRecipients) < 1 {
log.WithContext(ctx).WithField("file_hash", dataHash).Info("no closest nodes have found against the file")
continue
Expand Down Expand Up @@ -571,7 +595,7 @@ func (task *SHTask) updateWatchlist(ctx context.Context, watchlistPingInfos type
defer store.CloseHistoryDB(ctx)

for _, info := range watchlistPingInfos {
err = store.UpdatePingInfo(info.SupernodeID)
err = store.UpdatePingInfo(info.SupernodeID, false, true)
if err != nil {
log.WithContext(ctx).WithField("supernode_id", info.SupernodeID).
Error("error updating watchlist ping info")
Expand All @@ -583,3 +607,17 @@ func (task *SHTask) updateWatchlist(ctx context.Context, watchlistPingInfos type

return nil
}

func (service *SHService) getListOfSupernode(ctx context.Context) ([]string, error) {
var ret = make([]string, 0)
listMN, err := service.SuperNodeService.PastelClient.MasterNodesExtra(ctx)
if err != nil {
return ret, err
}

for _, node := range listMN {
ret = append(ret, node.ExtKey)
}

return ret, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package selfhealing

import (
"context"
json "github.com/json-iterator/go"
"github.com/pastelnetwork/gonode/common/types"
"golang.org/x/crypto/sha3"
"database/sql"
"testing"
"time"

fuzz "github.com/google/gofuzz"
json "github.com/json-iterator/go"
"github.com/pastelnetwork/gonode/common/types"
p2pMock "github.com/pastelnetwork/gonode/p2p/test"
"github.com/pastelnetwork/gonode/pastel"
pastelMock "github.com/pastelnetwork/gonode/pastel/test"
Expand All @@ -16,6 +17,7 @@ import (
rqmock "github.com/pastelnetwork/gonode/raptorq/node/test"
shtest "github.com/pastelnetwork/gonode/supernode/node/test/self_healing"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/sha3"
)

func TestListSymbolFileKeysForNFTAndActionTickets(t *testing.T) {
Expand Down Expand Up @@ -302,3 +304,75 @@ func TestCreateSelfHealingTicketsMap(t *testing.T) {
}

}

func TestShouldTriggerSelfHealing(t *testing.T) {
t.Parallel()

config := NewConfig()
pastelClient := pastelMock.NewMockClient(t)
p2pClient := p2pMock.NewMockClient(t)
raptorQClient := rqmock.NewMockClient(t)
var nodeClient *shtest.Client

tests := []struct {
testcase string
infos types.PingInfos
expect func(*testing.T, bool, types.PingInfos)
}{
{
testcase: "when more than 6 nodes that are on watchlist, have last_seen within 30 min time span",
infos: []types.PingInfo{
types.PingInfo{SupernodeID: "A", LastSeen: sql.NullTime{Time: time.Now().Add(-5 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "B", LastSeen: sql.NullTime{Time: time.Now().Add(-10 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "C", LastSeen: sql.NullTime{Time: time.Now().Add(-15 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "D", LastSeen: sql.NullTime{Time: time.Now().Add(-45 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "E", LastSeen: sql.NullTime{Time: time.Now().Add(-50 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "F", LastSeen: sql.NullTime{Time: time.Now().Add(-75 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "G", LastSeen: sql.NullTime{Time: time.Now().Add(-100 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "H", LastSeen: sql.NullTime{Time: time.Now().Add(-25 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "I", LastSeen: sql.NullTime{Time: time.Now().Add(-29 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "J", LastSeen: sql.NullTime{Time: time.Now().Add(-22 * time.Minute), Valid: true}},
},
expect: func(t *testing.T, shouldTrigger bool, infos types.PingInfos) {
require.Equal(t, len(infos), 6)
require.Equal(t, shouldTrigger, true)
},
},
{
testcase: "when less than 6 nodes that are on watchlist, have last_seen within 30 min time span",
infos: []types.PingInfo{
types.PingInfo{SupernodeID: "A", LastSeen: sql.NullTime{Time: time.Now().Add(-55 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "B", LastSeen: sql.NullTime{Time: time.Now().Add(-90 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "C", LastSeen: sql.NullTime{Time: time.Now().Add(-45 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "D", LastSeen: sql.NullTime{Time: time.Now().Add(-45 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "E", LastSeen: sql.NullTime{Time: time.Now().Add(-50 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "F", LastSeen: sql.NullTime{Time: time.Now().Add(-75 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "G", LastSeen: sql.NullTime{Time: time.Now().Add(-100 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "H", LastSeen: sql.NullTime{Time: time.Now().Add(-75 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "I", LastSeen: sql.NullTime{Time: time.Now().Add(-29 * time.Minute), Valid: true}},
types.PingInfo{SupernodeID: "J", LastSeen: sql.NullTime{Time: time.Now().Add(-22 * time.Minute), Valid: true}},
},
expect: func(t *testing.T, shouldTrigger bool, infos types.PingInfos) {
require.Equal(t, len(infos), 2)
require.Equal(t, shouldTrigger, false)
},
},
}

for _, tt := range tests {
tt := tt

t.Run(tt.testcase, func(t *testing.T) {
// Run the setup for the testcase
service := NewService(config, nil, pastelClient, nodeClient,
p2pClient, nil)
task := NewSHTask(service)
task.StorageHandler.RqClient = raptorQClient
// call the function to get return values
shouldTrigger, filteredPings := task.shouldTriggerSelfHealing(tt.infos)
// handle the test case's assertions with the provided func
tt.expect(t, shouldTrigger, filteredPings)
})
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: false,
IsAdjusted: false,
LastResponseTime: 0.0,
}

Expand All @@ -107,7 +106,6 @@ func (task *SHTask) pingNodes(ctx context.Context, nodesToPing pastel.MasterNode
SupernodeID: node.ExtKey,
IPAddress: node.ExtAddress,
IsOnline: res.IsOnline,
IsAdjusted: false,
LastResponseTime: respondedAt.Sub(timeBeforePing).Seconds(),
LastSeen: sql.NullTime{Time: respondedAt, Valid: true},
}
Expand Down Expand Up @@ -233,11 +231,15 @@ func GetPingInfoToInsert(existedInfo, info *types.PingInfo) *types.PingInfo {

if info.IsOnline {
info.TotalSuccessfulPings = existedInfo.TotalSuccessfulPings + 1
info.IsOnWatchlist = false
info.IsAdjusted = false
}

if !info.IsOnline {
info.TotalSuccessfulPings = existedInfo.TotalSuccessfulPings
info.LastSeen = existedInfo.LastSeen
info.IsOnWatchlist = existedInfo.IsOnWatchlist
info.IsAdjusted = existedInfo.IsAdjusted
}

info.CumulativeResponseTime = existedInfo.CumulativeResponseTime + info.LastResponseTime
Expand Down
Loading

0 comments on commit d987ef3

Please sign in to comment.