Skip to content

Commit

Permalink
Fix the incorrect port releasing issue for the file syncing API
Browse files Browse the repository at this point in the history
Longhorn 4086

Signed-off-by: Shuo Wu <shuo.wu@suse.com>
  • Loading branch information
shuo-wu authored and innobead committed Jun 9, 2022
1 parent 26bd1e6 commit a08f7f4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 20 deletions.
70 changes: 52 additions & 18 deletions pkg/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,18 @@ func (m *Manager) Sync(ctx context.Context, req *rpc.SyncRequest) (resp *rpc.Bac
if err != nil {
return nil, err
}
portReleaseChannel := make(chan interface{})
portReleaseChannel := make(chan interface{}, 1)
go func() {
<-portReleaseChannel
log.Infof("Backing Image Manager: start to release port %v after syncing", port)
if !util.DetectHTTPServerAvailability(fmt.Sprintf("http://localhost:%d", port), 180, false) {
log.Errorf("Backing Image Manager: failed to wait for the HTTP server using port %v stopped, cannot release the port", port)
return
}
if err := m.releasePorts(port, port+1); err != nil {
log.WithError(err).Errorf("Backing Image Manager: failed to release port %v after syncing backing image", port)
}

log.Infof("Backing Image Manager: released port %v after syncing", port)
}()

biFilePath := types.GetBackingImageFilePath(m.diskPath, req.Spec.Name, req.Spec.Uuid)
Expand All @@ -320,56 +325,85 @@ func (m *Manager) Sync(ctx context.Context, req *rpc.SyncRequest) (resp *rpc.Bac
}

go func() {
var biResp *rpc.BackingImageResponse

defer func() {
portReleaseChannel <- nil
if err != nil {
portReleaseChannel <- nil
log.WithError(err).Error("Backing Image Manager: failed to request sending the backing image")
return
}

// Wait for 24 hours
if biResp, err = m.waitForEndingFileState(req.Spec.Name, req.Spec.Uuid, 24*60*60); err != nil {
log.WithError(err).Errorf("Backing Image Manager: timeout waiting before releasing port %v", port)
}
portReleaseChannel <- nil
}()

var biFileInfo *api.FileInfo
if biFileInfo, err = m.waitForFileStateNonPending(req.Spec.Name, 300); err != nil {
if biResp, err = m.waitForFileStateNonPending(req.Spec.Name, req.Spec.Uuid, 300); err != nil {
return
}
if biFileInfo.State != string(types.StateStarting) {
log.Infof("Backing Image Manager: there is no need to request backing image since the current state is %v rather than %v", biFileInfo.State, types.StateStarting)
if biResp.Status.State != string(types.StateStarting) {
err = fmt.Errorf("there is no need to request backing image since the current state is %v rather than %v", biResp.Status.State, types.StateStarting)
return
}

receiverIP, err := util.GetIPForPod()
toIP, err := util.GetIPForPod()
if err != nil {
return
}
toAddress := fmt.Sprintf("%s:%d", receiverIP, port)
toAddress := fmt.Sprintf("%s:%d", toIP, port)

// sender.Send is a non-blocking call
sender := client.NewBackingImageManagerClient(req.FromAddress)
if err = sender.Send(req.Spec.Name, req.Spec.Uuid, toAddress); err != nil {
err = errors.Wrapf(err, "sender failed to request backing image sending")
err = errors.Wrapf(err, "sender failed to request backing image sending to %v", toAddress)
return
}

log.Infof("Backing Image Manager: started requesting sending backing image from address %v to address %v", req.FromAddress, toAddress)
}()

log.Info("Backing Image Manager: started receiving backing image")
log.Infof("Backing Image Manager: started receiving backing image at port %v", port)

return m.getAndUpdate(req.Spec.Name, req.Spec.Uuid)
}

func (m *Manager) waitForFileStateNonPending(name string, waitInterval int) (biFileInfo *api.FileInfo, err error) {
func (m *Manager) waitForFileStateNonPending(name, uuid string, waitInterval int) (biResp *rpc.BackingImageResponse, err error) {
endTime := time.Now().Add(time.Duration(waitInterval) * time.Second)

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for time.Now().Before(endTime) {
<-ticker.C
m.lock.RLock()
biFileInfo = m.biFileInfoMap[name]
m.lock.RUnlock()
if biFileInfo != nil && biFileInfo.State != string(types.StatePending) {
return biFileInfo, nil
if biResp, _ = m.getAndUpdate(name, uuid); biResp != nil && biResp.Status.State != string(types.StatePending) {
return biResp, nil
}
}

return nil, fmt.Errorf("failed to wait for backing image %v(%v) becoming state non-pending", name, uuid)
}

func (m *Manager) waitForEndingFileState(name, uuid string, waitInterval int) (biResp *rpc.BackingImageResponse, err error) {
endTime := time.Now().Add(time.Duration(waitInterval) * time.Second)

ticker := time.NewTicker(types.MonitorInterval)
defer ticker.Stop()
for time.Now().Before(endTime) {
<-ticker.C
biResp, err = m.getAndUpdate(name, uuid)
if util.IsGRPCErrorNotFound(err) ||
(biResp != nil && (biResp.Status.State == string(types.StateReady) || biResp.Status.State == string(types.StateFailed))) {
return biResp, nil
}
}
return nil, fmt.Errorf("failed to wait for backing image %v becoming state non-pending", name)

state := "unavailable"
if biResp != nil {
state = biResp.Status.State
}
return nil, fmt.Errorf("failed to wait for backing image %v(%v) becoming an ending state, current state %v", name, uuid, state)
}

func (m *Manager) Send(ctx context.Context, req *rpc.SendRequest) (resp *empty.Empty, err error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sync/sync_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (sf *SyncingFile) IdleTimeoutCopyToFile(src io.ReadCloser) (copied int64, e
}

func (sf *SyncingFile) Receive(port int, fileType string) (err error) {
sf.log.Infof("SyncingFile: start to launch a receiver")
sf.log.Infof("SyncingFile: start to launch a receiver at port %v", port)

needProcessing, err := sf.stateCheckBeforeProcessing()
if err != nil {
Expand All @@ -539,7 +539,8 @@ func (sf *SyncingFile) Receive(port int, fileType string) (err error) {

// TODO: After merging the sparse tool repo into this sync service, we don't need to launch a separate server here.
// Instead, this SyncingFile is responsible for punching hole, reading/writing data, and computing checksum.
if err = sparserest.Server(sf.ctx, strconv.Itoa(port), sf.tmpFilePath, sf); err != nil && err != http.ErrServerClosed {
if serverErr := sparserest.Server(sf.ctx, strconv.Itoa(port), sf.tmpFilePath, sf); serverErr != nil && serverErr != http.ErrServerClosed {
err = serverErr
return err
}

Expand Down

0 comments on commit a08f7f4

Please sign in to comment.