From 2d714098351eb3ce20fd7b62c514a132ef5d3ca8 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Mon, 24 Jun 2024 17:22:04 +0500 Subject: [PATCH] [PSL-1188] modify cascade registration endpoint with multi-volume ticket changes --- .../ticketstore/activation_attempts.go | 61 +++++---- common/storage/ticketstore/files.go | 68 +++++++--- .../ticketstore/registration_attempts.go | 61 +++++---- common/storage/ticketstore/store.go | 1 + common/types/ticket.go | 12 ++ walletnode/api/services/cascade.go | 72 +++++++++-- walletnode/services/cascaderegister/config.go | 8 ++ .../services/cascaderegister/service.go | 99 +++++++++++++-- walletnode/services/cascaderegister/task.go | 118 +++++++++++++++--- walletnode/services/common/action_ticket.go | 8 +- 10 files changed, 403 insertions(+), 105 deletions(-) diff --git a/common/storage/ticketstore/activation_attempts.go b/common/storage/ticketstore/activation_attempts.go index 3d9a7ac4f..709957799 100644 --- a/common/storage/ticketstore/activation_attempts.go +++ b/common/storage/ticketstore/activation_attempts.go @@ -5,42 +5,61 @@ import ( ) type ActivationAttemptsQueries interface { - UpsertActivationAttempt(attempt types.ActivationAttempt) error - GetActivationAttemptByID(id string) (*types.ActivationAttempt, error) + InsertActivationAttempt(attempt types.ActivationAttempt) (int64, error) + UpdateActivationAttempt(attempt types.ActivationAttempt) (int64, error) + GetActivationAttemptByID(id int) (*types.ActivationAttempt, error) GetActivationAttemptsByFileID(fileID string) ([]*types.ActivationAttempt, error) } -// UpsertActivationAttempt upsert a new activation attempt into the activation_attempts table -func (s *TicketStore) UpsertActivationAttempt(attempt types.ActivationAttempt) error { - const upsertQuery = ` +// InsertActivationAttempt insert a new activation attempt into the activation_attempts table +func (s *TicketStore) InsertActivationAttempt(attempt types.ActivationAttempt) (int64, error) { + const insertQuery = ` INSERT INTO activation_attempts ( - id, file_id, activation_attempt_at, is_successful, error_message - ) VALUES (?, ?, ?, ?, ?) - ON CONFLICT(id) - DO UPDATE SET - file_id = excluded.file_id, - activation_attempt_at = excluded.activation_attempt_at, - is_successful = excluded.is_successful, - error_message = excluded.error_message;` - - _, err := s.db.Exec(upsertQuery, - attempt.ID, attempt.FileID, attempt.ActivationAttemptAt, - attempt.IsSuccessful, attempt.ErrorMessage) + file_id, activation_attempt_at, is_successful, error_message + ) VALUES (?, ?, ?, ?) + RETURNING id;` + + var id int64 + err := s.db.QueryRow(insertQuery, + attempt.FileID, attempt.ActivationAttemptAt, + attempt.IsSuccessful, attempt.ErrorMessage).Scan(&id) + if err != nil { + return 0, err + } + + return id, nil +} + +// UpdateActivationAttempt update a new activation attempt into the activation_attempts table +func (s *TicketStore) UpdateActivationAttempt(attempt types.ActivationAttempt) (int64, error) { + const updateQuery = ` + UPDATE activation_attempts + SET activation_attempt_at = ?, is_successful = ?, error_message = ? + WHERE id = ? AND file_id = ? + RETURNING id` + + var id int64 + err := s.db.QueryRow(updateQuery, + attempt.ActivationAttemptAt, + attempt.IsSuccessful, + attempt.ErrorMessage, + attempt.ID, + attempt.FileID).Scan(&id) if err != nil { - return err + return 0, err } - return nil + return id, nil } // GetActivationAttemptByID retrieves an activation attempt by its ID from the activation_attempts table -func (s *TicketStore) GetActivationAttemptByID(id string) (*types.ActivationAttempt, error) { +func (s *TicketStore) GetActivationAttemptByID(id int) (*types.ActivationAttempt, error) { const selectQuery = ` SELECT id, file_id, activation_attempt_at, is_successful, error_message FROM activation_attempts WHERE id = ?;` - row := s.db.QueryRow(selectQuery, id) + row := s.db.QueryRow(selectQuery, int64(id)) var attempt types.ActivationAttempt err := row.Scan( diff --git a/common/storage/ticketstore/files.go b/common/storage/ticketstore/files.go index 23903e5e6..a408efcee 100644 --- a/common/storage/ticketstore/files.go +++ b/common/storage/ticketstore/files.go @@ -8,6 +8,7 @@ type FilesQueries interface { UpsertFile(file types.File) error GetFileByID(fileID string) (*types.File, error) GetFilesByBaseFileID(baseFileID string) ([]*types.File, error) + GetFileByTaskID(taskID string) (*types.File, error) } // UpsertFile inserts a new file into the files table @@ -23,25 +24,25 @@ func (s *TicketStore) UpsertFile(file types.File) error { ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(file_id) DO UPDATE SET - upload_timestamp = excluded.upload_timestamp, - path = excluded.path, - file_index = excluded.file_index, - base_file_id = excluded.base_file_id, - task_id = excluded.task_id, - reg_txid = excluded.reg_txid, - activation_txid = excluded.activation_txid, - req_burn_txn_amount = excluded.req_burn_txn_amount, - burn_txn_id = excluded.burn_txn_id, - req_amount = excluded.req_amount, - is_concluded = excluded.is_concluded, - cascade_metadata_ticket_id = excluded.cascade_metadata_ticket_id, - uuid_key = excluded.uuid_key, - hash_of_original_big_file = excluded.hash_of_original_big_file, - name_of_original_big_file_with_ext = excluded.name_of_original_big_file_with_ext, - size_of_original_big_file = excluded.size_of_original_big_file, - data_type_of_original_big_file = excluded.data_type_of_original_big_file, - start_block = excluded.start_block, - done_block = excluded.done_block;` + upload_timestamp = COALESCE(excluded.upload_timestamp, files.upload_timestamp), + path = COALESCE(excluded.path, files.path), + file_index = COALESCE(excluded.file_index, files.file_index), + base_file_id = COALESCE(excluded.base_file_id, files.base_file_id), + task_id = COALESCE(excluded.task_id, files.task_id), + reg_txid = COALESCE(excluded.reg_txid, files.reg_txid), + activation_txid = COALESCE(excluded.activation_txid, files.activation_txid), + req_burn_txn_amount = COALESCE(excluded.req_burn_txn_amount, files.req_burn_txn_amount), + burn_txn_id = COALESCE(excluded.burn_txn_id, files.burn_txn_id), + req_amount = COALESCE(excluded.req_amount, files.req_amount), + is_concluded = COALESCE(excluded.is_concluded, files.is_concluded), + cascade_metadata_ticket_id = COALESCE(excluded.cascade_metadata_ticket_id, files.cascade_metadata_ticket_id), + uuid_key = COALESCE(excluded.uuid_key, files.uuid_key), + hash_of_original_big_file = COALESCE(excluded.hash_of_original_big_file, files.hash_of_original_big_file), + name_of_original_big_file_with_ext = COALESCE(excluded.name_of_original_big_file_with_ext, files.name_of_original_big_file_with_ext), + size_of_original_big_file = COALESCE(excluded.size_of_original_big_file, files.size_of_original_big_file), + data_type_of_original_big_file = COALESCE(excluded.data_type_of_original_big_file, files.data_type_of_original_big_file), + start_block = COALESCE(excluded.start_block, files.start_block), + done_block = COALESCE(excluded.done_block, files.done_block);` _, err := s.db.Exec(upsertQuery, file.FileID, file.UploadTimestamp, file.Path, file.FileIndex, file.BaseFileID, file.TaskID, @@ -86,6 +87,35 @@ func (s *TicketStore) GetFileByID(fileID string) (*types.File, error) { return &file, nil } +// GetFileByTaskID retrieves a file by its task-id from the files table +func (s *TicketStore) GetFileByTaskID(taskID string) (*types.File, error) { + const selectQuery = ` + SELECT file_id, upload_timestamp, path, file_index, base_file_id, task_id, + reg_txid, activation_txid, req_burn_txn_amount, burn_txn_id, + req_amount, is_concluded, cascade_metadata_ticket_id, uuid_key, + hash_of_original_big_file, name_of_original_big_file_with_ext, + size_of_original_big_file, data_type_of_original_big_file, + start_block, done_block + FROM files + WHERE task_id = ?;` + + row := s.db.QueryRow(selectQuery, taskID) + + var file types.File + err := row.Scan( + &file.FileID, &file.UploadTimestamp, &file.Path, &file.FileIndex, &file.BaseFileID, &file.TaskID, + &file.RegTxid, &file.ActivationTxid, &file.ReqBurnTxnAmount, &file.BurnTxnID, + &file.ReqAmount, &file.IsConcluded, &file.CascadeMetadataTicketID, &file.UUIDKey, + &file.HashOfOriginalBigFile, &file.NameOfOriginalBigFileWithExt, + &file.SizeOfOriginalBigFile, &file.DataTypeOfOriginalBigFile, + &file.StartBlock, &file.DoneBlock) + if err != nil { + return nil, err + } + + return &file, nil +} + // GetFilesByBaseFileID retrieves files by base_file_id from the files table func (s *TicketStore) GetFilesByBaseFileID(baseFileID string) ([]*types.File, error) { const selectQuery = ` diff --git a/common/storage/ticketstore/registration_attempts.go b/common/storage/ticketstore/registration_attempts.go index 47109c0a2..02af9cb4e 100644 --- a/common/storage/ticketstore/registration_attempts.go +++ b/common/storage/ticketstore/registration_attempts.go @@ -5,34 +5,53 @@ import ( ) type RegistrationAttemptsQueries interface { - UpsertRegistrationAttempt(attempt types.RegistrationAttempt) error + InsertRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) + UpdateRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) GetRegistrationAttemptByID(id int) (*types.RegistrationAttempt, error) GetRegistrationAttemptsByFileID(fileID string) ([]*types.RegistrationAttempt, error) } -// UpsertRegistrationAttempt upsert a new registration attempt into the registration_attempts table -func (s *TicketStore) UpsertRegistrationAttempt(attempt types.RegistrationAttempt) error { - const upsertQuery = ` +// InsertRegistrationAttempt insert a new registration attempt into the registration_attempts table +func (s *TicketStore) InsertRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) { + const insertQuery = ` INSERT INTO registration_attempts ( - id, file_id, reg_started_at, processor_sns, finished_at, is_successful, error_message - ) VALUES (?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) - DO UPDATE SET - file_id = excluded.file_id, - reg_started_at = excluded.reg_started_at, - processor_sns = excluded.processor_sns, - finished_at = excluded.finished_at, - is_successful = excluded.is_successful, - error_message = excluded.error_message;` - - _, err := s.db.Exec(upsertQuery, - attempt.ID, attempt.FileID, attempt.RegStartedAt, attempt.ProcessorSNS, - attempt.FinishedAt, attempt.IsSuccessful, attempt.ErrorMessage) + file_id, reg_started_at, processor_sns, finished_at, is_successful, error_message + ) VALUES (?, ?, ?, ?, ?, ?) + RETURNING id;` + + var id int64 + err := s.db.QueryRow(insertQuery, + attempt.FileID, attempt.RegStartedAt, attempt.ProcessorSNS, + attempt.FinishedAt, attempt.IsSuccessful, attempt.ErrorMessage).Scan(&id) + if err != nil { + return 0, err + } + + return id, nil +} + +// UpdateRegistrationAttempt update a new registration attempt into the registration_attempts table +func (s *TicketStore) UpdateRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) { + const updateQuery = ` + UPDATE registration_attempts + SET reg_started_at = ?, + processor_sns = ?, + finished_at = ?, + is_successful = ?, + error_message = ? + WHERE id = ? AND file_id = ? + RETURNING id;` + + var id int64 + err := s.db.QueryRow(updateQuery, + attempt.RegStartedAt, attempt.ProcessorSNS, attempt.FinishedAt, + attempt.IsSuccessful, attempt.ErrorMessage, + attempt.ID, attempt.FileID).Scan(&id) if err != nil { - return err + return 0, err } - return nil + return id, nil } // GetRegistrationAttemptByID retrieves a registration attempt by its ID from the registration_attempts table @@ -43,7 +62,7 @@ func (s *TicketStore) GetRegistrationAttemptByID(id int) (*types.RegistrationAtt FROM registration_attempts WHERE id = ?;` - row := s.db.QueryRow(selectQuery, id) + row := s.db.QueryRow(selectQuery, int64(id)) var attempt types.RegistrationAttempt err := row.Scan( diff --git a/common/storage/ticketstore/store.go b/common/storage/ticketstore/store.go index 0e8510d97..ccc472bb0 100644 --- a/common/storage/ticketstore/store.go +++ b/common/storage/ticketstore/store.go @@ -76,6 +76,7 @@ func (s *TicketStore) CloseTicketDB(ctx context.Context) { // OpenTicketingDb opens ticket DB func OpenTicketingDb() (TicketStorageInterface, error) { dbFile := filepath.Join(configurer.DefaultPath(), ticketDBName) + db, err := sqlx.Connect("sqlite3", dbFile) if err != nil { return nil, fmt.Errorf("cannot open sqlite database: %w", err) diff --git a/common/types/ticket.go b/common/types/ticket.go index bafb09385..2b8b8132c 100644 --- a/common/types/ticket.go +++ b/common/types/ticket.go @@ -27,6 +27,8 @@ type File struct { DoneBlock int } +type Files []*File + type RegistrationAttempt struct { ID int FileID string @@ -44,3 +46,13 @@ type ActivationAttempt struct { IsSuccessful bool ErrorMessage string } + +func (fs Files) GetBase() *File { + for _, f := range fs { + if f.FileIndex == "0" { + return f + } + } + + return nil +} diff --git a/walletnode/api/services/cascade.go b/walletnode/api/services/cascade.go index cdaf96192..901b2aed8 100644 --- a/walletnode/api/services/cascade.go +++ b/walletnode/api/services/cascade.go @@ -3,6 +3,7 @@ package services import ( "context" "fmt" + "github.com/pastelnetwork/gonode/common/types" "os" "path/filepath" "strings" @@ -25,6 +26,11 @@ import ( "goa.design/goa/v3/security" ) +const ( + maxFileSize = 350 * 1024 * 1024 // 350MB in bytes + maxFileRegistrationAttempts = 3 +) + // CascadeAPIHandler - CascadeAPIHandler service type CascadeAPIHandler struct { *Common @@ -107,23 +113,69 @@ func (service *CascadeAPIHandler) StartProcessing(ctx context.Context, p *cascad return nil, cascade.MakeUnAuthorized(errors.New("user not authorized: invalid PastelID or Key")) } - taskID, err := service.register.AddTask(p) + relatedFiles, err := service.register.GetFilesByBaseFileID(p.FileID) if err != nil { - log.WithError(err).Error("unable to add task") return nil, cascade.MakeInternalServerError(err) } + log.WithContext(ctx).WithField("total_volumes", len(relatedFiles)).Info("related volumes retrieved from the base file-id") - res = &cascade.StartProcessingResult{ - TaskID: taskID, + baseFile := relatedFiles.GetBase() + if baseFile == nil { + return nil, cascade.MakeInternalServerError(err) } - fileName, err := service.register.ImageHandler.FileDb.Get(p.FileID) - if err != nil { - return nil, cascade.MakeBadRequest(errors.New("file not found, please re-upload and try again")) - } + switch { + case len(relatedFiles) == 1: + switch { + case baseFile.IsConcluded: + return nil, cascade.MakeInternalServerError(errors.New("ticket has already been registered & activated")) - log.WithField("task_id", taskID).WithField("file_id", p.FileID).WithField("file_name", string(fileName)). - Info("task has been added") + case baseFile.RegTxid == "": + baseFileRegistrationAttempts, err := service.register.GetRegistrationAttemptsByFileID(baseFile.FileID) + if err != nil { + return nil, cascade.MakeInternalServerError(err) + } + + switch { + case len(baseFileRegistrationAttempts) > maxFileRegistrationAttempts: + return nil, cascade.MakeInternalServerError(errors.New("ticket registration attempts have been exceeded")) + default: + regAttemptID, err := service.register.InsertRegistrationAttempts(types.RegistrationAttempt{ + FileID: p.FileID, + RegStartedAt: time.Now().UTC(), + }) + if err != nil { + log.WithContext(ctx).WithField("file_id", p.FileID).WithError(err).Error("error inserting registration attempt") + return nil, err + } + + taskID, err := service.register.AddTask(p, regAttemptID, baseFile.FileID) + if err != nil { + log.WithError(err).Error("unable to add task") + return nil, cascade.MakeInternalServerError(err) + } + + res = &cascade.StartProcessingResult{ + TaskID: taskID, + } + + baseFile.TaskID = taskID + baseFile.BurnTxnID = p.BurnTxid + err = service.register.UpsertFile(*baseFile) + if err != nil { + log.WithField("task_id", taskID).WithField("file_id", p.FileID). + WithField("burn_txid", p.BurnTxid). + WithField("file_name", baseFile.FileID). + Errorf("Error in file upsert: %v", err.Error()) + return nil, cascade.MakeInternalServerError(errors.New("Error in file upsert")) + } + } + default: + // Activation code + } + case len(relatedFiles) > 1: + + } return res, nil } diff --git a/walletnode/services/cascaderegister/config.go b/walletnode/services/cascaderegister/config.go index ea4e59630..86bc42917 100644 --- a/walletnode/services/cascaderegister/config.go +++ b/walletnode/services/cascaderegister/config.go @@ -1,7 +1,9 @@ package cascaderegister import ( + "github.com/pastelnetwork/gonode/common/configurer" "github.com/pastelnetwork/gonode/walletnode/services/common" + "path/filepath" ) const ( @@ -14,6 +16,10 @@ const ( defaultRQIDsMax = 50 ) +var ( + defaultCascadeFilesDir = filepath.Join(configurer.DefaultPath(), "files") +) + // Config contains settings of the registering nft. type Config struct { common.Config `mapstructure:",squash" json:"-"` @@ -28,6 +34,7 @@ type Config struct { // raptorq service RaptorQServiceAddress string `mapstructure:"-" json:"-"` RqFilesDir string + CascadeFilesDir string } // NewConfig returns a new Config instance. @@ -39,5 +46,6 @@ func NewConfig() *Config { WaitTxnValidInterval: defaultWaitTxnValidInterval, RQIDsMax: defaultRQIDsMax, NumberRQIDSFiles: defaultNumberRQIDSFiles, + CascadeFilesDir: defaultCascadeFilesDir, } } diff --git a/walletnode/services/cascaderegister/service.go b/walletnode/services/cascaderegister/service.go index ad7102663..f4f3bcf11 100644 --- a/walletnode/services/cascaderegister/service.go +++ b/walletnode/services/cascaderegister/service.go @@ -95,22 +95,35 @@ func (service *CascadeRegistrationService) ValidateUser(ctx context.Context, id } // AddTask create ticket request and start a new task with the given payload -func (service *CascadeRegistrationService) AddTask(p *cascade.StartProcessingPayload) (string, error) { +func (service *CascadeRegistrationService) AddTask(p *cascade.StartProcessingPayload, regAttemptID int64, filename string) (string, error) { request := FromStartProcessingPayload(p) + request.RegAttemptID = regAttemptID + request.FileID = filename // get image filename from storage based on image_id - filename, err := service.ImageHandler.FileDb.Get(p.FileID) + filePath := filepath.Join(service.config.CascadeFilesDir, p.FileID, filename) + fileData, err := os.ReadFile(filePath) if err != nil { - return "", errors.Errorf("get image filename from storage: %w", err) + return "", err } - // get image data from storage - file, err := service.ImageHandler.FileStorage.File(string(filename)) - if err != nil { - return "", errors.Errorf("get image data: %v", err) + file := service.ImageHandler.FileStorage.NewFile() + if file == nil { + return "", errors.Errorf("unable to create new file instance for %s", filename) + } + + // Write the []byte data to the file + if _, err := file.Write(fileData); err != nil { + return "", errors.Errorf("write image data to file: %v", err) } + + // Set the file format based on the filename extension + if err := file.SetFormatFromExtension(filepath.Ext(filename)); err != nil { + return "", errors.Errorf("set file format: %v", err) + } + + // Assign the newly created File instance to the request request.Image = file - request.FileName = string(filename) task := NewCascadeRegisterTask(service, request) service.Worker.AddTask(task) @@ -148,9 +161,6 @@ func (service *CascadeRegistrationService) StoreFileMetadata(ctx context.Context } files, err := os.ReadDir(dir) - if err != nil { - return fee, preburn, err - } type fileMetadata struct { Name string @@ -226,11 +236,20 @@ func (service *CascadeRegistrationService) GetFile(fileID string) (*types.File, return file, nil } -func (service *CascadeRegistrationService) GetFilesByBaseFileID(fileID string) ([]*types.File, error) { +func (service *CascadeRegistrationService) GetFileByTaskID(taskID string) (*types.File, error) { + file, err := service.ticketDB.GetFileByTaskID(taskID) + if err != nil { + return nil, err + } + return file, nil +} + +func (service *CascadeRegistrationService) GetFilesByBaseFileID(fileID string) (types.Files, error) { files, err := service.ticketDB.GetFilesByBaseFileID(fileID) if err != nil { return nil, err } + return files, nil } @@ -250,6 +269,62 @@ func (service *CascadeRegistrationService) GetRegistrationAttemptsByFileID(fileI return registrationAttempts, nil } +func (service *CascadeRegistrationService) GetRegistrationAttemptsByID(attemptID int) (*types.RegistrationAttempt, error) { + registrationAttempt, err := service.ticketDB.GetRegistrationAttemptByID(attemptID) + if err != nil { + return nil, err + } + return registrationAttempt, nil +} + +func (service *CascadeRegistrationService) GetActivationAttemptByID(attemptID int) (*types.ActivationAttempt, error) { + actAttempt, err := service.ticketDB.GetActivationAttemptByID(attemptID) + if err != nil { + return nil, err + } + return actAttempt, nil +} + +func (service *CascadeRegistrationService) InsertRegistrationAttempts(regAttempt types.RegistrationAttempt) (int64, error) { + id, err := service.ticketDB.InsertRegistrationAttempt(regAttempt) + if err != nil { + return 0, err + } + return id, nil +} + +func (service *CascadeRegistrationService) UpdateRegistrationAttempts(regAttempt types.RegistrationAttempt) (int64, error) { + id, err := service.ticketDB.UpdateRegistrationAttempt(regAttempt) + if err != nil { + return 0, err + } + return id, nil +} + +func (service *CascadeRegistrationService) InsertActivationAttempt(actAttempt types.ActivationAttempt) (int64, error) { + id, err := service.ticketDB.InsertActivationAttempt(actAttempt) + if err != nil { + return 0, err + } + return id, nil +} + +func (service *CascadeRegistrationService) UpdateActivationAttempts(actAttempt types.ActivationAttempt) (int64, error) { + id, err := service.ticketDB.UpdateActivationAttempt(actAttempt) + if err != nil { + return 0, err + } + return id, nil +} + +func (service *CascadeRegistrationService) UpsertFile(file types.File) error { + err := service.ticketDB.UpsertFile(file) + if err != nil { + return err + } + return nil +} + // NewService returns a new Service instance func NewService(config *Config, pastelClient pastel.Client, nodeClient node.ClientInterface, fileStorage storage.FileStorageInterface, db storage.KeyValue, diff --git a/walletnode/services/cascaderegister/task.go b/walletnode/services/cascaderegister/task.go index 244bfd46a..77d3be382 100644 --- a/walletnode/services/cascaderegister/task.go +++ b/walletnode/services/cascaderegister/task.go @@ -6,15 +6,16 @@ import ( "os" "time" - "github.com/gabriel-vasile/mimetype" "github.com/pastelnetwork/gonode/common/errgroup" + "github.com/pastelnetwork/gonode/common/types" + "github.com/pastelnetwork/gonode/walletnode/api/gen/nft" + + "github.com/gabriel-vasile/mimetype" "github.com/pastelnetwork/gonode/common/errors" "github.com/pastelnetwork/gonode/common/log" - "github.com/pastelnetwork/gonode/common/types" "github.com/pastelnetwork/gonode/common/utils" "github.com/pastelnetwork/gonode/mixins" "github.com/pastelnetwork/gonode/pastel" - "github.com/pastelnetwork/gonode/walletnode/api/gen/nft" "github.com/pastelnetwork/gonode/walletnode/services/common" "github.com/pastelnetwork/gonode/walletnode/services/download" ) @@ -47,6 +48,11 @@ type CascadeRegistrationTask struct { // only set to true for unit tests skipPrimaryNodeTxidVerify bool + + regCascadeFinishedAt time.Time + regCascadeStartedAt time.Time + + actAttemptID int64 } // Run starts the task @@ -55,6 +61,71 @@ func (task *CascadeRegistrationTask) Run(ctx context.Context) error { } func (task *CascadeRegistrationTask) run(ctx context.Context) error { + regTxid, actTxid, err := task.runTicketRegActTask(ctx) + if err != nil { + return err + } + + taskID := task.ID() + if regTxid != "" && actTxid != "" { + doneBlock, err := task.service.pastelHandler.PastelClient.GetBlockCount(ctx) + if err != nil { + log.WithContext(ctx).WithError(err).Error("error retrieving block-count") + return err + } + + file, err := task.service.GetFileByTaskID(taskID) + if err != nil { + log.WithContext(ctx).WithError(err).Error("error retrieving file") + return nil + } + + file.DoneBlock = int(doneBlock) + file.RegTxid = regTxid + file.ActivationTxid = actTxid + file.IsConcluded = true + err = task.service.ticketDB.UpsertFile(*file) + if err != nil { + log.Errorf("Error in file upsert: %v", err.Error()) + return nil + } + + // Upsert registration attempts + + ra, err := task.service.GetRegistrationAttemptsByID(int(task.Request.RegAttemptID)) + if err != nil { + log.Errorf("Error retrieving file reg attempt: %v", err.Error()) + return nil + } + + ra.FinishedAt = time.Now().UTC() + ra.IsSuccessful = true + _, err = task.service.UpdateRegistrationAttempts(*ra) + if err != nil { + log.Errorf("Error in registration attempts upsert: %v", err.Error()) + return nil + } + + actAttempt, err := task.service.GetActivationAttemptByID(int(task.actAttemptID)) + if err != nil { + log.Errorf("Error retrieving file act attempt: %v", err.Error()) + return nil + } + + actAttempt.IsSuccessful = true + _, err = task.service.UpdateActivationAttempts(*actAttempt) + if err != nil { + log.Errorf("Error in activation attempts upsert: %v", err.Error()) + return nil + } + + return nil + } + + return nil +} + +func (task *CascadeRegistrationTask) runTicketRegActTask(ctx context.Context) (regTxid, actTxid string, err error) { if r := recover(); r != nil { log.Errorf("Recovered from panic in cascade run: %v", r) } @@ -70,19 +141,21 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { <-connCtx.Done() }() + task.regCascadeStartedAt = time.Now() + nftBytes, err := task.Request.Image.Bytes() if err != nil { log.WithContext(ctx).WithError(err).Error("error converting image to bytes") task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorConvertingImageBytes) - return errors.Errorf("convert image to byte stream %w", err) + return "", "", errors.Errorf("convert image to byte stream %w", err) } fileDataInMb := utils.GetFileSizeInMB(nftBytes) fee, err := task.service.pastelHandler.GetEstimatedCascadeFee(ctx, fileDataInMb) if err != nil { log.WithContext(ctx).WithError(err).Error("error getting estimated fee") - return errors.Errorf("getting estimated fee %w", err) + return "", "", errors.Errorf("getting estimated fee %w", err) } task.registrationFee = int64(fee) @@ -94,7 +167,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { if err := task.service.pastelHandler.ValidateBurnTxID(ctx, task.Request.BurnTxID, fee); err != nil { log.WithContext(ctx).WithError(err).Error("error getting confirmations on burn txn") - return errors.Errorf("waiting on burn txn confirmations failed: %w", err) + return "", "", errors.Errorf("waiting on burn txn confirmations failed: %w", err) } task.UpdateStatus(common.StatusBurnTxnValidated) log.WithContext(ctx).Info("Pre-Burn Transaction validated successfully, hashing data now..") @@ -107,7 +180,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { log.WithContext(ctx).WithError(err).Errorf("error creating hash") task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorEncodingImage) - return errors.Errorf("hash encoded image: %w", err) + return "", "", errors.Errorf("hash encoded image: %w", err) } key := append(nftBytes, []byte(task.WalletNodeTask.ID())...) @@ -127,7 +200,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { log.WithContext(ctx).WithError(err).Error("error closing sn-connections") } - return errors.Errorf("connect to top rank nodes: %w", err) + return "", "", errors.Errorf("connect to top rank nodes: %w", err) } task.creatorBlockHeight = creatorBlockHeight @@ -155,7 +228,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { task.StatusLog[common.FieldErrorDetail] = err.Error() closeErr = err task.UpdateStatus(common.StatusErrorSendingRegMetadata) - return errors.Errorf("send registration metadata: %w", err) + return "", "", errors.Errorf("send registration metadata: %w", err) } log.WithContext(ctx).Info("Uploading data to Supernodes") @@ -164,7 +237,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorUploadImageFailed) closeErr = err - return errors.Errorf("upload data: %w", err) + return "", "", errors.Errorf("upload data: %w", err) } log.WithContext(ctx).Info("Data uploaded onto SNs, generating RaptorQ symbols' identifiers") @@ -182,7 +255,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorGenRaptorQSymbolsFailed) closeErr = err - return errors.Errorf("gen RaptorQ symbols' identifiers: %w", err) + return "", "", errors.Errorf("gen RaptorQ symbols' identifiers: %w", err) } log.WithContext(ctx).Info("RaptorQ symbols' identifiers generated") @@ -198,7 +271,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { task.StatusLog[common.FieldErrorDetail] = err.Error() closeErr = err task.UpdateStatus(common.StatusErrorCreatingTicket) - return errors.Errorf("create ticket: %w", err) + return "", "", errors.Errorf("create ticket: %w", err) } // sign ticket with creator signature @@ -207,7 +280,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorSigningTicket) closeErr = err - return errors.Errorf("sign cascade ticket: %w", err) + return "", "", errors.Errorf("sign cascade ticket: %w", err) } log.WithContext(ctx).Info("Cascade Ticket sent to Supernodes for validation & registration - waiting for Supernodes to respond..") @@ -216,9 +289,10 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorUploadingTicket) closeErr = err - return errors.Errorf("send signed cascade ticket: %w", err) + return "", "", errors.Errorf("send signed cascade ticket: %w", err) } task.StatusLog[common.FieldRegTicketTxnID] = task.regCascadeTxid + task.regCascadeFinishedAt = time.Now().UTC() task.UpdateStatus(common.StatusTicketAccepted) task.UpdateStatus(&common.EphemeralStatus{ StatusTitle: "Validating Cascade Reg TXID: ", @@ -250,7 +324,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { }) task.UpdateStatus(common.StatusTaskRejected) - return errors.Errorf("error validating cascade ticket data") + return task.regCascadeTxid, "", errors.Errorf("error validating cascade ticket data") } } log.WithContext(ctx).Infof("Cascade Registration Ticket validated through Download - Its registered with txid: %s", task.regCascadeTxid) @@ -260,7 +334,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { if err := task.service.pastelHandler.WaitTxidValid(ctx, task.regCascadeTxid, int64(task.service.config.CascadeRegTxMinConfirmations), time.Duration(task.service.config.WaitTxnValidInterval)*time.Second); err != nil { log.WithContext(ctx).WithError(err).Error("error waiting for Reg TXID confirmations") - return errors.Errorf("wait reg-nft ticket valid: %w", err) + return task.regCascadeTxid, "", errors.Errorf("wait reg-nft ticket valid: %w", err) } task.UpdateStatus(common.StatusTicketRegistered) task.UpdateStatus(&common.EphemeralStatus{ @@ -272,13 +346,18 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { log.WithContext(ctx).Info("Cascade Registrtion Ticket confirmed, now activating it..") + id, err := task.service.InsertActivationAttempt(types.ActivationAttempt{ + FileID: task.Request.FileID, + ActivationAttemptAt: time.Now().UTC(), + }) + task.actAttemptID = id // activate cascade ticket registered at previous step by SN activateTxID, err := task.activateActionTicket(ctx) if err != nil { log.WithContext(ctx).WithError(err).Error("error activating action ticket") task.StatusLog[common.FieldErrorDetail] = err.Error() task.UpdateStatus(common.StatusErrorActivatingTicket) - return errors.Errorf("active action ticket: %w", err) + return task.regCascadeTxid, "", errors.Errorf("active action ticket: %w", err) } task.StatusLog[common.FieldActivateTicketTxnID] = activateTxID task.UpdateStatus(&common.EphemeralStatus{ @@ -293,7 +372,7 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { time.Duration(task.service.config.WaitTxnValidInterval)*time.Second) if err != nil { log.WithContext(ctx).WithError(err).Error("error waiting for activated Reg TXID confirmations") - return errors.Errorf("wait activate txid valid: %w", err) + return task.regCascadeTxid, activateTxID, errors.Errorf("wait activate txid valid: %w", err) } task.UpdateStatus(common.StatusTicketActivated) task.UpdateStatus(&common.EphemeralStatus{ @@ -304,7 +383,8 @@ func (task *CascadeRegistrationTask) run(ctx context.Context) error { }) log.WithContext(ctx).Infof("Cascade Activation ticket is confirmed. Activation ticket txid: %s", activateTxID) - return nil + return task.regCascadeTxid, activateTxID, nil + } // sendActionMetadata sends Action Ticket metadata to supernodes diff --git a/walletnode/services/common/action_ticket.go b/walletnode/services/common/action_ticket.go index ba9c07de7..75b128b0b 100644 --- a/walletnode/services/common/action_ticket.go +++ b/walletnode/services/common/action_ticket.go @@ -1,8 +1,6 @@ package common -import ( - "github.com/pastelnetwork/gonode/common/storage/files" -) +import "github.com/pastelnetwork/gonode/common/storage/files" // ActionRegistrationRequest represents a request for the registration sense type ActionRegistrationRequest struct { @@ -24,4 +22,8 @@ type ActionRegistrationRequest struct { GroupID string `json:"group_id"` // SpendableAddress to use for registration fee SpendableAddress string `json:"spendable_address"` + // FileID to be use for registration and activation requests update + FileID string `json:"file_id"` + + RegAttemptID int64 }