Skip to content

Commit

Permalink
datasource: store processing data to a temporary file before it becom…
Browse files Browse the repository at this point in the history
…ing ready

Signed-off-by: Shuo Wu <shuo.wu@suse.com>
  • Loading branch information
shuo-wu authored and innobead committed Aug 20, 2021
1 parent e392670 commit 3e25f70
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
38 changes: 24 additions & 14 deletions pkg/datasource/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Service struct {

fileName string
filePath string
tmpFilePath string
state types.State
size int64
progress int
Expand Down Expand Up @@ -80,9 +81,10 @@ func LaunchService(ctx context.Context,
parameters: parameters,
expectedChecksum: checksum,

fileName: fileName,
filePath: filepath.Join(workDir, fileName),
state: types.StateStarting,
fileName: fileName,
filePath: filepath.Join(workDir, fileName),
tmpFilePath: filepath.Join(workDir, fileName+types.TmpFileSuffix),
state: types.StateStarting,

downloader: downloader,
}
Expand All @@ -102,6 +104,9 @@ func (s *Service) init() error {
if err := os.RemoveAll(s.filePath); err != nil {
return err
}
if err := os.RemoveAll(s.tmpFilePath); err != nil {
return err
}
switch s.sourceType {
case types.DataSourceTypeDownload:
return s.downloadFromURL(s.parameters)
Expand All @@ -127,7 +132,7 @@ func (s *Service) checkAndReuseBackingImageFile() error {
}
checksum, err := util.GetFileChecksum(s.filePath)
if err != nil {
return errors.Wrapf(err, "failed to calculate checksum for the tmp file after processing")
return errors.Wrapf(err, "failed to calculate checksum for the existing file after processing")
}
s.currentChecksum = checksum
if s.expectedChecksum != "" && s.expectedChecksum != s.currentChecksum {
Expand Down Expand Up @@ -181,10 +186,11 @@ func (s *Service) finishProcessing(err error) {
s.message = err.Error()
s.lock.Unlock()
}
os.RemoveAll(s.tmpFilePath)
}()

if err != nil {
err = errors.Wrapf(err, "failed to finish file %v processing", s.filePath)
err = errors.Wrapf(err, "failed to finish file %v and %v processing", s.tmpFilePath, s.filePath)
return
}

Expand All @@ -193,22 +199,26 @@ func (s *Service) finishProcessing(err error) {
return
}

stat, statErr := os.Stat(s.filePath)
stat, statErr := os.Stat(s.tmpFilePath)
if statErr != nil {
err = errors.Wrapf(err, "failed to stat file %v after getting the file from source", s.filePath)
err = errors.Wrapf(err, "failed to stat file %v after getting the file from source", s.tmpFilePath)
return
}

checksum, cksumErr := util.GetFileChecksum(s.filePath)
checksum, cksumErr := util.GetFileChecksum(s.tmpFilePath)
if cksumErr != nil {
err = errors.Wrapf(cksumErr, "failed to calculate checksum for file %v getting the file from source", s.filePath)
err = errors.Wrapf(cksumErr, "failed to calculate checksum for file %v getting the file from source", s.tmpFilePath)
return
}
s.currentChecksum = checksum
if s.expectedChecksum != "" && s.expectedChecksum != s.currentChecksum {
err = fmt.Errorf("the expected checksum %v doesn't match the the file actual checksum %v", s.expectedChecksum, s.currentChecksum)
return
}
if err := os.Rename(s.tmpFilePath, s.filePath); err != nil {
err = fmt.Errorf("failed to rename the tmp file %v to %v at the end of processing", s.tmpFilePath, s.filePath)
return
}

s.lock.Lock()
s.size = stat.Size()
Expand Down Expand Up @@ -237,7 +247,7 @@ func (s *Service) downloadFromURL(parameters map[string]string) error {
}

go func() {
_, err := s.downloader.DownloadFile(s.ctx, url, s.filePath, s)
_, err := s.downloader.DownloadFile(s.ctx, url, s.tmpFilePath, s)
s.finishProcessing(err)
}()

Expand Down Expand Up @@ -292,7 +302,7 @@ func (s *Service) exportFromVolume(parameters map[string]string) error {
s.finishProcessing(syncErr)
}()

if err := sparserest.Server(ctx, strconv.Itoa(types.DefaultVolumeExportReceiverPort), s.filePath, s); err != nil && err != http.ErrServerClosed {
if err := sparserest.Server(ctx, strconv.Itoa(types.DefaultVolumeExportReceiverPort), s.tmpFilePath, s); err != nil && err != http.ErrServerClosed {
syncErr = err
return
}
Expand All @@ -305,7 +315,7 @@ func (s *Service) exportFromVolume(parameters map[string]string) error {

// The file size will change after conversion.
if qcow2ConversionRequired {
if syncErr = util.ConvertFromRawToQcow2(s.filePath); syncErr != nil {
if syncErr = util.ConvertFromRawToQcow2(s.tmpFilePath); syncErr != nil {
return
}
}
Expand Down Expand Up @@ -405,10 +415,10 @@ func (s *Service) doUpload(request *http.Request) (err error) {
return fmt.Errorf("cannot get the uploaded data since the upload request doesn't contain form 'chunk'")
}

if err := os.Remove(s.filePath); err != nil && !os.IsNotExist(err) {
if err := os.Remove(s.tmpFilePath); err != nil && !os.IsNotExist(err) {
return err
}
f, err := os.OpenFile(s.filePath, os.O_RDWR|os.O_CREATE, 0666)
f, err := os.OpenFile(s.tmpFilePath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ const (

SendingLimit = 3

BackingImageTmpFileName = "backing.tmp"
BackingImageFileName = "backing"
TmpFileSuffix = ".tmp"
BackingImageTmpFileName = BackingImageFileName + TmpFileSuffix
)

type State string
Expand Down

0 comments on commit 3e25f70

Please sign in to comment.