Skip to content

Commit

Permalink
[PSL-1211] implement cron-job to restore failed volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Jul 12, 2024
1 parent a91fabc commit be74c2b
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 171 deletions.
56 changes: 50 additions & 6 deletions common/storage/ticketstore/files.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package ticketstore

import (
"github.com/pastelnetwork/gonode/common/types"
"time"

"github.com/pastelnetwork/gonode/common/types"
)

type FilesQueries interface {
Expand All @@ -11,6 +12,7 @@ type FilesQueries interface {
GetFilesByBaseFileID(baseFileID string) ([]*types.File, error)
GetFileByTaskID(taskID string) (*types.File, error)
GetFilesByBaseFileIDAndConcludedCheck(baseFileID string, isConcluded bool) ([]*types.File, error)
GetUnCompletedFiles() ([]*types.File, error)
}

// UpsertFile inserts a new file into the files table
Expand All @@ -22,9 +24,9 @@ func (s *TicketStore) UpsertFile(file types.File) error {
req_amount, is_concluded, cascade_metadata_ticket_id, uuid_key,
hash_of_original_big_file, name_of_original_big_file_with_ext,
size_of_original_big_file, data_type_of_original_big_file,
start_block, done_block, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(file_id)
start_block, done_block, pastel_id, passphrase, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(file_id, base_file_id)
DO UPDATE SET
upload_timestamp = COALESCE(excluded.upload_timestamp, files.upload_timestamp),
path = COALESCE(excluded.path, files.path),
Expand All @@ -44,15 +46,17 @@ func (s *TicketStore) UpsertFile(file types.File) error {
size_of_original_big_file = COALESCE(excluded.size_of_original_big_file, files.size_of_original_big_file),
data_type_of_original_big_file = COALESCE(excluded.data_type_of_original_big_file, files.data_type_of_original_big_file),
start_block = COALESCE(excluded.start_block, files.start_block),
done_block = COALESCE(excluded.done_block, files.done_block);`
done_block = COALESCE(excluded.done_block, files.done_block),
pastel_id = COALESCE(excluded.pastel_id, files.pastel_id),
passphrase = COALESCE(excluded.passphrase, files.passphrase);`

_, err := s.db.Exec(upsertQuery,
file.FileID, file.UploadTimestamp, file.Path, file.FileIndex, file.BaseFileID, file.TaskID,
file.RegTxid, file.ActivationTxid, file.ReqBurnTxnAmount, file.BurnTxnID,
file.ReqAmount, file.IsConcluded, file.CascadeMetadataTicketID, file.UUIDKey,
file.HashOfOriginalBigFile, file.NameOfOriginalBigFileWithExt,
file.SizeOfOriginalBigFile, file.DataTypeOfOriginalBigFile,
file.StartBlock, file.DoneBlock, time.Now().UTC(), time.Now().UTC())
file.StartBlock, file.DoneBlock, file.PastelID, file.Passphrase, time.Now().UTC(), time.Now().UTC())
if err != nil {
return err
}
Expand Down Expand Up @@ -199,3 +203,43 @@ func (s *TicketStore) GetFilesByBaseFileIDAndConcludedCheck(baseFileID string, i

return files, nil
}

func (s *TicketStore) GetUnCompletedFiles() ([]*types.File, error) {
const selectQuery = `
SELECT file_id, upload_timestamp, path, file_index, base_file_id, task_id,
reg_txid, activation_txid, req_burn_txn_amount, burn_txn_id,
req_amount, is_concluded, cascade_metadata_ticket_id, uuid_key,
hash_of_original_big_file, name_of_original_big_file_with_ext,
size_of_original_big_file, data_type_of_original_big_file,
pastel_id, passphrase, start_block, done_block
FROM files
WHERE is_concluded = false;`

rows, err := s.db.Query(selectQuery)
if err != nil {
return nil, err
}
defer rows.Close()

var files []*types.File
for rows.Next() {
var file types.File
err := rows.Scan(
&file.FileID, &file.UploadTimestamp, &file.Path, &file.FileIndex, &file.BaseFileID, &file.TaskID,
&file.RegTxid, &file.ActivationTxid, &file.ReqBurnTxnAmount, &file.BurnTxnID,
&file.ReqAmount, &file.IsConcluded, &file.CascadeMetadataTicketID, &file.UUIDKey,
&file.HashOfOriginalBigFile, &file.NameOfOriginalBigFileWithExt,
&file.SizeOfOriginalBigFile, &file.DataTypeOfOriginalBigFile, &file.PastelID, &file.Passphrase,
&file.StartBlock, &file.DoneBlock)
if err != nil {
return nil, err
}
files = append(files, &file)
}

if err = rows.Err(); err != nil {
return nil, err
}

return files, nil
}
17 changes: 17 additions & 0 deletions common/storage/ticketstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ CREATE TABLE IF NOT EXISTS multi_vol_cascade_tickets_txid_map (
);
`

const alterFilesTablePastelID string = `
ALTER TABLE files
ADD COLUMN pastel_id TEXT;
`

const alterFilesTablePassphrase string = `
ALTER TABLE files
ADD COLUMN passphrase TEXT;
`

const addUniqueConstraint string = `
CREATE UNIQUE INDEX IF NOT EXISTS files_unique ON files(file_id, base_file_id);
`

const (
ticketDBName = "ticket.db"
)
Expand Down Expand Up @@ -115,6 +129,9 @@ func OpenTicketingDb() (TicketStorageInterface, error) {
}

_, _ = db.Exec(createMultiVolCascadeTicketTxIDMap)
_, _ = db.Exec(alterFilesTablePastelID)
_, _ = db.Exec(alterFilesTablePassphrase)
_, _ = db.Exec(addUniqueConstraint)

pragmas := []string{
"PRAGMA synchronous=NORMAL;",
Expand Down
2 changes: 2 additions & 0 deletions common/types/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type File struct {
DataTypeOfOriginalBigFile string
StartBlock int32
DoneBlock int
PastelID string
Passphrase string
}

type Files []*File
Expand Down
156 changes: 2 additions & 154 deletions walletnode/api/services/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,6 @@ func (service *CascadeAPIHandler) StartProcessing(ctx context.Context, p *cascad
return nil, cascade.MakeBadRequest(errors.New("provided burn txids and no of volumes are not equal"))
}

if isDuplicateExists(p.BurnTxids) {
log.WithContext(ctx).WithField("related_volumes", len(relatedFiles)).
WithField("burn_txids", len(p.BurnTxids)).
Error("duplicate burn tx-ids exist")
return nil, cascade.MakeBadRequest(errors.New("duplicate burn tx-ids exist"))
}

sortedBurnTxids, err := service.register.SortBurnTxIDs(ctx, p.BurnTxids)
if err != nil {
return nil, cascade.MakeInternalServerError(err)
Expand Down Expand Up @@ -566,144 +559,12 @@ func (service *CascadeAPIHandler) Restore(ctx context.Context, p *cascade.Restor
return nil, cascade.MakeUnAuthorized(errors.New("user not authorized: invalid PastelID or Key"))
}

// Get Concluded volumes
volumes, err := service.register.GetFilesByBaseFileID(p.BaseFileID)
if err != nil {
return nil, cascade.MakeInternalServerError(err)
}

if len(volumes) == 0 {
return nil, cascade.MakeInternalServerError(errors.New("no volumes associated with the base-file-id to recover"))
}

log.WithContext(ctx).WithField("total_volumes", len(volumes)).
Info("total volumes retrieved from the base file-id")

unConcludedVolumes, err := volumes.GetUnconcludedFiles()
if err != nil {
return nil, cascade.MakeInternalServerError(err)
}

// check if volumes already concluded
if len(unConcludedVolumes) == 0 {
log.WithContext(ctx).
Info("All volumes are already concluded")
return &cascade.RestoreFile{
TotalVolumes: len(volumes),
RegisteredVolumes: len(volumes),
VolumesWithPendingRegistration: 0,
VolumesRegistrationInProgress: 0,
ActivatedVolumes: len(volumes),
VolumesActivatedInRecoveryFlow: 0,
}, nil

}

var (
registeredVolumes int
volumesWithInProgressRegCount int
volumesWithPendingRegistration int
volumesActivatedInRecoveryFlow int
activatedVolumes int
)

// Get BurnTxId by file amount and process un-concluded files that are un-registered or un-activated

logger := log.WithContext(ctx).WithField("base_file_id", p.BaseFileID)

for _, v := range volumes {
if v.RegTxid == "" {
volumesWithPendingRegistration++
logger.WithField("volume_name", v.FileID).Info("find a volume with no registration, trying again...")

var burnTxId string
if service.IsBurnTxIDValidForRecovery(ctx, v.BurnTxnID, v.ReqAmount-10) {
log.WithContext(ctx).WithField("burn_txid", v.BurnTxnID).Info("existing burn-txid is valid")
burnTxId = v.BurnTxnID
} else {
log.WithContext(ctx).WithField("burn_txid", v.BurnTxnID).Info("existing burn-txid is not valid, burning the new txid")

burnTxId, err = service.register.GetBurnTxIdByAmount(ctx, int64(v.ReqBurnTxnAmount))
if err != nil {
log.WithContext(ctx).WithField("amount", int64(v.ReqBurnTxnAmount)).WithError(err).Error("error getting burn TxId for amount")
return nil, cascade.MakeInternalServerError(err)
}

logger.WithField("volume_name", v.FileID).Info("estimated fee has been burned, sending for registration")
}

addTaskPayload := &common.AddTaskPayload{
FileID: v.FileID,
BurnTxid: &burnTxId,
AppPastelID: p.AppPastelID,
MakePubliclyAccessible: p.MakePubliclyAccessible,
Key: p.Key,
}

if p.SpendableAddress != nil {
addTaskPayload.SpendableAddress = p.SpendableAddress
}

_, err = service.register.ProcessFile(ctx, *v, addTaskPayload)
if err != nil {
log.WithContext(ctx).WithField("file_id", v.FileID).WithError(err).Error("error processing un-registered volume")
continue
}
logger.WithField("volume_name", v.FileID).Info("task added for registration recovery")

volumesWithInProgressRegCount += 1
} else if v.ActivationTxid == "" {
logger.WithField("volume_name", v.FileID).Info("find a volume with no activation, trying again...")

// activation code
actAttemptId, err := service.register.InsertActivationAttempt(types.ActivationAttempt{
FileID: v.FileID,
ActivationAttemptAt: time.Now().UTC(),
})
if err != nil {
log.WithContext(ctx).WithField("file_id", v.FileID).WithError(err).Error("error inserting activation attempt")
continue
}

activateActReq := pastel.ActivateActionRequest{
RegTxID: v.RegTxid,
Fee: int64(v.ReqAmount) - 10,
PastelID: p.AppPastelID,
Passphrase: p.Key,
}

err = service.register.ActivateActionTicketAndRegisterVolumeTicket(ctx, activateActReq, *v, actAttemptId)
if err != nil {
log.WithContext(ctx).WithField("file_id", v.FileID).WithError(err).Error("error activating or registering un-concluded volume")
continue
}
logger.WithField("volume_name", v.FileID).Info("request has been sent for activation")

volumesActivatedInRecoveryFlow += 1
} else {
if v.RegTxid != "" {
registeredVolumes += 1
}
if v.ActivationTxid != "" {
activatedVolumes += 1
}
}
}

// only set base file txId return by pastel register all else remains nil
_, err = service.register.RegisterVolumeTicket(ctx, p.BaseFileID)
restoreFile, err := service.register.RestoreFile(ctx, p)
if err != nil {
return nil, err
}

return &cascade.RestoreFile{
TotalVolumes: len(volumes),
RegisteredVolumes: registeredVolumes,
VolumesWithPendingRegistration: volumesWithPendingRegistration,
VolumesRegistrationInProgress: volumesWithInProgressRegCount,
ActivatedVolumes: activatedVolumes,
VolumesActivatedInRecoveryFlow: volumesActivatedInRecoveryFlow,
}, nil
return restoreFile, nil
}

func (service *CascadeAPIHandler) checkBurnTxIDsValidForRegistration(ctx context.Context, burnTxIDs []string, files types.Files) error {
Expand All @@ -727,19 +588,6 @@ func (service *CascadeAPIHandler) checkBurnTxIDsValidForRegistration(ctx context
return nil
}

func (service *CascadeAPIHandler) IsBurnTxIDValidForRecovery(ctx context.Context, burnTxID string, estimatedFee float64) bool {
if err := service.register.CheckBurnTxIDTicketDuplication(ctx, burnTxID); err != nil {
return false
}

err := service.register.ValidateBurnTxn(ctx, burnTxID, estimatedFee)
if err != nil {
return false
}

return true
}

// NewCascadeAPIHandler returns the swagger OpenAPI implementation.
func NewCascadeAPIHandler(config *Config, filesMap *sync.Map, register *cascaderegister.CascadeRegistrationService, download *download.NftDownloadingService) *CascadeAPIHandler {
return &CascadeAPIHandler{
Expand Down
Loading

0 comments on commit be74c2b

Please sign in to comment.