Skip to content

Commit

Permalink
Merge pull request #883 from pastelnetwork/PSL-1200_handleErrorMultiV…
Browse files Browse the repository at this point in the history
…olumeReg

[PSL-1200] handle error scenarios for multi-volume cascade registration flow
  • Loading branch information
j-rafique authored Jun 26, 2024
2 parents a8c369c + bc0ba38 commit 4b79083
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 4 deletions.
2 changes: 1 addition & 1 deletion p2p/test/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (client *Client) ListenOnNClosestNodesWithIncludingNodelist(retArr []string

// ListenOnStoreBatch listening on StoreBatch
func (client *Client) ListenOnStoreBatch(err error) *Client {
client.On(StoreBatch, mock.Anything, mock.Anything, mock.Anything).Return(err)
client.On(StoreBatch, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(err)
return client
}

Expand Down
4 changes: 2 additions & 2 deletions walletnode/api/services/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ func (service *CascadeAPIHandler) StartProcessing(ctx context.Context, p *cascad
return nil, cascade.MakeInternalServerError(errors.New("ticket registration attempts have been exceeded"))
default:
regAttemptID, err := service.register.InsertRegistrationAttempts(types.RegistrationAttempt{
FileID: p.FileID,
FileID: baseFile.FileID,
RegStartedAt: time.Now().UTC(),
})
if err != nil {
log.WithContext(ctx).WithField("file_id", p.FileID).WithError(err).Error("error inserting registration attempt")
log.WithContext(ctx).WithField("file_id", baseFile.FileID).WithError(err).Error("error inserting registration attempt")
return nil, err
}

Expand Down
69 changes: 69 additions & 0 deletions walletnode/services/cascaderegister/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package cascaderegister

import (
"context"
"database/sql"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/google/uuid"

"github.com/pastelnetwork/gonode/common/log"
"github.com/pastelnetwork/gonode/common/storage/queries"
"github.com/pastelnetwork/gonode/common/storage/ticketstore"
"github.com/pastelnetwork/gonode/common/types"
Expand Down Expand Up @@ -325,6 +328,72 @@ func (service *CascadeRegistrationService) UpsertFile(file types.File) error {
return nil
}

func (service *CascadeRegistrationService) HandleTaskRegistrationErrorAttempts(ctx context.Context, taskID, regTxid, actTxid string, regAttemptID int64, actAttemptID int64, taskError error) error {
doneBlock, err := service.pastelHandler.PastelClient.GetBlockCount(ctx)
if err != nil {
log.WithContext(ctx).WithField("task_id", taskID).WithError(err).Error("error retrieving block count")
return err
}

switch {
case regTxid == "":
ra, err := service.GetRegistrationAttemptsByID(int(regAttemptID))
if err != nil {
log.WithContext(ctx).WithField("task_id", taskID).WithError(err).Error("error retrieving file reg attempt")
return err
}

ra.FinishedAt = time.Now().UTC()
ra.IsSuccessful = false
ra.ErrorMessage = taskError.Error()
_, err = service.UpdateRegistrationAttempts(*ra)
if err != nil {
log.WithContext(ctx).WithField("task_id", taskID).WithError(err).Error("error updating file reg attempt")
return err
}

return nil
case actTxid == "":
file, err := service.GetFileByTaskID(taskID)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error retrieving file")
return nil
}

file.DoneBlock = int(doneBlock)
file.RegTxid = regTxid
file.IsConcluded = false
err = service.ticketDB.UpsertFile(*file)
if err != nil {
log.Errorf("Error in file upsert: %v", err.Error())
return nil
}

actAttempt, err := service.GetActivationAttemptByID(int(actAttemptID))
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Errorf("Error retrieving file act attempt: %v", err.Error())
return err
}

if actAttempt == nil {
return nil
}

actAttempt.IsSuccessful = false
actAttempt.ActivationAttemptAt = time.Now().UTC()
actAttempt.ErrorMessage = taskError.Error()
_, err = service.UpdateActivationAttempts(*actAttempt)
if err != nil {
log.Errorf("Error in activation attempts upsert: %v", err.Error())
return err
}

return err
}

return nil
}

// NewService returns a new Service instance
func NewService(config *Config, pastelClient pastel.Client, nodeClient node.ClientInterface,
fileStorage storage.FileStorageInterface, db storage.KeyValue,
Expand Down
9 changes: 9 additions & 0 deletions walletnode/services/cascaderegister/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (task *CascadeRegistrationTask) Run(ctx context.Context) error {
func (task *CascadeRegistrationTask) run(ctx context.Context) error {
regTxid, actTxid, err := task.runTicketRegActTask(ctx)
if err != nil {
attemptErr := task.service.HandleTaskRegistrationErrorAttempts(ctx, task.ID(), regTxid, actTxid, task.Request.RegAttemptID, task.actAttemptID, err)
if attemptErr != nil {
return attemptErr
}

return err
}

Expand Down Expand Up @@ -350,6 +355,10 @@ func (task *CascadeRegistrationTask) runTicketRegActTask(ctx context.Context) (r
FileID: task.Request.FileID,
ActivationAttemptAt: time.Now().UTC(),
})
if err != nil {
log.WithContext(ctx).WithError(err).Error("error inserting activation attempt")
return task.regCascadeTxid, "", errors.Errorf("error inserting activation attempt: %w", err)
}
task.actAttemptID = id
// activate cascade ticket registered at previous step by SN
activateTxID, err := task.activateActionTicket(ctx)
Expand Down
29 changes: 28 additions & 1 deletion walletnode/services/cascaderegister/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cascaderegister

import (
"context"
"github.com/pastelnetwork/gonode/common/storage/ticketstore"
"image"
"image/png"
"os"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -48,6 +50,20 @@ func newTestImageFile() (*files.File, error) {
}

func TestTaskRun(t *testing.T) {
homeDir, _ := os.UserHomeDir()
basePath := filepath.Join(homeDir, ".pastel")

// Make sure base path exists
if err := os.MkdirAll(basePath, 0755); err != nil {
t.Fatalf("Failed to create base path: %v", err)
}

tempDir, err := createTempDirInPath(basePath)
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir) // clean up after test

type fields struct {
Request *common.ActionRegistrationRequest
}
Expand Down Expand Up @@ -194,8 +210,11 @@ func TestTaskRun(t *testing.T) {
rqClientMock.ListenOnRaptorQ().ListenOnClose(nil)
rqClientMock.ListenOnConnect(testCase.args.connectErr)

ticketDB, err := ticketstore.OpenTicketingDb()
assert.NoError(t, err)

downloadService := download.NewNftDownloadService(download.NewConfig(), pastelClientMock, nodeClient, nil)
service := NewService(NewConfig(), pastelClientMock, nodeClient, nil, nil, *downloadService, nil, nil)
service := NewService(NewConfig(), pastelClientMock, nodeClient, nil, nil, *downloadService, nil, ticketDB)
service.rqClient = rqClientMock
service.config.WaitTxnValidInterval = 1

Expand All @@ -219,3 +238,11 @@ func TestTaskRun(t *testing.T) {
})
}
}

func createTempDirInPath(basePath string) (string, error) {
dir, err := os.MkdirTemp(basePath, ".pastel")
if err != nil {
return "", err
}
return dir, nil
}

0 comments on commit 4b79083

Please sign in to comment.