Skip to content

Commit

Permalink
[PSL-1188] modify cascade registration endpoint with multi-volume tic…
Browse files Browse the repository at this point in the history
…ket changes
  • Loading branch information
j-rafique committed Jun 24, 2024
1 parent a25cb63 commit 2d71409
Show file tree
Hide file tree
Showing 10 changed files with 403 additions and 105 deletions.
61 changes: 40 additions & 21 deletions common/storage/ticketstore/activation_attempts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,61 @@ import (
)

type ActivationAttemptsQueries interface {
UpsertActivationAttempt(attempt types.ActivationAttempt) error
GetActivationAttemptByID(id string) (*types.ActivationAttempt, error)
InsertActivationAttempt(attempt types.ActivationAttempt) (int64, error)
UpdateActivationAttempt(attempt types.ActivationAttempt) (int64, error)
GetActivationAttemptByID(id int) (*types.ActivationAttempt, error)
GetActivationAttemptsByFileID(fileID string) ([]*types.ActivationAttempt, error)
}

// UpsertActivationAttempt upsert a new activation attempt into the activation_attempts table
func (s *TicketStore) UpsertActivationAttempt(attempt types.ActivationAttempt) error {
const upsertQuery = `
// InsertActivationAttempt insert a new activation attempt into the activation_attempts table
func (s *TicketStore) InsertActivationAttempt(attempt types.ActivationAttempt) (int64, error) {
const insertQuery = `
INSERT INTO activation_attempts (
id, file_id, activation_attempt_at, is_successful, error_message
) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(id)
DO UPDATE SET
file_id = excluded.file_id,
activation_attempt_at = excluded.activation_attempt_at,
is_successful = excluded.is_successful,
error_message = excluded.error_message;`

_, err := s.db.Exec(upsertQuery,
attempt.ID, attempt.FileID, attempt.ActivationAttemptAt,
attempt.IsSuccessful, attempt.ErrorMessage)
file_id, activation_attempt_at, is_successful, error_message
) VALUES (?, ?, ?, ?)
RETURNING id;`

var id int64
err := s.db.QueryRow(insertQuery,
attempt.FileID, attempt.ActivationAttemptAt,
attempt.IsSuccessful, attempt.ErrorMessage).Scan(&id)
if err != nil {
return 0, err
}

return id, nil
}

// UpdateActivationAttempt update a new activation attempt into the activation_attempts table
func (s *TicketStore) UpdateActivationAttempt(attempt types.ActivationAttempt) (int64, error) {
const updateQuery = `
UPDATE activation_attempts
SET activation_attempt_at = ?, is_successful = ?, error_message = ?
WHERE id = ? AND file_id = ?
RETURNING id`

var id int64
err := s.db.QueryRow(updateQuery,
attempt.ActivationAttemptAt,
attempt.IsSuccessful,
attempt.ErrorMessage,
attempt.ID,
attempt.FileID).Scan(&id)
if err != nil {
return err
return 0, err
}

return nil
return id, nil
}

