Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/bus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ const (
ConnectionResetTopic = "connection-reset"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ReloadSuccessfulTopic = "reload-successful"
EnableWatchersTopic = "enable-watchers"
ConfigApplyFailedTopic = "config-apply-failed"
ConfigApplyCompleteTopic = "config-apply-complete"
RollbackWriteTopic = "rollback-write"
Expand Down
185 changes: 119 additions & 66 deletions internal/file/file_manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type (
fileToUpdate *mpi.File,
) error
SetIsConnected(isConnected bool)
MoveFilesFromTempDirectory(ctx context.Context, fileAction *model.FileCache, tempDir string) error
RenameFile(ctx context.Context, hash, fileName, tempDir string) error
UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
}

Expand All @@ -80,14 +80,15 @@ type (
err error)
Rollback(ctx context.Context, instanceID string) error
ClearCache()
SetConfigPath(configPath string)
ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error
ConfigUpdate(ctx context.Context, nginxConfigContext *model.NginxConfigContext)
UpdateCurrentFilesOnDisk(ctx context.Context, updateFiles map[string]*mpi.File, referenced bool) error
DetermineFileActions(
ctx context.Context,
currentFiles map[string]*mpi.File,
modifiedFiles map[string]*model.FileCache,
) (map[string]*model.FileCache, map[string][]byte, error)
) (map[string]*model.FileCache, error)
IsConnected() bool
SetIsConnected(isConnected bool)
ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
Expand All @@ -101,12 +102,13 @@ type FileManagerService struct {
fileServiceOperator fileServiceOperatorInterface
// map of files and the actions performed on them during config apply
fileActions map[string]*model.FileCache // key is file path
// map of the contents of files which have been updated or deleted during config apply, used during rollback
rollbackFileContents map[string][]byte // key is file path
// map of the files currently on disk, used to determine the file action during config apply
currentFilesOnDisk map[string]*mpi.File // key is file path
previousManifestFiles map[string]*model.ManifestFile
manifestFilePath string
tempConfigDir string
tempRollbackDir string
configPath string
rollbackManifest bool
filesMutex sync.RWMutex
}
Expand All @@ -119,15 +121,19 @@ func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig
fileOperator: NewFileOperator(manifestLock),
fileServiceOperator: NewFileServiceOperator(agentConfig, fileServiceClient, manifestLock),
fileActions: make(map[string]*model.FileCache),
rollbackFileContents: make(map[string][]byte),
currentFilesOnDisk: make(map[string]*mpi.File),
previousManifestFiles: make(map[string]*model.ManifestFile),
rollbackManifest: true,
manifestFilePath: agentConfig.LibDir + "/manifest.json",
configPath: "/etc/nginx/",
manifestLock: manifestLock,
}
}

func (fms *FileManagerService) SetConfigPath(configPath string) {
fms.configPath = filepath.Dir(configPath)
}

