Skip to content

Commit bc0ba38

Browse files
committed
[PSL-1200] handle error scenarios for multi-volume cascade registration flow
1 parent a8c369c commit bc0ba38

File tree

5 files changed

+109
-4
lines changed

5 files changed

+109
-4
lines changed

p2p/test/mock_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (client *Client) ListenOnNClosestNodesWithIncludingNodelist(retArr []string
9393

9494
// ListenOnStoreBatch listening on StoreBatch
9595
func (client *Client) ListenOnStoreBatch(err error) *Client {
96-
client.On(StoreBatch, mock.Anything, mock.Anything, mock.Anything).Return(err)
96+
client.On(StoreBatch, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(err)
9797
return client
9898
}
9999

walletnode/api/services/cascade.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ func (service *CascadeAPIHandler) StartProcessing(ctx context.Context, p *cascad
141141
return nil, cascade.MakeInternalServerError(errors.New("ticket registration attempts have been exceeded"))
142142
default:
143143
regAttemptID, err := service.register.InsertRegistrationAttempts(types.RegistrationAttempt{
144-
FileID: p.FileID,
144+
FileID: baseFile.FileID,
145145
RegStartedAt: time.Now().UTC(),
146146
})
147147
if err != nil {
148-
log.WithContext(ctx).WithField("file_id", p.FileID).WithError(err).Error("error inserting registration attempt")
148+
log.WithContext(ctx).WithField("file_id", baseFile.FileID).WithError(err).Error("error inserting registration attempt")
149149
return nil, err
150150
}
151151

walletnode/services/cascaderegister/service.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package cascaderegister
22

33
import (
44
"context"
5+
"database/sql"
56
"os"
67
"path/filepath"
78
"strconv"
89
"strings"
910
"time"
1011

1112
"github.com/google/uuid"
13+
14+
"github.com/pastelnetwork/gonode/common/log"
1215
"github.com/pastelnetwork/gonode/common/storage/queries"
1316
"github.com/pastelnetwork/gonode/common/storage/ticketstore"
1417
"github.com/pastelnetwork/gonode/common/types"
@@ -325,6 +328,72 @@ func (service *CascadeRegistrationService) UpsertFile(file types.File) error {
325328
return nil
326329
}
327330

331+
func (service *CascadeRegistrationService) HandleTaskRegistrationErrorAttempts(ctx context.Context, taskID, regTxid, actTxid string, regAttemptID int64, actAttemptID int64, taskError error) error {
332+
doneBlock, err := service.pastelHandler.PastelClient.GetBlockCount(ctx)
333+
if err != nil {
334+
log.WithContext(ctx).WithField("task_id", taskID).WithError(err).Error("error retrieving block count")
335+
return err
336+
}
337+
338+
switch {
339+
case regTxid == "":
340+
ra, err := service.GetRegistrationAttemptsByID(int(regAttemptID))
341+
if err != nil {
342+
log.WithContext(ctx).WithField("task_id", taskID).WithError(err).Error("error retrieving file reg attempt")
343+
return err
344+
}
345+
346+
ra.FinishedAt = time.Now().UTC()
347+
ra.IsSuccessful = false
348+
ra.ErrorMessage = taskError.Error()
349+
_, err = service.UpdateRegistrationAttempts(*ra)
350+
if err != nil {
351+
log.WithContext(ctx).WithField("task_id", taskID).WithError(err).Error("error updating file reg attempt")
352+
return err
353+
}
354+
355+
return nil
356+
case actTxid == "":
357+
file, err := service.GetFileByTaskID(taskID)
358+
if err != nil {
359+
log.WithContext(ctx).WithError(err).Error("error retrieving file")
360+
return nil
361+
}
362+
363+
file.DoneBlock = int(doneBlock)
364+
file.RegTxid = regTxid
365+
file.IsConcluded = false
366+
err = service.ticketDB.UpsertFile(*file)
367+
if err != nil {
368+
log.Errorf("Error in file upsert: %v", err.Error())
369+
return nil
370+
}
371+
372+
actAttempt, err := service.GetActivationAttemptByID(int(actAttemptID))
373+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
374+
log.Errorf("Error retrieving file act attempt: %v", err.Error())
375+
return err
376+
}
377+
378+
if actAttempt == nil {
379+
return nil
380+
}
381+
382+
actAttempt.IsSuccessful = false
383+
actAttempt.ActivationAttemptAt = time.Now().UTC()
384+
actAttempt.ErrorMessage = taskError.Error()
385+
_, err = service.UpdateActivationAttempts(*actAttempt)
386+
if err != nil {
387+
log.Errorf("Error in activation attempts upsert: %v", err.Error())
388+
return err
389+
}
390+
391+
return err
392+
}
393+
394+
return nil
395+
}
396+
328397
// NewService returns a new Service instance
329398
func NewService(config *Config, pastelClient pastel.Client, nodeClient node.ClientInterface,
330399
fileStorage storage.FileStorageInterface, db storage.KeyValue,

walletnode/services/cascaderegister/task.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ func (task *CascadeRegistrationTask) Run(ctx context.Context) error {
6363
func (task *CascadeRegistrationTask) run(ctx context.Context) error {
6464
regTxid, actTxid, err := task.runTicketRegActTask(ctx)
6565
if err != nil {
66+
attemptErr := task.service.HandleTaskRegistrationErrorAttempts(ctx, task.ID(), regTxid, actTxid, task.Request.RegAttemptID, task.actAttemptID, err)
67+
if attemptErr != nil {
68+
return attemptErr
69+
}
70+
6671
return err
6772
}
6873

@@ -350,6 +355,10 @@ func (task *CascadeRegistrationTask) runTicketRegActTask(ctx context.Context) (r
350355
FileID: task.Request.FileID,
351356
ActivationAttemptAt: time.Now().UTC(),
352357
})
358+
if err != nil {
359+
log.WithContext(ctx).WithError(err).Error("error inserting activation attempt")
360+
return task.regCascadeTxid, "", errors.Errorf("error inserting activation attempt: %w", err)
361+
}
353362
task.actAttemptID = id
354363
// activate cascade ticket registered at previous step by SN
355364
activateTxID, err := task.activateActionTicket(ctx)

walletnode/services/cascaderegister/task_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package cascaderegister
22

33
import (
44
"context"
5+
"github.com/pastelnetwork/gonode/common/storage/ticketstore"
56
"image"
67
"image/png"
78
"os"
9+
"path/filepath"
810
"testing"
911
"time"
1012

@@ -48,6 +50,20 @@ func newTestImageFile() (*files.File, error) {
4850
}
4951

5052
func TestTaskRun(t *testing.T) {
53+
homeDir, _ := os.UserHomeDir()
54+
basePath := filepath.Join(homeDir, ".pastel")
55+
56+
// Make sure base path exists
57+
if err := os.MkdirAll(basePath, 0755); err != nil {
58+
t.Fatalf("Failed to create base path: %v", err)
59+
}
60+
61+
tempDir, err := createTempDirInPath(basePath)
62+
if err != nil {
63+
t.Fatalf("Failed to create temp dir: %v", err)
64+
}
65+
defer os.RemoveAll(tempDir) // clean up after test
66+
5167
type fields struct {
5268
Request *common.ActionRegistrationRequest
5369
}
@@ -194,8 +210,11 @@ func TestTaskRun(t *testing.T) {
194210
rqClientMock.ListenOnRaptorQ().ListenOnClose(nil)
195211
rqClientMock.ListenOnConnect(testCase.args.connectErr)
196212

213+
ticketDB, err := ticketstore.OpenTicketingDb()
214+
assert.NoError(t, err)
215+
197216
downloadService := download.NewNftDownloadService(download.NewConfig(), pastelClientMock, nodeClient, nil)
198-
service := NewService(NewConfig(), pastelClientMock, nodeClient, nil, nil, *downloadService, nil, nil)
217+
service := NewService(NewConfig(), pastelClientMock, nodeClient, nil, nil, *downloadService, nil, ticketDB)
199218
service.rqClient = rqClientMock
200219
service.config.WaitTxnValidInterval = 1
201220

@@ -219,3 +238,11 @@ func TestTaskRun(t *testing.T) {
219238
})
220239
}
221240
}
241+
242+
func createTempDirInPath(basePath string) (string, error) {
243+
dir, err := os.MkdirTemp(basePath, ".pastel")
244+
if err != nil {
245+
return "", err
246+
}
247+
return dir, nil
248+
}

0 commit comments

Comments
 (0)