Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions harmony/harmonydb/sql/20251124-pdp-idempotency.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Idempotency tracking for PDP operations
CREATE TABLE pdp_idempotency (
idempotency_key TEXT PRIMARY KEY,
tx_hash TEXT, -- NULL initially, set after successful transaction
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- Index for cleanup operations
CREATE INDEX pdp_idempotency_created_at_idx ON pdp_idempotency(created_at);
1 change: 1 addition & 0 deletions pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore,
}

go p.cleanup(ctx)
go p.startIdempotencyCleanup(ctx)
return p
}

Expand Down
53 changes: 39 additions & 14 deletions pdp/handlers_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (p *PDPService) transformAddPiecesRequest(ctx context.Context, serviceLabel

// Start a DB transaction
_, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
// Step 4: Get pdp_piecerefs matching all subPiece cids + make sure those refs belong to serviceLabel
// Get pdp_piecerefs matching all subPiece cids + make sure those refs belong to serviceLabel
rows, err := tx.Query(`
SELECT ppr.piece_cid, ppr.id AS pdp_pieceref_id, ppr.piece_ref,
pp.piece_padded_size, pp.piece_raw_size
Expand Down Expand Up @@ -255,14 +255,14 @@ func (p *PDPService) transformAddPiecesRequest(ctx context.Context, serviceLabel
func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Step 1: Verify that the request is authorized using ECDSA JWT
// Verify that the request is authorized using ECDSA JWT
serviceLabel, err := p.AuthService(r)
if err != nil {
http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}

// Step 2: Extract dataSetId from the URL
// Extract dataSetId from the URL
dataSetIdStr := chi.URLParam(r, "dataSetId")
if dataSetIdStr == "" {
http.Error(w, "Missing data set ID in URL", http.StatusBadRequest)
Expand Down Expand Up @@ -301,12 +301,13 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
// Convert dataSetId to *big.Int
dataSetId := new(big.Int).SetUint64(dataSetIdUint64)

// Step 3: Parse the request body
// Parse the request body

// AddPiecesPayload defines the structure for the entire add pieces request payload
type AddPiecesPayload struct {
Pieces []AddPieceRequest `json:"pieces"`
ExtraData *string `json:"extraData,omitempty"`
IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"`
Pieces []AddPieceRequest `json:"pieces"`
ExtraData *string `json:"extraData,omitempty"`
}

var payload AddPiecesPayload
Expand All @@ -319,6 +320,18 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
_ = r.Body.Close()
}()

// Check or reserve idempotency key
idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, payload.IdempotencyKey)
if err != nil {
http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError)
return
}

if idempotencyResult.Exists {
p.handleAddIdempotencyResponse(w, &idempotencyResult, dataSetIdStr)
return
}

if len(payload.Pieces) == 0 {
http.Error(w, "At least one piece must be provided", http.StatusBadRequest)
return
Expand All @@ -330,22 +343,22 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
return
}

// Step 4: Prepare piece information
// Prepare piece information
pieceDataArray, subPieceInfoMap, subPieceCidList, err := p.transformAddPiecesRequest(ctx, serviceLabel, payload.Pieces)
if err != nil {
logAdd.Warnf("Failed to process AddPieces request data: %+v", err)
http.Error(w, "Failed to process request: "+err.Error(), http.StatusBadRequest)
}

// Step 5: Prepare the Ethereum transaction data outside the DB transaction
// Prepare the Ethereum transaction data outside the DB transaction
// Obtain the ABI of the PDPVerifier contract
abiData, err := contract.PDPVerifierMetaData.GetAbi()
if err != nil {
http.Error(w, "Failed to get contract ABI: "+err.Error(), http.StatusInternalServerError)
return
}

// Step 6: Prepare the Ethereum transaction
// Prepare the Ethereum transaction
// Pack the method call data
// The extraDataBytes variable is now correctly populated above
data, err := abiData.Pack("addPieces", dataSetId, common.Address{}, pieceDataArray, extraDataBytes)
Expand All @@ -354,7 +367,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
return
}

// Step 7: Get the sender address from 'eth_keys' table where role = 'pdp' limit 1
// Get the sender address from 'eth_keys' table where role = 'pdp' limit 1
fromAddress, err := p.getSenderAddress(ctx)
if err != nil {
http.Error(w, "Failed to get sender address: "+err.Error(), http.StatusInternalServerError)
Expand All @@ -371,7 +384,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
data,
)

