Skip to content

Commit

Permalink
fix: sealv2 bug
Browse files Browse the repository at this point in the history
  • Loading branch information
constwz committed Apr 29, 2024
1 parent 1be658b commit bc748a9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
2 changes: 1 addition & 1 deletion modular/executor/execute_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task core
if task.GetIsAgentUpload() {
expectCheckSums, makeErr := e.makeCheckSumsForAgentUpload(ctx, task.GetObjectInfo(), len(task.GetSecondaryEndpoints()), task.GetStorageParams())
if makeErr != nil {
log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", err)
log.CtxErrorw(ctx, "failed to makeCheckSumsForAgentUpload", "error", makeErr)
err = makeErr
return
}
Expand Down
24 changes: 18 additions & 6 deletions modular/receiver/receive_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,24 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta
return nil, ErrGfSpDBWithDetail("failed to write integrity meta to db, error: " + err.Error())
}
deletePieceHashTime := time.Now()
if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized(
task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil {
log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err)
// ignore the error,let the request go, the background task will gc the meta again later
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time").
Observe(time.Since(deletePieceHashTime).Seconds())
gvg, queryErr := r.baseApp.Consensus().QueryGlobalVirtualGroup(ctx, task.GetGlobalVirtualGroupId())
if queryErr != nil {
log.CtxErrorw(ctx, "failed to QueryGlobalVirtualGroup", "error", queryErr)
return nil, ErrGfSpDBWithDetail("failed to QueryGlobalVirtualGroup, error: " + queryErr.Error())
}
spID, idErr := r.getSPID()
if idErr != nil {
log.CtxErrorw(ctx, "failed to getSPID", "error", idErr)
return nil, ErrGfSpDBWithDetail("failed to getSPID, error: " + idErr.Error())
}
if spID != gvg.PrimarySpId {
if err = r.baseApp.GfSpDB().DeleteAllReplicatePieceChecksumOptimized(
task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx()); err != nil {
log.CtxErrorw(ctx, "failed to delete all replicate piece checksum", "task", task, "error", err)
// ignore the error,let the request go, the background task will gc the meta again later
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time").
Observe(time.Since(deletePieceHashTime).Seconds())
}
}

metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_delete_piece_hash_time").
Expand Down
14 changes: 14 additions & 0 deletions modular/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type ReceiveModular struct {
baseApp *gfspapp.GfSpBaseApp
scope rcmgr.ResourceScope
receiveQueue taskqueue.TQueueOnStrategy

spID uint32
}

func (r *ReceiveModular) Name() string {
Expand Down Expand Up @@ -50,3 +52,15 @@ func (r *ReceiveModular) ReserveResource(ctx context.Context, state *rcmgr.Scope
func (r *ReceiveModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan) {
span.Done()
}

func (e *ReceiveModular) getSPID() (uint32, error) {
if e.spID != 0 {
return e.spID, nil
}
spInfo, err := e.baseApp.Consensus().QuerySP(context.Background(), e.baseApp.OperatorAddress())
if err != nil {
return 0, err
}
e.spID = spInfo.GetId()
return e.spID, nil
}

0 comments on commit bc748a9

Please sign in to comment.