diff --git a/modular/executor/execute_replicate.go b/modular/executor/execute_replicate.go index 99c147e3f..2b04eeb2c 100644 --- a/modular/executor/execute_replicate.go +++ b/modular/executor/execute_replicate.go @@ -83,7 +83,7 @@ func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task core if task.GetIsAgentUpload() { expectCheckSums, makeErr := e.makeCheckSumsForAgentUpload(ctx, task.GetObjectInfo(), len(task.GetSecondaryEndpoints()), task.GetStorageParams()) if makeErr != nil { - log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", err) + log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", makeErr) err = makeErr return } diff --git a/modular/receiver/receive_task.go b/modular/receiver/receive_task.go index 6c4c7c95a..35fb8ff43 100644 --- a/modular/receiver/receive_task.go +++ b/modular/receiver/receive_task.go @@ -163,12 +163,24 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta return nil, ErrGfSpDBWithDetail("failed to write integrity meta to db, error: " + err.Error()) } deletePieceHashTime := time.Now() - if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized( - task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil { - log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err) - // ignore the error,let the request go, the background task will gc the meta again later - metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time"). - Observe(time.Since(deletePieceHashTime).Seconds()) + gvg, queryErr := r.baseApp.Consensus().QueryGlobalVirtualGroup(ctx, task.GetGlobalVirtualGroupId()) + if queryErr != nil { + log.CtxErrorw(ctx, "failed to QueryGlobalVirtualGroup", "error", queryErr) + return nil, ErrGfSpDBWithDetail("failed to QueryGlobalVirtualGroup, error: " + queryErr.Error()) + } + spID, idErr := r.getSPID() + if idErr != nil { + log.CtxErrorw(ctx, "failed to getSPID", "error", idErr) + return nil, ErrGfSpDBWithDetail("failed to getSPID, error: " + idErr.Error()) + } + if spID != gvg.PrimarySpId { + if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized( + task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil { + log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err) + // ignore the error,let the request go, the background task will gc the meta again later + metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time"). + Observe(time.Since(deletePieceHashTime).Seconds()) + } } metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time"). diff --git a/modular/receiver/receiver.go b/modular/receiver/receiver.go index 7de5db8aa..d9d1bef15 100644 --- a/modular/receiver/receiver.go +++ b/modular/receiver/receiver.go @@ -15,6 +15,8 @@ type ReceiveModular struct { baseApp *gfspapp.GfSpBaseApp scope rcmgr.ResourceScope receiveQueue taskqueue.TQueueOnStrategy + + spID uint32 } func (r *ReceiveModular) Name() string { @@ -50,3 +52,15 @@ func (r *ReceiveModular) ReserveResource(ctx context.Context, state *rcmgr.Scope func (r *ReceiveModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan) { span.Done() } + +func (e *ReceiveModular) getSPID() (uint32, error) { + if e.spID != 0 { + return e.spID, nil + } + spInfo, err := e.baseApp.Consensus().QuerySP(context.Background(), e.baseApp.OperatorAddress()) + if err != nil { + return 0, err + } + e.spID = spInfo.GetId() + return e.spID, nil +}