// Step 8: Check for indexing requirements
// Check for indexing requirements
mustIndex, err := CheckIfIndexingNeeded(p.ethClient, dataSetIdUint64)
if err != nil {
logAdd.Errorw("Failed to check indexing requirements", "error", err, "dataSetId", dataSetId)
Expand All @@ -382,23 +395,35 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
logAdd.Infow("Data set has withIPFSIndexing enabled, pieces will be indexed", "dataSetId", dataSetId)
}

// Step 9: Send the transaction
// Send the transaction
reason := "pdp-addpieces"
txHash, err := p.sender.Send(ctx, fromAddress, txEth, reason)
if err != nil {
// Clean up reserved idempotency key on failure
if payload.IdempotencyKey != "" {
if cleanupErr := p.cleanupReservedIdempotencyKey(ctx, payload.IdempotencyKey); cleanupErr != nil {
logAdd.Errorw("Failed to cleanup idempotency key", "error", cleanupErr)
}
}
http.Error(w, "Failed to send transaction: "+err.Error(), http.StatusInternalServerError)
logAdd.Errorf("Failed to send transaction: %+v", err)
return
}

// Step 10: Insert database tracking records
// Insert database tracking records
txHashLower := strings.ToLower(txHash.Hex())
logAdd.Infow("PDP AddPieces: Inserting transaction tracking",
"txHash", txHashLower,
"dataSetId", dataSetIdUint64,
"pieceCount", len(payload.Pieces))

_, err = p.db.BeginTransaction(ctx, func(txdb *harmonydb.Tx) (bool, error) {
// Update idempotency key with transaction hash
if payload.IdempotencyKey != "" {
if err := p.updateIdempotencyKey(txdb, payload.IdempotencyKey, txHashLower); err != nil {
logAdd.Errorw("Failed to update idempotency key", "error", err)
}
}
// Insert into message_waits_eth
logAdd.Debugw("Inserting AddPieces into message_waits_eth",
"txHash", txHashLower,
Expand Down Expand Up @@ -435,7 +460,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
return
}

// Step 10: Respond with 201 Created
// Respond with 201 Created
w.Header().Set("Location", path.Join("/pdp/data-sets", dataSetIdStr, "pieces/added", txHashLower))
w.WriteHeader(http.StatusCreated)
}
Expand Down
79 changes: 62 additions & 17 deletions pdp/handlers_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,38 @@ var logCreate = logger.Logger("pdp/create")
func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Step 1: Verify that the request is authorized using ECDSA JWT
// Verify that the request is authorized using ECDSA JWT
serviceLabel, err := p.AuthService(r)
if err != nil {
http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}

type RequestBody struct {
RecordKeeper string `json:"recordKeeper"`
Pieces []AddPieceRequest `json:"pieces"`
ExtraData *string `json:"extraData,omitempty"`
type CreateAddRequestBody struct {
Pieces []AddPieceRequest `json:"pieces"`
IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"`
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
}

var reqBody RequestBody
var reqBody CreateAddRequestBody
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
http.Error(w, "Invalid JSON in request body: "+err.Error(), http.StatusBadRequest)
return
}

// Check or reserve idempotency key
idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey)
if err != nil {
http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError)
return
}

if idempotencyResult.Exists {
p.handleCreateIdempotencyResponse(w, &idempotencyResult)
return
}

