Skip to content

Commit

Permalink
fix: delegate upload content type
Browse files Browse the repository at this point in the history
  • Loading branch information
constwz committed Apr 23, 2024
1 parent 242b252 commit cfe6d3f
Show file tree
Hide file tree
Showing 7 changed files with 6 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Docker-CI

on:
push:
branches: [ develop, master,fix-v1.6.0]
branches: [ develop, master]

env:
IMAGE_NAME: ghcr.io/bnb-chain/greenfield-storage-provider-invisible
Expand Down
3 changes: 0 additions & 3 deletions base/gfspclient/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,13 @@ func (s *GfSpClient) ReplicatePieceToSecondary(ctx context.Context, endpoint str
}

receiveTask := receive.(*gfsptask.GfSpReceivePieceTask)
log.CtxInfow(ctx, "gateway debug info", "receiveTask", receiveTask)
log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "IsAgentUpload", receive.GetIsAgentUploadTask())
receiveMsg, err := json.Marshal(receiveTask)
if err != nil {
log.CtxErrorw(ctx, "failed to replicate piece to secondary sp due to marshal error", "error", err)
return err
}
receiveHeader := hex.EncodeToString(receiveMsg)
req.Header.Add(GnfdReceiveMsgHeader, receiveHeader)
log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "header", receiveHeader)
resp, err := s.HTTPClient(ctx).Do(req)
if err != nil {
return err
Expand Down
5 changes: 0 additions & 5 deletions modular/executor/execute_replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task core
objectInfo *storagetypes.ObjectInfo
)
startReplicateTime := time.Now()
log.Debugw("Debug Info", "object_id", task.GetObjectInfo().Id, "IsAgentUpload", task.GetIsAgentUpload())
defer func() {
task.SetError(err)
metrics.PerfPutObjectTime.WithLabelValues("background_replicate_cost").Observe(time.Since(startReplicateTime).Seconds())
Expand Down Expand Up @@ -129,7 +128,6 @@ func (e *ExecuteModular) handleReplicatePiece(ctx context.Context, rTask coretas
)

log.Debugw("replicate task info", "task_sps", rTask.GetSecondaryEndpoints())
log.Debugw("Debug Info", "object_id", rTask.GetObjectInfo().Id, "IsAgentUpload", rTask.GetIsAgentUpload())

doReplicateECPiece := func(ctx context.Context, segIdx uint32, data [][]byte, errChan chan error) {
log.Debug("start to replicate ec piece")
Expand Down Expand Up @@ -274,7 +272,6 @@ func (e *ExecuteModular) doReplicatePiece(ctx context.Context, waitGroup *sync.W
log.CtxErrorw(ctx, "ReplicatePieceTask object info is empty")
return ErrInvalidReplicatePieceTask
}
log.Debugw("Debug Info", "object_id", rTask.GetObjectInfo().Id, "IsAgentUpload", rTask.GetIsAgentUpload())
rTask.AppendLog(fmt.Sprintf("executor-begin-replicate-piece-sIdx:%d-rIdx-%d", segmentIdx, redundancyIdx))
startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -320,12 +317,10 @@ func (e *ExecuteModular) doReplicatePiece(ctx context.Context, waitGroup *sync.W
}
receive.SetSignature(signature)
replicateOnePieceTime := time.Now()
log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "IsAgentUpload", receive.GetIsAgentUploadTask())
if err = retry.Do(func() error {
// timeout for single piece replication
ctxWithTimeout, cancel := context.WithTimeout(ctx, replicateTimeOut)
defer cancel()
log.Debugw("Debug Info", "object_id", receive.GetObjectInfo().Id, "IsAgentUpload", receive.GetIsAgentUploadTask())
return e.baseApp.GfSpClient().ReplicatePieceToSecondary(ctxWithTimeout, spEndpoint, receive, data)
}, RtyAttem,
RtyDelay,
Expand Down
6 changes: 5 additions & 1 deletion modular/gater/object_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,10 @@ func (g *GateModular) delegateResumablePutObjectHandler(w http.ResponseWriter, r
return
}
fingerprint = commonhash.GenerateChecksum(approvalMsg)
contentType = r.Header.Get(ContentTypeHeader)
if contentType == "" {
contentType = ContentDefault
}
startTime := time.Now()

if isUpdate {
Expand Down Expand Up @@ -1573,7 +1577,7 @@ func (g *GateModular) delegateCreateFolderHandler(w http.ResponseWriter, r *http
err = ErrNoPermission
return
}
contentType = reqCtx.vars["content_type"]
contentType = r.Header.Get(ContentTypeHeader)
if contentType == "" {
contentType = ContentDefault
}
Expand Down
3 changes: 0 additions & 3 deletions modular/manager/manage_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (m *ManageModular) pickGVGAndReplicate(ctx context.Context, vgfID uint32, t
replicateTask.SetLogs(task.GetLogs())
replicateTask.SetRetry(task.GetRetry())
replicateTask.AppendLog("manager-create-replicate-task")
log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUploadTask())
err = m.replicateQueue.Push(replicateTask)
if err != nil {
log.CtxErrorw(ctx, "failed to push replicate piece task to queue", "error", err)
Expand Down Expand Up @@ -268,7 +267,6 @@ func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context,
replicateTask.GlobalVirtualGroupId = gvgMeta.ID
replicateTask.SecondaryEndpoints = gvgMeta.SecondarySPEndpoints
log.Debugw("replicate task info", "task", replicateTask, "gvg_meta", gvgMeta)
log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUploadTask())
err = m.replicateQueue.Push(replicateTask)
if err != nil {
log.CtxErrorw(ctx, "failed to push replicate piece task to queue", "error", err)
Expand Down Expand Up @@ -451,7 +449,6 @@ func (m *ManageModular) handleFailedReplicatePieceTask(ctx context.Context, hand
"excludedGVGs", shouldFreezeGVGs, "error", rePickAndReplicateErr)
return rePickAndReplicateErr
} else {
log.Debugw("Debug Info", "object_id", handleTask.GetObjectInfo().Id, "IsAgentUpload", handleTask.GetIsAgentUpload())
pushErr := m.replicateQueue.Push(handleTask)
log.CtxDebugw(ctx, "push task again to retry", "task_info", handleTask.Info(), "error", pushErr)
return pushErr
Expand Down
2 changes: 0 additions & 2 deletions modular/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ func (m *ManageModular) LoadTaskFromDB() error {
replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID
replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints
}
log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUpload())
pushErr := m.replicateQueue.Push(replicateTask)
if pushErr != nil {
log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", pushErr)
Expand Down Expand Up @@ -999,7 +998,6 @@ func (m *ManageModular) backUpTask() {
func (m *ManageModular) repushTask(reserved task.Task) {
switch t := reserved.(type) {
case *gfsptask.GfSpReplicatePieceTask:
log.Debugw("Debug Info", "object_id", t.GetObjectInfo().Id, "IsAgentUpload", t.GetIsAgentUpload())
err := m.replicateQueue.Push(t)
log.Infow("retry push replicate task to queue after dispatching", "error", err)
case *gfsptask.GfSpSealObjectTask:
Expand Down
1 change: 0 additions & 1 deletion modular/manager/task_retry_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func (s *TaskRetryScheduler) retryReplicateTask(meta *spdb.UploadObjectMeta) err
replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID
replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints
}
log.Debugw("Debug Info", "object_id", replicateTask.GetObjectInfo().Id, "IsAgentUpload", replicateTask.GetIsAgentUpload())
err = s.manager.replicateQueue.Push(replicateTask)
if err != nil {
if errors.Is(err, gfsptqueue.ErrTaskQueueExceed) {
Expand Down

0 comments on commit cfe6d3f

Please sign in to comment.