func (fms *FileManagerService) ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) {
fms.fileServiceOperator.UpdateClient(ctx, fileServiceClient)
slog.DebugContext(ctx, "File manager service reset client successfully")
Expand All @@ -144,6 +150,9 @@ func (fms *FileManagerService) SetIsConnected(isConnected bool) {
func (fms *FileManagerService) ConfigApply(ctx context.Context,
configApplyRequest *mpi.ConfigApplyRequest,
) (status model.WriteStatus, err error) {
var configTempErr error
var rollbackTempErr error

fms.rollbackManifest = true
fileOverview := configApplyRequest.GetOverview()

Expand All @@ -156,7 +165,7 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context,
return model.Error, allowedErr
}

diffFiles, fileContent, compareErr := fms.DetermineFileActions(
diffFiles, compareErr := fms.DetermineFileActions(
ctx,
fms.currentFilesOnDisk,
ConvertToMapOfFileCache(fileOverview.GetFiles()),
Expand All @@ -170,15 +179,24 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context,
return model.NoChange, nil
}

fms.rollbackFileContents = fileContent
fms.fileActions = diffFiles

tempDir, tempDirError := fms.createTempConfigDirectory(ctx)
if tempDirError != nil {
return model.Error, tempDirError
fms.tempConfigDir, configTempErr = fms.createTempConfigDirectory("config")
if configTempErr != nil {
return model.Error, configTempErr
}

fms.tempRollbackDir, rollbackTempErr = fms.createTempConfigDirectory("rollback")
if rollbackTempErr != nil {
return model.Error, rollbackTempErr
}

rollbackTempFilesErr := fms.backupFiles(ctx)
if rollbackTempFilesErr != nil {
return model.Error, rollbackTempFilesErr
}

fileErr := fms.executeFileActions(ctx, tempDir)
fileErr := fms.executeFileActions(ctx)
if fileErr != nil {
fms.rollbackManifest = false
return model.RollbackRequired, fileErr
Expand All @@ -194,12 +212,22 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context,
}

func (fms *FileManagerService) ClearCache() {
clear(fms.rollbackFileContents)
slog.Debug("Clearing cache and temp files after config apply")
clear(fms.fileActions)
clear(fms.previousManifestFiles)

configErr := os.RemoveAll(fms.tempConfigDir)
if configErr != nil {
slog.Error("Error removing temp config directory", "path", fms.tempConfigDir, "err", configErr)
}

rollbackErr := os.RemoveAll(fms.tempRollbackDir)
if rollbackErr != nil {
slog.Error("Error removing temp rollback directory", "path", fms.tempRollbackDir, "err", rollbackErr)
}
}

//nolint:revive // cognitive-complexity of 13 max is 12, loop is needed cant be broken up
//nolint:revive,cyclop // cognitive-complexity of 13 max is 12, loop is needed cant be broken up
func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error {
slog.InfoContext(ctx, "Rolling back config for instance", "instance_id", instanceID)

Expand All @@ -217,16 +245,14 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)

continue
case model.Delete, model.Update:
fileMeta := fileAction.File.GetFileMeta()
content := fms.rollbackFileContents[fileMeta.GetName()]
err := fms.fileOperator.Write(ctx, content, fileMeta.GetName(), fileMeta.GetPermissions())
content, err := fms.restoreFiles(fileAction)
if err != nil {
return err
}

// currentFilesOnDisk needs to be updated after rollback action is performed
fileMeta.Hash = files.GenerateHash(content)
fms.currentFilesOnDisk[fileMeta.GetName()] = fileAction.File
fileAction.File.FileMeta.Hash = files.GenerateHash(content)
fms.currentFilesOnDisk[fileAction.File.GetFileMeta().GetName()] = fileAction.File
case model.Unchanged:
fallthrough
default:
Expand Down Expand Up @@ -308,20 +334,18 @@ func (fms *FileManagerService) DetermineFileActions(
modifiedFiles map[string]*model.FileCache,
) (
map[string]*model.FileCache,
map[string][]byte,
error,
) {
fms.filesMutex.Lock()
defer fms.filesMutex.Unlock()

fileDiff := make(map[string]*model.FileCache) // Files that have changed, key is file name
fileContents := make(map[string][]byte) // contents of the file, key is file name

_, filesMap, manifestFileErr := fms.manifestFile()

if manifestFileErr != nil {
if !errors.Is(manifestFileErr, os.ErrNotExist) {
return nil, nil, manifestFileErr
return nil, manifestFileErr
}
filesMap = currentFiles
}
Expand All @@ -331,25 +355,16 @@ func (fms *FileManagerService) DetermineFileActions(
for fileName, manifestFile := range filesMap {
_, exists := modifiedFiles[fileName]

if !exists {
// Read file contents before marking it deleted
fileContent, readErr := os.ReadFile(fileName)
if readErr != nil {
if errors.Is(readErr, os.ErrNotExist) {
slog.DebugContext(ctx, "Unable to backup file contents since file does not exist", "file", fileName)
continue
}

return nil, nil, fmt.Errorf("error reading file %s: %w", fileName, readErr)
}
fileContents[fileName] = fileContent
if !fms.agentConfig.IsDirectoryAllowed(fileName) {
return nil, fmt.Errorf("error deleting file %s: file not in allowed directories", fileName)
}

// Allowed directories could have been modified since file was created,
// so we should check before marking for deletion
if !fms.agentConfig.IsDirectoryAllowed(fileName) {
return nil, nil, fmt.Errorf("error deleting file %s: file not in allowed directories", fileName)
}
if _, err := os.Stat(fileName); os.IsNotExist(err) {
slog.DebugContext(ctx, "File already deleted, skipping", "file", fileName)
continue
}

if !exists {
fileDiff[fileName] = &model.FileCache{
File: manifestFile,
Action: model.Delete,
Expand All @@ -359,7 +374,7 @@ func (fms *FileManagerService) DetermineFileActions(

for _, modifiedFile := range modifiedFiles {
fileName := modifiedFile.File.GetFileMeta().GetName()
currentFile, ok := filesMap[modifiedFile.File.GetFileMeta().GetName()]
currentFile, ok := filesMap[fileName]
// default to unchanged action
modifiedFile.Action = model.Unchanged

Expand All @@ -369,25 +384,20 @@ func (fms *FileManagerService) DetermineFileActions(
}
// if file doesn't exist in the current files, file has been added
// set file action
if _, statErr := os.Stat(modifiedFile.File.GetFileMeta().GetName()); errors.Is(statErr, os.ErrNotExist) {
if _, statErr := os.Stat(fileName); errors.Is(statErr, os.ErrNotExist) {
modifiedFile.Action = model.Add
fileDiff[modifiedFile.File.GetFileMeta().GetName()] = modifiedFile
fileDiff[fileName] = modifiedFile

continue
// if file currently exists and file hash is different, file has been updated
// copy contents, set file action
} else if ok && modifiedFile.File.GetFileMeta().GetHash() != currentFile.GetFileMeta().GetHash() {
fileContent, readErr := os.ReadFile(fileName)
if readErr != nil {
return nil, nil, fmt.Errorf("error reading file %s, error: %w", fileName, readErr)
}
modifiedFile.Action = model.Update
fileContents[fileName] = fileContent
fileDiff[modifiedFile.File.GetFileMeta().GetName()] = modifiedFile
fileDiff[fileName] = modifiedFile
}
}

return fileDiff, fileContents, nil
return fileDiff, nil
}

// UpdateCurrentFilesOnDisk updates the FileManagerService currentFilesOnDisk slice which contains the files
Expand Down Expand Up @@ -463,6 +473,59 @@ func (fms *FileManagerService) UpdateManifestFile(ctx context.Context,
return fms.fileOperator.WriteManifestFile(ctx, updatedFiles, fms.agentConfig.LibDir, fms.manifestFilePath)
}

func (fms *FileManagerService) backupFiles(ctx context.Context) error {
for _, file := range fms.fileActions {
if file.Action == model.Add || file.Action == model.Unchanged {
continue
}

filePath := file.File.GetFileMeta().GetName()

if _, err := os.Stat(filePath); os.IsNotExist(err) {
slog.DebugContext(ctx, "Unable to backup file content since file does not exist",
"file", filePath)

continue
}

tempFilePath := filepath.Join(fms.tempRollbackDir, filePath)
slog.DebugContext(ctx, "Attempting to backup file content since file exists", "temp_path", tempFilePath)

moveErr := fms.fileOperator.MoveFile(ctx, filePath, tempFilePath)

if moveErr != nil {
return moveErr
}
}

return nil
}

func (fms *FileManagerService) restoreFiles(fileAction *model.FileCache) ([]byte, error) {
fileMeta := fileAction.File.GetFileMeta()
fileName := fileMeta.GetName()

tempFilePath := filepath.Join(fms.tempRollbackDir, fileName)

// Create parent directories for the target file if they don't exist
if err := os.MkdirAll(filepath.Dir(fileName), dirPerm); err != nil {
return nil, fmt.Errorf("failed to create directories for %s: %w", fileName, err)
}

moveErr := os.Rename(tempFilePath, fileName)
if moveErr != nil {
return nil, fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr)
}

content, readErr := os.ReadFile(fileMeta.GetName())
if readErr != nil {
return nil, fmt.Errorf("error reading file, unable to generate hash: %s error: %w",
fileMeta.GetName(), readErr)
}

return content, nil
}

func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, map[string]*mpi.File, error) {
if _, err := os.Stat(fms.manifestFilePath); err != nil {
return nil, nil, err
Expand Down Expand Up @@ -491,17 +554,17 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m
return manifestFiles, fileMap, nil
}

func (fms *FileManagerService) executeFileActions(ctx context.Context, tempDir string) (actionError error) {
func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionError error) {
// Download files to temporary location
downloadError := fms.downloadUpdatedFilesToTempLocation(ctx, tempDir)
downloadError := fms.downloadUpdatedFilesToTempLocation(ctx, fms.tempConfigDir)
if downloadError != nil {
return downloadError
}

// Remove temp files if there is a failure moving or deleting files
actionError = fms.moveOrDeleteFiles(ctx, tempDir, actionError)
actionError = fms.moveOrDeleteFiles(ctx, fms.tempConfigDir, actionError)
if actionError != nil {
fms.deleteTempFiles(ctx, tempDir)
fms.deleteTempFiles(ctx, fms.tempConfigDir)
}

return actionError
Expand All @@ -528,11 +591,6 @@ func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(
}
}

// Remove temp files if there is an error downloading any files
if updateError != nil {
fms.deleteTempFiles(ctx, tempDir)
}

return updateError
}

Expand All @@ -551,7 +609,8 @@ actionsLoop:

continue
case model.Add, model.Update:
err := fms.fileServiceOperator.MoveFilesFromTempDirectory(ctx, fileAction, tempDir)
fileMeta := fileAction.File.GetFileMeta()
err := fms.fileServiceOperator.RenameFile(ctx, fileMeta.GetHash(), fileMeta.GetName(), tempDir)
if err != nil {
actionError = err

Expand Down Expand Up @@ -649,17 +708,11 @@ func (fms *FileManagerService) convertToFile(manifestFile *model.ManifestFile) *
}
}

func (fms *FileManagerService) createTempConfigDirectory(ctx context.Context) (string, error) {
tempDir, tempDirError := os.MkdirTemp(fms.agentConfig.LibDir, "config")
func (fms *FileManagerService) createTempConfigDirectory(pattern string) (string, error) {
tempDir, tempDirError := os.MkdirTemp(fms.configPath, pattern)
if tempDirError != nil {
return "", fmt.Errorf("failed creating temp config directory: %w", tempDirError)
}
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
slog.ErrorContext(ctx, "error removing temp config directory", "path", path, "err", err)
}
}(tempDir)

return tempDir, nil
}
Expand Down
Loading
Loading