if reqBody.RecordKeeper == "" {
http.Error(w, "recordKeeper address is required", http.StatusBadRequest)
return
Expand Down Expand Up @@ -112,6 +125,10 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h
reason := "pdp-create-and-add"
txHash, err := p.sender.Send(ctx, fromAddress, tx, reason)
if err != nil {
// Clean up reserved idempotency key on transaction failure
if reqBody.IdempotencyKey != "" {
_ = p.cleanupReservedIdempotencyKey(ctx, reqBody.IdempotencyKey)
}
http.Error(w, "Failed to send transaction: "+err.Error(), http.StatusInternalServerError)
logCreate.Errorf("Failed to send transaction: %+v", err)
return
Expand All @@ -124,6 +141,14 @@ func (p *PDPService) handleCreateDataSetAndAddPieces(w http.ResponseWriter, r *h
"recordKeeper", recordKeeperAddr.Hex())
// Begin a database transaction
_, err = p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
// Update idempotency key with transaction hash
if reqBody.IdempotencyKey != "" {
if err := p.updateIdempotencyKey(tx, reqBody.IdempotencyKey, txHashLower); err != nil {
logCreate.Warnw("Failed to update idempotency key", "error", err)
// Don't fail the transaction for idempotency issues
}
}

err := p.insertMessageWaitsAndDataSetCreate(tx, txHashLower, serviceLabel)
if err != nil {
return false, err
Expand Down Expand Up @@ -173,17 +198,18 @@ func decodeExtraData(extraDataString *string) ([]byte, error) {
func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Step 1: Verify that the request is authorized using ECDSA JWT
// Verify that the request is authorized using ECDSA JWT
serviceLabel, err := p.AuthService(r)
if err != nil {
http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}

// Step 2: Parse the request body to get the 'recordKeeper' address and extraData
type RequestBody struct {
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
// Parse the request body to get the 'recordKeeper' address and extraData
type CreateDataSetRequestBody struct {
IdempotencyKey IdempotencyKey `json:"idempotencyKey,omitempty"`
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
}

body, err := io.ReadAll(r.Body)
Expand All @@ -195,12 +221,24 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
_ = r.Body.Close()
}()

var reqBody RequestBody
var reqBody CreateDataSetRequestBody
if err := json.Unmarshal(body, &reqBody); err != nil {
http.Error(w, "Invalid JSON in request body: "+err.Error(), http.StatusBadRequest)
return
}

// Check or reserve idempotency key
idempotencyResult, err := p.checkOrReserveIdempotencyKey(ctx, reqBody.IdempotencyKey)
if err != nil {
http.Error(w, "Failed to check idempotency: "+err.Error(), http.StatusInternalServerError)
return
}

if idempotencyResult.Exists {
p.handleCreateIdempotencyResponse(w, &idempotencyResult)
return
}

if reqBody.RecordKeeper == "" {
http.Error(w, "recordKeeper address is required", http.StatusBadRequest)
return
Expand All @@ -225,14 +263,14 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
return
}

// Step 3: Get the sender address from 'eth_keys' table where role = 'pdp' limit 1
// Get the sender address from 'eth_keys' table where role = 'pdp' limit 1
fromAddress, err := p.getSenderAddress(ctx)
if err != nil {
http.Error(w, "Failed to get sender address: "+err.Error(), http.StatusInternalServerError)
return
}

// Step 4: Manually create the transaction without requiring a Signer
// Manually create the transaction without requiring a Signer
// Obtain the ABI of the PDPVerifier contract
abiData, err := contract.PDPVerifierMetaData.GetAbi()
if err != nil {
Expand All @@ -257,7 +295,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
data,
)

// Step 5: Send the transaction using SenderETH
// Send the transaction using SenderETH
reason := "pdp-mkdataset"
txHash, err := p.sender.Send(ctx, fromAddress, tx, reason)
if err != nil {
Expand All @@ -266,7 +304,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
return
}

// Step 6: Insert into message_waits_eth and pdp_data_set_creates
// Insert into message_waits_eth and pdp_data_set_creates
txHashLower := strings.ToLower(txHash.Hex())
logCreate.Infow("PDP CreateDataSet: Inserting transaction tracking",
"txHash", txHashLower,
Expand All @@ -275,6 +313,13 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)

// Begin a database transaction
_, err = p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
// Update idempotency key with transaction hash
if reqBody.IdempotencyKey != "" {
if err := p.updateIdempotencyKey(tx, reqBody.IdempotencyKey, txHashLower); err != nil {
logCreate.Warnw("Failed to update idempotency key", "error", err)
// Don't fail the transaction for idempotency issues
}
}
err := p.insertMessageWaitsAndDataSetCreate(tx, txHashLower, serviceLabel)
if err != nil {
return false, err
Expand All @@ -288,7 +333,7 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
return
}

// Step 7: Respond with 201 Created and Location header
// Respond with 201 Created and Location header
w.Header().Set("Location", path.Join("/pdp/data-sets/created", txHashLower))
w.WriteHeader(http.StatusCreated)
}
Expand Down
Loading