diff --git a/pkg/manager/service.go b/pkg/manager/service.go index 23fcaa92..6f51c87f 100644 --- a/pkg/manager/service.go +++ b/pkg/manager/service.go @@ -304,13 +304,18 @@ func (m *Manager) Sync(ctx context.Context, req *rpc.SyncRequest) (resp *rpc.Bac if err != nil { return nil, err } - portReleaseChannel := make(chan interface{}) + portReleaseChannel := make(chan interface{}, 1) go func() { <-portReleaseChannel + log.Infof("Backing Image Manager: start to release port %v after syncing", port) + if !util.DetectHTTPServerAvailability(fmt.Sprintf("http://localhost:%d", port), 180, false) { + log.Errorf("Backing Image Manager: failed to wait for the HTTP server using port %v stopped, cannot release the port", port) + return + } if err := m.releasePorts(port, port+1); err != nil { log.WithError(err).Errorf("Backing Image Manager: failed to release port %v after syncing backing image", port) } - + log.Infof("Backing Image Manager: released port %v after syncing", port) }() biFilePath := types.GetBackingImageFilePath(m.diskPath, req.Spec.Name, req.Spec.Uuid) @@ -320,56 +325,85 @@ func (m *Manager) Sync(ctx context.Context, req *rpc.SyncRequest) (resp *rpc.Bac } go func() { + var biResp *rpc.BackingImageResponse + defer func() { - portReleaseChannel <- nil if err != nil { + portReleaseChannel <- nil log.WithError(err).Error("Backing Image Manager: failed to request sending the backing image") + return } + + // Wait for 24 hours + if biResp, err = m.waitForEndingFileState(req.Spec.Name, req.Spec.Uuid, 24*60*60); err != nil { + log.WithError(err).Errorf("Backing Image Manager: timeout waiting before releasing port %v", port) + } + portReleaseChannel <- nil }() - var biFileInfo *api.FileInfo - if biFileInfo, err = m.waitForFileStateNonPending(req.Spec.Name, 300); err != nil { + if biResp, err = m.waitForFileStateNonPending(req.Spec.Name, req.Spec.Uuid, 300); err != nil { return } - if biFileInfo.State != string(types.StateStarting) { - log.Infof("Backing Image Manager: there is no need to request backing image since the current state is %v rather than %v", biFileInfo.State, types.StateStarting) + if biResp.Status.State != string(types.StateStarting) { + err = fmt.Errorf("there is no need to request backing image since the current state is %v rather than %v", biResp.Status.State, types.StateStarting) return } - receiverIP, err := util.GetIPForPod() + toIP, err := util.GetIPForPod() if err != nil { return } - toAddress := fmt.Sprintf("%s:%d", receiverIP, port) + toAddress := fmt.Sprintf("%s:%d", toIP, port) + // sender.Send is a non-blocking call sender := client.NewBackingImageManagerClient(req.FromAddress) if err = sender.Send(req.Spec.Name, req.Spec.Uuid, toAddress); err != nil { - err = errors.Wrapf(err, "sender failed to request backing image sending") + err = errors.Wrapf(err, "sender failed to request backing image sending to %v", toAddress) return } log.Infof("Backing Image Manager: started requesting sending backing image from address %v to address %v", req.FromAddress, toAddress) }() - log.Info("Backing Image Manager: started receiving backing image") + log.Infof("Backing Image Manager: started receiving backing image at port %v", port) return m.getAndUpdate(req.Spec.Name, req.Spec.Uuid) } -func (m *Manager) waitForFileStateNonPending(name string, waitInterval int) (biFileInfo *api.FileInfo, err error) { +func (m *Manager) waitForFileStateNonPending(name, uuid string, waitInterval int) (biResp *rpc.BackingImageResponse, err error) { endTime := time.Now().Add(time.Duration(waitInterval) * time.Second) + ticker := time.NewTicker(time.Second) defer ticker.Stop() for time.Now().Before(endTime) { <-ticker.C - m.lock.RLock() - biFileInfo = m.biFileInfoMap[name] - m.lock.RUnlock() - if biFileInfo != nil && biFileInfo.State != string(types.StatePending) { - return biFileInfo, nil + if biResp, _ = m.getAndUpdate(name, uuid); biResp != nil && biResp.Status.State != string(types.StatePending) { + return biResp, nil + } + } + + return nil, fmt.Errorf("failed to wait for backing image %v(%v) becoming state non-pending", name, uuid) +} + +func (m *Manager) waitForEndingFileState(name, uuid string, waitInterval int) (biResp *rpc.BackingImageResponse, err error) { + endTime := time.Now().Add(time.Duration(waitInterval) * time.Second) + + ticker := time.NewTicker(types.MonitorInterval) + defer ticker.Stop() + for time.Now().Before(endTime) { + <-ticker.C + biResp, err = m.getAndUpdate(name, uuid) + if util.IsGRPCErrorNotFound(err) || + (biResp != nil && (biResp.Status.State == string(types.StateReady) || biResp.Status.State == string(types.StateFailed))) { + return biResp, nil } } - return nil, fmt.Errorf("failed to wait for backing image %v becoming state non-pending", name) + + state := "unavailable" + if biResp != nil { + state = biResp.Status.State + } + return nil, fmt.Errorf("failed to wait for backing image %v(%v) becoming an ending state, current state %v", name, uuid, state) } func (m *Manager) Send(ctx context.Context, req *rpc.SendRequest) (resp *empty.Empty, err error) { diff --git a/pkg/sync/sync_file.go b/pkg/sync/sync_file.go index 4a7434e6..e5b29971 100644 --- a/pkg/sync/sync_file.go +++ b/pkg/sync/sync_file.go @@ -523,7 +523,7 @@ func (sf *SyncingFile) IdleTimeoutCopyToFile(src io.ReadCloser) (copied int64, e } func (sf *SyncingFile) Receive(port int, fileType string) (err error) { - sf.log.Infof("SyncingFile: start to launch a receiver") + sf.log.Infof("SyncingFile: start to launch a receiver at port %v", port) needProcessing, err := sf.stateCheckBeforeProcessing() if err != nil { @@ -539,7 +539,8 @@ func (sf *SyncingFile) Receive(port int, fileType string) (err error) { // TODO: After merging the sparse tool repo into this sync service, we don't need to launch a separate server here. // Instead, this SyncingFile is responsible for punching hole, reading/writing data, and computing checksum. - if err = sparserest.Server(sf.ctx, strconv.Itoa(port), sf.tmpFilePath, sf); err != nil && err != http.ErrServerClosed { + if serverErr := sparserest.Server(sf.ctx, strconv.Itoa(port), sf.tmpFilePath, sf); serverErr != nil && serverErr != http.ErrServerClosed { + err = serverErr return err }