diff --git a/modular/executor/execute_replicate.go b/modular/executor/execute_replicate.go index 99c147e3f..2b04eeb2c 100644 --- a/modular/executor/execute_replicate.go +++ b/modular/executor/execute_replicate.go @@ -83,7 +83,7 @@ func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task core if task.GetIsAgentUpload() { expectCheckSums, makeErr := e.makeCheckSumsForAgentUpload(ctx, task.GetObjectInfo(), len(task.GetSecondaryEndpoints()), task.GetStorageParams()) if makeErr != nil { - log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", err) + log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", makeErr) err = makeErr return } diff --git a/modular/receiver/receive_task.go b/modular/receiver/receive_task.go index 6c4c7c95a..b427ed827 100644 --- a/modular/receiver/receive_task.go +++ b/modular/receiver/receive_task.go @@ -162,13 +162,34 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta log.CtxErrorw(ctx, "failed to write integrity meta to db", "task", task, "error", err) return nil, ErrGfSpDBWithDetail("failed to write integrity meta to db, error: " + err.Error()) } + + // only in the case when the secondary sp is the primary SP in a GVG and it is delegated upload, it should not delete the piece hash at this moment.(will be deleted when the sealing is success) + // in all other cases, piece hash should be deleted from DB + var skipDeleteChecksum bool + if task.GetIsAgentUploadTask() { + gvg, queryErr := r.baseApp.Consensus().QueryGlobalVirtualGroup(ctx, task.GetGlobalVirtualGroupId()) + if queryErr != nil { + log.CtxErrorw(ctx, "failed to QueryGlobalVirtualGroup", "error", queryErr) + return nil, ErrGfSpDBWithDetail("failed to QueryGlobalVirtualGroup, error: " + queryErr.Error()) + } + spID, idErr := r.getSPID() + if idErr != nil { + log.CtxErrorw(ctx, "failed to getSPID", "error", idErr) + return nil, ErrGfSpDBWithDetail("failed to getSPID, error: " + idErr.Error()) + } + if spID == 
gvg.PrimarySpId { + skipDeleteChecksum = true + } + } deletePieceHashTime := time.Now() - if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized( - task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil { - log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err) - // ignore the error,let the request go, the background task will gc the meta again later - metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time"). - Observe(time.Since(deletePieceHashTime).Seconds()) + if !skipDeleteChecksum { + if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized( + task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil { + log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err) + // ignore the error, let the request go, the background task will gc the meta again later + metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time"). + Observe(time.Since(deletePieceHashTime).Seconds()) + } } metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time"). 
diff --git a/modular/receiver/receiver.go b/modular/receiver/receiver.go index 7de5db8aa..d9d1bef15 100644 --- a/modular/receiver/receiver.go +++ b/modular/receiver/receiver.go @@ -15,6 +15,8 @@ type ReceiveModular struct { baseApp *gfspapp.GfSpBaseApp scope rcmgr.ResourceScope receiveQueue taskqueue.TQueueOnStrategy + + spID uint32 } func (r *ReceiveModular) Name() string { @@ -50,3 +52,15 @@ func (r *ReceiveModular) ReserveResource(ctx context.Context, state *rcmgr.Scope func (r *ReceiveModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan) { span.Done() } + +func (r *ReceiveModular) getSPID() (uint32, error) { + if r.spID != 0 { + return r.spID, nil + } + spInfo, err := r.baseApp.Consensus().QuerySP(context.Background(), r.baseApp.OperatorAddress()) + if err != nil { + return 0, err + } + r.spID = spInfo.GetId() + return r.spID, nil +} diff --git a/store/sqldb/object_integrity.go b/store/sqldb/object_integrity.go index d84d01774..b6725dc37 100644 --- a/store/sqldb/object_integrity.go +++ b/store/sqldb/object_integrity.go @@ -9,6 +9,7 @@ import ( "github.com/go-sql-driver/mysql" "gorm.io/gorm" + "gorm.io/gorm/clause" corespdb "github.com/bnb-chain/greenfield-storage-provider/core/spdb" "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" @@ -408,17 +409,12 @@ func (s *SpDBImpl) SetReplicatePieceChecksum(objectID uint64, segmentIdx uint32, PieceChecksum: hex.EncodeToString(checksum), Version: version, } - result = s.db.Create(insertPieceHash) - if result.Error != nil && MysqlErrCode(result.Error) == ErrDuplicateEntryCode { - // If all columns are identical to previous, the db.Save will also encounter ErrDuplicateEntryCode, then it should skip. 
- err = s.db.Save(insertPieceHash).Error - if MysqlErrCode(err) == ErrDuplicateEntryCode { - return nil - } - return err - } - if result.Error != nil || result.RowsAffected != 1 { - err = fmt.Errorf("failed to insert piece hash record: %s", result.Error) + result = s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "object_id"}, {Name: "segment_index"}, {Name: "redundancy_index"}}, + UpdateAll: true, + }).Create(insertPieceHash) + if result.Error != nil { + err = fmt.Errorf("failed to insert piece hash record: %v", result.Error) return err } return nil diff --git a/store/sqldb/object_integrity_test.go b/store/sqldb/object_integrity_test.go index a760c8735..7da578f61 100644 --- a/store/sqldb/object_integrity_test.go +++ b/store/sqldb/object_integrity_test.go @@ -417,7 +417,7 @@ func TestSpDBImpl_SetReplicatePieceChecksumSuccess(t *testing.T) { ) s, mock := setupDB(t) mock.ExpectBegin() - mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?)"). + mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE `piece_checksum`=VALUES(`piece_checksum`),`version`=VALUES(`version`)"). WithArgs(objectID, segmentIdx, redundancyIdx, hex.EncodeToString(pieceChecksum), version).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() err := s.SetReplicatePieceChecksum(objectID, segmentIdx, redundancyIdx, pieceChecksum, version) @@ -435,7 +435,7 @@ func TestSpDBImpl_SetReplicatePieceChecksumFailure1(t *testing.T) { ) s, mock := setupDB(t) mock.ExpectBegin() - mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?)"). + mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`,`version`) VALUES (?,?,?,?,?) 
ON DUPLICATE KEY UPDATE `piece_checksum`=VALUES(`piece_checksum`),`version`=VALUES(`version`)"). WithArgs(objectID, segmentIdx, redundancyIdx, hex.EncodeToString(pieceChecksum), version).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() err := s.SetReplicatePieceChecksum(objectID, segmentIdx, redundancyIdx, pieceChecksum, version) @@ -452,7 +452,7 @@ func TestSpDBImpl_SetReplicatePieceChecksumFailure2(t *testing.T) { ) s, mock := setupDB(t) mock.ExpectBegin() - mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`) VALUES (?,?,?,?)"). + mock.ExpectExec("INSERT INTO `piece_hash` (`object_id`,`segment_index`,`redundancy_index`,`piece_checksum`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `piece_checksum`=VALUES(`piece_checksum`),`version`=VALUES(`version`)"). WillReturnError(mockDBInternalError) mock.ExpectRollback() mock.ExpectCommit() diff --git a/store/sqldb/shadow_object_integrity.go b/store/sqldb/shadow_object_integrity.go index 217f819da..792e18817 100644 --- a/store/sqldb/shadow_object_integrity.go +++ b/store/sqldb/shadow_object_integrity.go @@ -110,7 +110,7 @@ func (s *SpDBImpl) ListShadowIntegrityMeta() ([]*corespdb.ShadowIntegrityMeta, e err = s.db.Table(ShadowIntegrityMetaTableName). Select("*"). Limit(ListShadowIntegrityMetaDefaultSize). - Order("object_id asc"). + Order("object_id desc"). Find(&shadowIntegrityMetas).Error for _, sim := range shadowIntegrityMetas {