Skip to content

Commit e2918b1

Browse files
committed
[PSL-1188] modify cascade registration endpoint with multi-volume ticket changes
1 parent c0c9710 commit e2918b1

File tree

8 files changed

+268
-78
lines changed

8 files changed

+268
-78
lines changed

common/storage/ticketstore/activation_attempts.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import (
55
)
66

77
type ActivationAttemptsQueries interface {
8-
UpsertActivationAttempt(attempt types.ActivationAttempt) error
9-
GetActivationAttemptByID(id string) (*types.ActivationAttempt, error)
8+
UpsertActivationAttempt(attempt types.ActivationAttempt) (int64, error)
9+
GetActivationAttemptByID(id int) (*types.ActivationAttempt, error)
1010
GetActivationAttemptsByFileID(fileID string) ([]*types.ActivationAttempt, error)
1111
}
1212

1313
// UpsertActivationAttempt upsert a new activation attempt into the activation_attempts table
14-
func (s *TicketStore) UpsertActivationAttempt(attempt types.ActivationAttempt) error {
14+
func (s *TicketStore) UpsertActivationAttempt(attempt types.ActivationAttempt) (int64, error) {
1515
const upsertQuery = `
1616
INSERT INTO activation_attempts (
1717
id, file_id, activation_attempt_at, is_successful, error_message
@@ -21,26 +21,28 @@ func (s *TicketStore) UpsertActivationAttempt(attempt types.ActivationAttempt) e
2121
file_id = excluded.file_id,
2222
activation_attempt_at = excluded.activation_attempt_at,
2323
is_successful = excluded.is_successful,
24-
error_message = excluded.error_message;`
24+
error_message = excluded.error_message
25+
returning id;`
2526

26-
_, err := s.db.Exec(upsertQuery,
27+
var id int64
28+
err := s.db.QueryRow(upsertQuery,
2729
attempt.ID, attempt.FileID, attempt.ActivationAttemptAt,
28-
attempt.IsSuccessful, attempt.ErrorMessage)
30+
attempt.IsSuccessful, attempt.ErrorMessage).Scan(&id)
2931
if err != nil {
30-
return err
32+
return 0, err
3133
}
3234

33-
return nil
35+
return id, nil
3436
}
3537

3638
// GetActivationAttemptByID retrieves an activation attempt by its ID from the activation_attempts table
37-
func (s *TicketStore) GetActivationAttemptByID(id string) (*types.ActivationAttempt, error) {
39+
func (s *TicketStore) GetActivationAttemptByID(id int) (*types.ActivationAttempt, error) {
3840
const selectQuery = `
3941
SELECT id, file_id, activation_attempt_at, is_successful, error_message
4042
FROM activation_attempts
4143
WHERE id = ?;`
4244

43-
row := s.db.QueryRow(selectQuery, id)
45+
row := s.db.QueryRow(selectQuery, int64(id))
4446

4547
var attempt types.ActivationAttempt
4648
err := row.Scan(

common/storage/ticketstore/files.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type FilesQueries interface {
1414
func (s *TicketStore) UpsertFile(file types.File) error {
1515
const upsertQuery = `
1616
INSERT INTO files (
17-
file_id, upload_timestamp, path, index, base_file_id, task_id,
17+
file_id, upload_timestamp, path, file_index, base_file_id, task_id,
1818
reg_txid, activation_txid, req_burn_txn_amount, burn_txn_id,
1919
req_amount, is_concluded, cascade_metadata_ticket_id, uuid_key,
2020
hash_of_original_big_file, name_of_original_big_file_with_ext,
@@ -23,25 +23,25 @@ func (s *TicketStore) UpsertFile(file types.File) error {
2323
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
2424
ON CONFLICT(file_id)
2525
DO UPDATE SET
26-
upload_timestamp = excluded.upload_timestamp,
27-
path = excluded.path,
28-
file_index = excluded.index,
29-
base_file_id = excluded.base_file_id,
30-
task_id = excluded.task_id,
31-
reg_txid = excluded.reg_txid,
32-
activation_txid = excluded.activation_txid,
33-
req_burn_txn_amount = excluded.req_burn_txn_amount,
34-
burn_txn_id = excluded.burn_txn_id,
35-
req_amount = excluded.req_amount,
36-
is_concluded = excluded.is_concluded,
37-
cascade_metadata_ticket_id = excluded.cascade_metadata_ticket_id,
38-
uuid_key = excluded.uuid_key,
39-
hash_of_original_big_file = excluded.hash_of_original_big_file,
40-
name_of_original_big_file_with_ext = excluded.name_of_original_big_file_with_ext,
41-
size_of_original_big_file = excluded.size_of_original_big_file,
42-
data_type_of_original_big_file = excluded.data_type_of_original_big_file,
43-
start_block = excluded.start_block,
44-
done_block = excluded.done_block;`
26+
upload_timestamp = COALESCE(excluded.upload_timestamp, files.upload_timestamp),
27+
path = COALESCE(excluded.path, files.path),
28+
file_index = COALESCE(excluded.file_index, files.file_index),
29+
base_file_id = COALESCE(excluded.base_file_id, files.base_file_id),
30+
task_id = COALESCE(excluded.task_id, files.task_id),
31+
reg_txid = COALESCE(excluded.reg_txid, files.reg_txid),
32+
activation_txid = COALESCE(excluded.activation_txid, files.activation_txid),
33+
req_burn_txn_amount = COALESCE(excluded.req_burn_txn_amount, files.req_burn_txn_amount),
34+
burn_txn_id = COALESCE(excluded.burn_txn_id, files.burn_txn_id),
35+
req_amount = COALESCE(excluded.req_amount, files.req_amount),
36+
is_concluded = COALESCE(excluded.is_concluded, files.is_concluded),
37+
cascade_metadata_ticket_id = COALESCE(excluded.cascade_metadata_ticket_id, files.cascade_metadata_ticket_id),
38+
uuid_key = COALESCE(excluded.uuid_key, files.uuid_key),
39+
hash_of_original_big_file = COALESCE(excluded.hash_of_original_big_file, files.hash_of_original_big_file),
40+
name_of_original_big_file_with_ext = COALESCE(excluded.name_of_original_big_file_with_ext, files.name_of_original_big_file_with_ext),
41+
size_of_original_big_file = COALESCE(excluded.size_of_original_big_file, files.size_of_original_big_file),
42+
data_type_of_original_big_file = COALESCE(excluded.data_type_of_original_big_file, files.data_type_of_original_big_file),
43+
start_block = COALESCE(excluded.start_block, files.start_block),
44+
done_block = COALESCE(excluded.done_block, files.done_block);`
4545

4646
_, err := s.db.Exec(upsertQuery,
4747
file.FileID, file.UploadTimestamp, file.Path, file.FileIndex, file.BaseFileID, file.TaskID,

common/storage/ticketstore/registration_attempts.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,36 @@ import (
55
)
66

77
type RegistrationAttemptsQueries interface {
8-
UpsertRegistrationAttempt(attempt types.RegistrationAttempt) error
8+
UpsertRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error)
99
GetRegistrationAttemptByID(id int) (*types.RegistrationAttempt, error)
1010
GetRegistrationAttemptsByFileID(fileID string) ([]*types.RegistrationAttempt, error)
1111
}
1212

1313
// UpsertRegistrationAttempt upsert a new registration attempt into the registration_attempts table
14-
func (s *TicketStore) UpsertRegistrationAttempt(attempt types.RegistrationAttempt) error {
14+
func (s *TicketStore) UpsertRegistrationAttempt(attempt types.RegistrationAttempt) (int64, error) {
1515
const upsertQuery = `
1616
INSERT INTO registration_attempts (
1717
id, file_id, reg_started_at, processor_sns, finished_at, is_successful, error_message
1818
) VALUES (?, ?, ?, ?, ?, ?, ?)
1919
ON CONFLICT(id)
2020
DO UPDATE SET
21-
file_id = excluded.file_id,
22-
reg_started_at = excluded.reg_started_at,
23-
processor_sns = excluded.processor_sns,
24-
finished_at = excluded.finished_at,
25-
is_successful = excluded.is_successful,
26-
error_message = excluded.error_message;`
21+
file_id = COALESCE(excluded.file_id, registration_attempts.file_id),
22+
reg_started_at = COALESCE(excluded.reg_started_at, registration_attempts.reg_started_at),
23+
processor_sns = COALESCE(excluded.processor_sns, registration_attempts.processor_sns),
24+
finished_at = COALESCE(excluded.finished_at, registration_attempts.finished_at),
25+
is_successful = COALESCE(excluded.is_successful, registration_attempts.is_successful),
26+
error_message = COALESCE(excluded.error_message, registration_attempts.error_message)
27+
returning id;`
2728

28-
_, err := s.db.Exec(upsertQuery,
29+
var id int64
30+
err := s.db.QueryRow(upsertQuery,
2931
attempt.ID, attempt.FileID, attempt.RegStartedAt, attempt.ProcessorSNS,
30-
attempt.FinishedAt, attempt.IsSuccessful, attempt.ErrorMessage)
32+
attempt.FinishedAt, attempt.IsSuccessful, attempt.ErrorMessage).Scan(&id)
3133
if err != nil {
32-
return err
34+
return 0, err
3335
}
3436

35-
return nil
37+
return id, nil
3638
}
3739

3840
// GetRegistrationAttemptByID retrieves a registration attempt by its ID from the registration_attempts table
@@ -43,7 +45,7 @@ func (s *TicketStore) GetRegistrationAttemptByID(id int) (*types.RegistrationAtt
4345
FROM registration_attempts
4446
WHERE id = ?;`
4547

46-
row := s.db.QueryRow(selectQuery, id)
48+
row := s.db.QueryRow(selectQuery, int64(id))
4749

4850
var attempt types.RegistrationAttempt
4951
err := row.Scan(

walletnode/api/services/cascade.go

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package services
33
import (
44
"context"
55
"fmt"
6+
"github.com/pastelnetwork/gonode/common/types"
67
"os"
78
"path/filepath"
89
"strings"
@@ -26,7 +27,8 @@ import (
2627
)
2728

2829
const (
29-
maxFileSize = 350 * 1024 * 1024 // 350MB in bytes
30+
maxFileSize = 350 * 1024 * 1024 // 350MB in bytes
31+
maxFileRegistrationAttempts = 3
3032
)
3133

3234
// CascadeAPIHandler - CascadeAPIHandler service
@@ -117,23 +119,82 @@ func (service *CascadeAPIHandler) StartProcessing(ctx context.Context, p *cascad
117119
return nil, cascade.MakeUnAuthorized(errors.New("user not authorized: invalid PastelID or Key"))
118120
}
119121

120-
taskID, err := service.register.AddTask(p)
122+
baseFile, err := service.register.GetFile(p.FileID)
121123
if err != nil {
122-
log.WithError(err).Error("unable to add task")
123124
return nil, cascade.MakeInternalServerError(err)
124125
}
125126

126-
res = &cascade.StartProcessingResult{
127-
TaskID: taskID,
128-
}
129-
130-
fileName, err := service.register.ImageHandler.FileDb.Get(p.FileID)
127+
relatedFiles, err := service.register.GetFilesByBaseFileID(baseFile.FileID)
131128
if err != nil {
132-
return nil, cascade.MakeBadRequest(errors.New("file not found, please re-upload and try again"))
129+
return nil, cascade.MakeInternalServerError(err)
133130
}
134131

135-
log.WithField("task_id", taskID).WithField("file_id", p.FileID).WithField("file_name", string(fileName)).
136-
Info("task has been added")
132+
switch {
133+
case len(relatedFiles) == 1:
134+
switch {
135+
case baseFile.IsConcluded:
136+
return nil, cascade.MakeInternalServerError(errors.New("ticket has already been registered & activated"))
137+
138+
case baseFile.RegTxid == "":
139+
baseFileRegistrationAttempts, err := service.register.GetRegistrationAttemptsByFileID(baseFile.FileID)
140+
if err != nil {
141+
return nil, cascade.MakeInternalServerError(err)
142+
}
143+
144+
switch {
145+
case len(baseFileRegistrationAttempts) > maxFileRegistrationAttempts:
146+
return nil, cascade.MakeInternalServerError(errors.New("ticket registration attempts have been exceeded"))
147+
default:
148+
regAttemptID, err := service.register.UpsertRegistrationAttempts(types.RegistrationAttempt{
149+
FileID: p.FileID,
150+
RegStartedAt: time.Now().UTC(),
151+
})
152+
if err != nil {
153+
log.WithContext(ctx).WithField("file_id", p.FileID).WithError(err).Error("error inserting registration attempt")
154+
return nil, err
155+
}
156+
157+
taskID, err := service.register.AddTask(p, regAttemptID)
158+
if err != nil {
159+
log.WithError(err).Error("unable to add task")
160+
return nil, cascade.MakeInternalServerError(err)
161+
}
162+
163+
res = &cascade.StartProcessingResult{
164+
TaskID: taskID,
165+
}
166+
167+
fileName, err := service.register.ImageHandler.FileDb.Get(p.FileID)
168+
if err != nil {
169+
return nil, cascade.MakeBadRequest(errors.New("file not found, please re-upload and try again"))
170+
}
171+
172+
log.WithField("task_id", taskID).WithField("file_id", p.FileID).WithField("file_name", string(fileName)).
173+
Info("task has been added")
174+
175+
file, err := service.register.GetFile(p.FileID)
176+
if err != nil {
177+
log.WithContext(ctx).WithError(err).Error("error retrieving file")
178+
return nil, cascade.MakeBadRequest(errors.New("file not found, please re-upload and try again"))
179+
}
180+
181+
file.TaskID = taskID
182+
file.BurnTxnID = p.BurnTxid
183+
err = service.register.UpsertFile(*file)
184+
if err != nil {
185+
log.WithField("task_id", taskID).WithField("file_id", p.FileID).
186+
WithField("burn_txid", p.BurnTxid).
187+
WithField("file_name", string(fileName)).
188+
Errorf("Error in file upsert: %v", err.Error())
189+
return nil, cascade.MakeInternalServerError(errors.New("Error in file upsert"))
190+
}
191+
}
192+
default:
193+
// Activation code
194+
}
195+
case len(relatedFiles) > 1:
196+
197+
}
137198

138199
return res, nil
139200
}

walletnode/services/cascaderegister/service.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ func (service *CascadeRegistrationService) ValidateUser(ctx context.Context, id
9191
}
9292

9393
// AddTask create ticket request and start a new task with the given payload
94-
func (service *CascadeRegistrationService) AddTask(p *cascade.StartProcessingPayload) (string, error) {
94+
func (service *CascadeRegistrationService) AddTask(p *cascade.StartProcessingPayload, regAttemptID int64) (string, error) {
9595
request := FromStartProcessingPayload(p)
96+
request.RegAttemptID = regAttemptID
9697

9798
// get image filename from storage based on image_id
9899
filename, err := service.ImageHandler.FileDb.Get(p.FileID)
@@ -143,16 +144,16 @@ func (service *CascadeRegistrationService) StoreFileMetadata(ctx context.Context
143144
return errors.Errorf("cannot get block count: %w", err)
144145
}
145146

146-
basefileID := uuid.NewString()
147+
fileUUID := uuid.NewString()
147148
err = service.ticketDB.UpsertFile(types.File{
148-
FileID: basefileID,
149+
FileID: m.TaskID,
149150
UploadTimestamp: time.Now().UTC(),
150151
FileIndex: "00",
151-
BaseFileID: basefileID,
152+
BaseFileID: m.TaskID,
152153
TaskID: m.TaskID,
153154
ReqBurnTxnAmount: m.ReqPreBurnAmount,
154155
ReqAmount: m.TotalEstimatedFee,
155-
UUIDKey: basefileID,
156+
UUIDKey: fileUUID,
156157
HashOfOriginalBigFile: utils.GetHashStringFromBytes(m.UploadAssetReq.Bytes),
157158
NameOfOriginalBigFileWithExt: *m.UploadAssetReq.Filename,
158159
SizeOfOriginalBigFile: utils.GetFileSizeInMB(m.UploadAssetReq.Bytes),
@@ -197,6 +198,46 @@ func (service *CascadeRegistrationService) GetRegistrationAttemptsByFileID(fileI
197198
return registrationAttempts, nil
198199
}
199200

201+
func (service *CascadeRegistrationService) GetRegistrationAttemptsByID(attemptID int) (*types.RegistrationAttempt, error) {
202+
registrationAttempt, err := service.ticketDB.GetRegistrationAttemptByID(attemptID)
203+
if err != nil {
204+
return nil, err
205+
}
206+
return registrationAttempt, nil
207+
}
208+
209+
func (service *CascadeRegistrationService) GetActivationAttemptByID(attemptID int) (*types.ActivationAttempt, error) {
210+
actAttempt, err := service.ticketDB.GetActivationAttemptByID(attemptID)
211+
if err != nil {
212+
return nil, err
213+
}
214+
return actAttempt, nil
215+
}
216+
217+
func (service *CascadeRegistrationService) UpsertRegistrationAttempts(regAttempt types.RegistrationAttempt) (int64, error) {
218+
id, err := service.ticketDB.UpsertRegistrationAttempt(regAttempt)
219+
if err != nil {
220+
return 0, err
221+
}
222+
return id, nil
223+
}
224+
225+
func (service *CascadeRegistrationService) UpsertActivationAttempts(actAttempt types.ActivationAttempt) (int64, error) {
226+
id, err := service.ticketDB.UpsertActivationAttempt(actAttempt)
227+
if err != nil {
228+
return 0, err
229+
}
230+
return id, nil
231+
}
232+
233+
func (service *CascadeRegistrationService) UpsertFile(file types.File) error {
234+
err := service.ticketDB.UpsertFile(file)
235+
if err != nil {
236+
return err
237+
}
238+
return nil
239+
}
240+
200241
// NewService returns a new Service instance
201242
func NewService(config *Config, pastelClient pastel.Client, nodeClient node.ClientInterface,
202243
fileStorage storage.FileStorageInterface, db storage.KeyValue,

0 commit comments

Comments
 (0)