// GetActivationAttemptByID retrieves an activation attempt by its ID from the activation_attempts table
func (s *TicketStore) GetActivationAttemptByID(id string) (*types.ActivationAttempt, error) {
func (s *TicketStore) GetActivationAttemptByID(id int) (*types.ActivationAttempt, error) {
const selectQuery = `
SELECT id, file_id, activation_attempt_at, is_successful, error_message
FROM activation_attempts
WHERE id = ?;`

row := s.db.QueryRow(selectQuery, id)
row := s.db.QueryRow(selectQuery, int64(id))

var attempt types.ActivationAttempt
err := row.Scan(
Expand Down
68 changes: 49 additions & 19 deletions common/storage/ticketstore/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type FilesQueries interface {
UpsertFile(file types.File) error
GetFileByID(fileID string) (*types.File, error)
GetFilesByBaseFileID(baseFileID string) ([]*types.File, error)
GetFileByTaskID(taskID string) (*types.File, error)
}

// UpsertFile inserts a new file into the files table
Expand All @@ -23,25 +24,25 @@ func (s *TicketStore) UpsertFile(file types.File) error {
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(file_id)
DO UPDATE SET
upload_timestamp = excluded.upload_timestamp,
path = excluded.path,
file_index = excluded.file_index,
base_file_id = excluded.base_file_id,
task_id = excluded.task_id,
reg_txid = excluded.reg_txid,
activation_txid = excluded.activation_txid,
req_burn_txn_amount = excluded.req_burn_txn_amount,
burn_txn_id = excluded.burn_txn_id,
req_amount = excluded.req_amount,
is_concluded = excluded.is_concluded,
cascade_metadata_ticket_id = excluded.cascade_metadata_ticket_id,
uuid_key = excluded.uuid_key,
hash_of_original_big_file = excluded.hash_of_original_big_file,
name_of_original_big_file_with_ext = excluded.name_of_original_big_file_with_ext,
size_of_original_big_file = excluded.size_of_original_big_file,
data_type_of_original_big_file = excluded.data_type_of_original_big_file,
start_block = excluded.start_block,
done_block = excluded.done_block;`
upload_timestamp = COALESCE(excluded.upload_timestamp, files.upload_timestamp),
path = COALESCE(excluded.path, files.path),
file_index = COALESCE(excluded.file_index, files.file_index),
base_file_id = COALESCE(excluded.base_file_id, files.base_file_id),
task_id = COALESCE(excluded.task_id, files.task_id),
reg_txid = COALESCE(excluded.reg_txid, files.reg_txid),
activation_txid = COALESCE(excluded.activation_txid, files.activation_txid),
req_burn_txn_amount = COALESCE(excluded.req_burn_txn_amount, files.req_burn_txn_amount),
burn_txn_id = COALESCE(excluded.burn_txn_id, files.burn_txn_id),
req_amount = COALESCE(excluded.req_amount, files.req_amount),
is_concluded = COALESCE(excluded.is_concluded, files.is_concluded),
cascade_metadata_ticket_id = COALESCE(excluded.cascade_metadata_ticket_id, files.cascade_metadata_ticket_id),
uuid_key = COALESCE(excluded.uuid_key, files.uuid_key),
hash_of_original_big_file = COALESCE(excluded.hash_of_original_big_file, files.hash_of_original_big_file),
name_of_original_big_file_with_ext = COALESCE(excluded.name_of_original_big_file_with_ext, files.name_of_original_big_file_with_ext),
size_of_original_big_file = COALESCE(excluded.size_of_original_big_file, files.size_of_original_big_file),
data_type_of_original_big_file = COALESCE(excluded.data_type_of_original_big_file, files.data_type_of_original_big_file),
start_block = COALESCE(excluded.start_block, files.start_block),
done_block = COALESCE(excluded.done_block, files.done_block);`

_, err := s.db.Exec(upsertQuery,
file.FileID, file.UploadTimestamp, file.Path, file.FileIndex, file.BaseFileID, file.TaskID,
Expand Down Expand Up @@ -86,6 +87,35 @@ func (s *TicketStore) GetFileByID(fileID string) (*types.File, error) {
return &file, nil
}

// GetFileByTaskID retrieves a file by its task-id from the files table
func (s *TicketStore) GetFileByTaskID(taskID string) (*types.File, error) {
const selectQuery = `
SELECT file_id, upload_timestamp, path, file_index, base_file_id, task_id,
reg_txid, activation_txid, req_burn_txn_amount, burn_txn_id,
req_amount, is_concluded, cascade_metadata_ticket_id, uuid_key,
hash_of_original_big_file, name_of_original_big_file_with_ext,
size_of_original_big_file, data_type_of_original_big_file,
start_block, done_block
FROM files
WHERE task_id = ?;`

row := s.db.QueryRow(selectQuery, taskID)

var file types.File
err := row.Scan(
&file.FileID, &file.UploadTimestamp, &file.Path, &file.FileIndex, &file.BaseFileID, &file.TaskID,
&file.RegTxid, &file.ActivationTxid, &file.ReqBurnTxnAmount, &file.BurnTxnID,
&file.ReqAmount, &file.IsConcluded, &file.CascadeMetadataTicketID, &file.UUIDKey,
&file.HashOfOriginalBigFile, &file.NameOfOriginalBigFileWithExt,
&file.SizeOfOriginalBigFile, &file.DataTypeOfOriginalBigFile,
&file.StartBlock, &file.DoneBlock)
if err != nil {
return nil, err
}

return &file, nil
}

// GetFilesByBaseFileID retrieves files by base_file_id from the files table
func (s *TicketStore) GetFilesByBaseFileID(baseFileID string) ([]*types.File, error) {
const selectQuery = `
Expand Down
61 changes: 40 additions & 21 deletions common/storage/ticketstore/registration_attempts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,53 @@ import (
)

type RegistrationAttemptsQueries interface {
UpsertRegistrationAttempt(attempt types.RegistrationAttempt) error
InsertRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error)
UpdateRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error)
GetRegistrationAttemptByID(id int) (*types.RegistrationAttempt, error)
GetRegistrationAttemptsByFileID(fileID string) ([]*types.RegistrationAttempt, error)
}

// UpsertRegistrationAttempt upsert a new registration attempt into the registration_attempts table
func (s *TicketStore) UpsertRegistrationAttempt(attempt types.RegistrationAttempt) error {
const upsertQuery = `
// InsertRegistrationAttempt insert a new registration attempt into the registration_attempts table
func (s *TicketStore) InsertRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) {
const insertQuery = `
INSERT INTO registration_attempts (
id, file_id, reg_started_at, processor_sns, finished_at, is_successful, error_message
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id)
DO UPDATE SET
file_id = excluded.file_id,
reg_started_at = excluded.reg_started_at,
processor_sns = excluded.processor_sns,
finished_at = excluded.finished_at,
is_successful = excluded.is_successful,
error_message = excluded.error_message;`

_, err := s.db.Exec(upsertQuery,
attempt.ID, attempt.FileID, attempt.RegStartedAt, attempt.ProcessorSNS,
attempt.FinishedAt, attempt.IsSuccessful, attempt.ErrorMessage)
file_id, reg_started_at, processor_sns, finished_at, is_successful, error_message
) VALUES (?, ?, ?, ?, ?, ?)
RETURNING id;`

var id int64
err := s.db.QueryRow(insertQuery,
attempt.FileID, attempt.RegStartedAt, attempt.ProcessorSNS,
attempt.FinishedAt, attempt.IsSuccessful, attempt.ErrorMessage).Scan(&id)
if err != nil {
return 0, err
}

return id, nil
}

// UpdateRegistrationAttempt update a new registration attempt into the registration_attempts table
func (s *TicketStore) UpdateRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) {
const updateQuery = `
UPDATE registration_attempts
SET reg_started_at = ?,
processor_sns = ?,
finished_at = ?,
is_successful = ?,
error_message = ?
WHERE id = ? AND file_id = ?
RETURNING id;`

var id int64
err := s.db.QueryRow(updateQuery,
attempt.RegStartedAt, attempt.ProcessorSNS, attempt.FinishedAt,
attempt.IsSuccessful, attempt.ErrorMessage,
attempt.ID, attempt.FileID).Scan(&id)
if err != nil {
return err
return 0, err
}

return nil
return id, nil
}

// GetRegistrationAttemptByID retrieves a registration attempt by its ID from the registration_attempts table
Expand All @@ -43,7 +62,7 @@ func (s *TicketStore) GetRegistrationAttemptByID(id int) (*types.RegistrationAtt
FROM registration_attempts
WHERE id = ?;`

row := s.db.QueryRow(selectQuery, id)
row := s.db.QueryRow(selectQuery, int64(id))

var attempt types.RegistrationAttempt
err := row.Scan(
Expand Down
1 change: 1 addition & 0 deletions common/storage/ticketstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (s *TicketStore) CloseTicketDB(ctx context.Context) {
// OpenTicketingDb opens ticket DB
func OpenTicketingDb() (TicketStorageInterface, error) {
dbFile := filepath.Join(configurer.DefaultPath(), ticketDBName)

db, err := sqlx.Connect("sqlite3", dbFile)
if err != nil {
return nil, fmt.Errorf("cannot open sqlite database: %w", err)
Expand Down
12 changes: 12 additions & 0 deletions common/types/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type File struct {
DoneBlock int
}

type Files []*File

type RegistrationAttempt struct {
ID int
FileID string
Expand All @@ -44,3 +46,13 @@ type ActivationAttempt struct {
IsSuccessful bool
ErrorMessage string
}

func (fs Files) GetBase() *File {
for _, f := range fs {
if f.FileIndex == "0" {
return f
}
}

return nil
}
72 changes: 62 additions & 10 deletions walletnode/api/services/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package services
import (
"context"
"fmt"
"github.com/pastelnetwork/gonode/common/types"
"os"
"path/filepath"
"strings"
Expand All @@ -25,6 +26,11 @@ import (
"goa.design/goa/v3/security"
)

const (
maxFileSize = 350 * 1024 * 1024 // 350MB in bytes
maxFileRegistrationAttempts = 3
)

// CascadeAPIHandler - CascadeAPIHandler service
type CascadeAPIHandler struct {
*Common
Expand Down Expand Up @@ -107,23 +113,69 @@ func (service *CascadeAPIHandler) StartProcessing(ctx context.Context, p *cascad
return nil, cascade.MakeUnAuthorized(errors.New("user not authorized: invalid PastelID or Key"))
}

taskID, err := service.register.AddTask(p)
relatedFiles, err := service.register.GetFilesByBaseFileID(p.FileID)
if err != nil {
log.WithError(err).Error("unable to add task")
return nil, cascade.MakeInternalServerError(err)
}
log.WithContext(ctx).WithField("total_volumes", len(relatedFiles)).Info("related volumes retrieved from the base file-id")

res = &cascade.StartProcessingResult{
TaskID: taskID,
baseFile := relatedFiles.GetBase()
if baseFile == nil {
return nil, cascade.MakeInternalServerError(err)
}

fileName, err := service.register.ImageHandler.FileDb.Get(p.FileID)
if err != nil {
return nil, cascade.MakeBadRequest(errors.New("file not found, please re-upload and try again"))
}
switch {
case len(relatedFiles) == 1:
switch {
case baseFile.IsConcluded:
return nil, cascade.MakeInternalServerError(errors.New("ticket has already been registered & activated"))

log.WithField("task_id", taskID).WithField("file_id", p.FileID).WithField("file_name", string(fileName)).
Info("task has been added")
case baseFile.RegTxid == "":
baseFileRegistrationAttempts, err := service.register.GetRegistrationAttemptsByFileID(baseFile.FileID)
if err != nil {
return nil, cascade.MakeInternalServerError(err)
}

switch {
case len(baseFileRegistrationAttempts) > maxFileRegistrationAttempts:
return nil, cascade.MakeInternalServerError(errors.New("ticket registration attempts have been exceeded"))
default:
regAttemptID, err := service.register.InsertRegistrationAttempts(types.RegistrationAttempt{
FileID: p.FileID,
RegStartedAt: time.Now().UTC(),
})
if err != nil {
log.WithContext(ctx).WithField("file_id", p.FileID).WithError(err).Error("error inserting registration attempt")
return nil, err
}

taskID, err := service.register.AddTask(p, regAttemptID, baseFile.FileID)
if err != nil {
log.WithError(err).Error("unable to add task")
return nil, cascade.MakeInternalServerError(err)
}

res = &cascade.StartProcessingResult{
TaskID: taskID,
}

baseFile.TaskID = taskID
baseFile.BurnTxnID = p.BurnTxid
err = service.register.UpsertFile(*baseFile)
if err != nil {
log.WithField("task_id", taskID).WithField("file_id", p.FileID).
WithField("burn_txid", p.BurnTxid).
WithField("file_name", baseFile.FileID).
Errorf("Error in file upsert: %v", err.Error())
return nil, cascade.MakeInternalServerError(errors.New("Error in file upsert"))
}
}
default:
// Activation code
}
case len(relatedFiles) > 1:

}

return res, nil
}
Expand Down
Loading

0 comments on commit 2d71409

Please sign in to comment.