From 8fc60c60eb95cc9fca6c87a948836a99bcac606e Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 26 Sep 2025 13:37:35 +0100 Subject: [PATCH 1/7] Update how watchers are enabled after config apply --- internal/bus/topics.go | 3 +- internal/file/file_plugin.go | 44 +++++--- internal/file/file_plugin_test.go | 22 ++-- internal/model/config.go | 7 +- internal/resource/resource_plugin.go | 4 +- internal/resource/resource_plugin_test.go | 2 +- internal/watcher/watcher_plugin.go | 97 ++++++----------- internal/watcher/watcher_plugin_test.go | 126 +++++++++++----------- 8 files changed, 155 insertions(+), 150 deletions(-) diff --git a/internal/bus/topics.go b/internal/bus/topics.go index 7ba30ac49..5a778b27a 100644 --- a/internal/bus/topics.go +++ b/internal/bus/topics.go @@ -19,7 +19,8 @@ const ( ConnectionResetTopic = "connection-reset" ConfigApplyRequestTopic = "config-apply-request" WriteConfigSuccessfulTopic = "write-config-successful" - ConfigApplySuccessfulTopic = "config-apply-successful" + ReloadSuccessfulTopic = "reload-successful" + EnableWatchersTopic = "enable-watchers" ConfigApplyFailedTopic = "config-apply-failed" ConfigApplyCompleteTopic = "config-apply-complete" RollbackWriteTopic = "rollback-write" diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 39c0bdcd5..a5c79d4ef 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -106,8 +106,8 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { fp.handleConfigApplyRequest(ctxWithMetadata, msg) case bus.ConfigApplyCompleteTopic: fp.handleConfigApplyComplete(ctxWithMetadata, msg) - case bus.ConfigApplySuccessfulTopic: - fp.handleConfigApplySuccess(ctxWithMetadata, msg) + case bus.ReloadSuccessfulTopic: + fp.handleReloadSuccess(ctxWithMetadata, msg) case bus.ConfigApplyFailedTopic: fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg) default: @@ -133,11 +133,28 @@ func (fp *FilePlugin) Subscriptions() []string { bus.ConfigUploadRequestTopic, bus.ConfigApplyRequestTopic, bus.ConfigApplyFailedTopic, - bus.ConfigApplySuccessfulTopic, + bus.ReloadSuccessfulTopic, bus.ConfigApplyCompleteTopic, } } +func (fp *FilePlugin) CleanUpConfigApply(ctx context.Context, + configContext *model.NginxConfigContext, + instanceID string, +) { + enableWatcher := &model.EnableWatchers{ + ConfigContext: configContext, + InstanceID: instanceID, + } + + fp.fileManagerService.ClearCache() + + fp.messagePipe.Process(ctx, &bus.Message{ + Data: enableWatcher, + Topic: bus.EnableWatchersTopic, + }) +} + func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "File plugin received connection reset message") if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { @@ -165,20 +182,21 @@ func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Me return } - fp.fileManagerService.ClearCache() fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) + fp.CleanUpConfigApply(ctx, &model.NginxConfigContext{}, response.GetInstanceId()) } -func (fp *FilePlugin) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config success message") - successMessage, ok := msg.Data.(*model.ConfigApplySuccess) +func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) { + slog.InfoContext(ctx, "File plugin received reload success message", "data", msg.Data) + + successMessage, ok := msg.Data.(*model.ReloadSuccess) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplySuccess", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *model.ReloadSuccess", "payload", msg.Data) return } - fp.fileManagerService.ClearCache() + fp.CleanUpConfigApply(ctx, successMessage.ConfigContext, successMessage.DataPlaneResponse.GetInstanceId()) if successMessage.ConfigContext.Files != nil { slog.DebugContext(ctx, "Changes made during config apply, update files on disk") @@ -191,6 +209,7 @@ func (fp *FilePlugin) handleConfigApplySuccess(ctx context.Context, msg *bus.Mes slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) } } + fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: successMessage.DataPlaneResponse}) } @@ -264,13 +283,8 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes "", ) - successMessage := &model.ConfigApplySuccess{ - ConfigContext: &model.NginxConfigContext{}, - DataPlaneResponse: dpResponse, - } - fp.fileManagerService.ClearCache() - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: successMessage}) + fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: dpResponse}) return case model.Error: diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index 23403a71f..2955dacf3 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -59,7 +59,7 @@ func TestFilePlugin_Subscriptions(t *testing.T) { bus.ConfigUploadRequestTopic, bus.ConfigApplyRequestTopic, bus.ConfigApplyFailedTopic, - bus.ConfigApplySuccessfulTopic, + bus.ReloadSuccessfulTopic, bus.ConfigApplyCompleteTopic, }, filePlugin.Subscriptions(), @@ -210,13 +210,13 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { case test.configApplyStatus == model.NoChange: assert.Len(t, messages, 1) - response, ok := messages[0].Data.(*model.ConfigApplySuccess) + response, ok := messages[0].Data.(*mpi.DataPlaneResponse) assert.True(t, ok) - assert.Equal(t, bus.ConfigApplySuccessfulTopic, messages[0].Topic) + assert.Equal(t, bus.ConfigApplyCompleteTopic, messages[0].Topic) assert.Equal( t, mpi.CommandResponse_COMMAND_STATUS_OK, - response.DataPlaneResponse.GetCommandResponse().GetStatus(), + response.GetCommandResponse().GetStatus(), ) case test.message == nil: assert.Empty(t, messages) @@ -432,7 +432,7 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { } } -func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) { +func TestFilePlugin_Process_ConfigApplyReloadSuccessTopic(t *testing.T) { ctx := context.Background() instance := protos.NginxOssInstance([]string{}) mockFileManager := &filefakes.FakeFileManagerServiceInterface{} @@ -460,14 +460,22 @@ func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) { InstanceId: instance.GetInstanceMeta().GetInstanceId(), } - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: &model.ConfigApplySuccess{ + filePlugin.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: &model.ReloadSuccess{ ConfigContext: &model.NginxConfigContext{}, DataPlaneResponse: expectedResponse, }}) messages := messagePipe.Messages() - response, ok := messages[0].Data.(*mpi.DataPlaneResponse) + + watchers, ok := messages[0].Data.(*model.EnableWatchers) + assert.True(t, ok) + assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic) + assert.Equal(t, &model.NginxConfigContext{}, watchers.ConfigContext) + assert.Equal(t, instance.GetInstanceMeta().GetInstanceId(), watchers.InstanceID) + + response, ok := messages[1].Data.(*mpi.DataPlaneResponse) assert.True(t, ok) + assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic) assert.Equal(t, expectedResponse.GetCommandResponse().GetStatus(), response.GetCommandResponse().GetStatus()) assert.Equal(t, expectedResponse.GetCommandResponse().GetMessage(), response.GetCommandResponse().GetMessage()) diff --git a/internal/model/config.go b/internal/model/config.go index 1aaf95c97..e9aee603a 100644 --- a/internal/model/config.go +++ b/internal/model/config.go @@ -77,11 +77,16 @@ const ( OK ) -type ConfigApplySuccess struct { +type ReloadSuccess struct { ConfigContext *NginxConfigContext DataPlaneResponse *v1.DataPlaneResponse } +type EnableWatchers struct { + ConfigContext *NginxConfigContext + InstanceID string +} + //nolint:revive,cyclop // cyclomatic complexity is 16 func (ncc *NginxConfigContext) Equal(otherNginxConfigContext *NginxConfigContext) bool { if ncc.StubStatus != nil && otherNginxConfigContext.StubStatus != nil { diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index c3d7467b6..036e817b7 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -242,12 +242,12 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_OK, "Config apply successful", data.InstanceID, "") - successMessage := &model.ConfigApplySuccess{ + successMessage := &model.ReloadSuccess{ ConfigContext: configContext, DataPlaneResponse: dpResponse, } - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: successMessage}) + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: successMessage}) } func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index a427262bb..f4ab15bed 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -142,7 +142,7 @@ func TestResource_Process_Apply(t *testing.T) { }, }, applyErr: nil, - topic: []string{bus.ConfigApplySuccessfulTopic}, + topic: []string{bus.ReloadSuccessfulTopic}, }, { name: "Test 2: Write Config Successful Topic - Fail Status", diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 4e3afd143..785fcb2a8 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -140,12 +140,10 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { switch msg.Topic { case bus.ConfigApplyRequestTopic: w.handleConfigApplyRequest(ctx, msg) - case bus.ConfigApplySuccessfulTopic: - w.handleConfigApplySuccess(ctx, msg) - case bus.ConfigApplyCompleteTopic: - w.handleConfigApplyComplete(ctx, msg) case bus.DataPlaneHealthRequestTopic: w.handleHealthRequest(ctx) + case bus.EnableWatchersTopic: + w.handleEnableWatchers(ctx, msg) default: slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic) } @@ -154,12 +152,43 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { func (*Watcher) Subscriptions() []string { return []string{ bus.ConfigApplyRequestTopic, - bus.ConfigApplySuccessfulTopic, - bus.ConfigApplyCompleteTopic, bus.DataPlaneHealthRequestTopic, + bus.EnableWatchersTopic, } } +func (w *Watcher) handleEnableWatchers(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Watcher plugin received enable watchers message") + enableWatchersMessage, ok := msg.Data.(*model.EnableWatchers) + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to *model.EnableWatchers", "payload", + msg.Data, "topic", msg.Topic) + + return + } + + instanceID := enableWatchersMessage.InstanceID + configContext := enableWatchersMessage.ConfigContext + + // if config apply ended in a reload there is no need to reparse the config so an empty config context is sent + // from the file plugin + if configContext.InstanceID != "" { + w.instanceWatcherService.HandleNginxConfigContextUpdate(ctx, instanceID, configContext) + } + + w.watcherMutex.Lock() + w.instancesWithConfigApplyInProgress = slices.DeleteFunc( + w.instancesWithConfigApplyInProgress, + func(element string) bool { + return element == instanceID + }, + ) + + w.fileWatcherService.SetEnabled(true) + w.instanceWatcherService.SetEnabled(true) + w.watcherMutex.Unlock() +} + func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Watcher plugin received config apply request message") managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) @@ -188,37 +217,6 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message w.instanceWatcherService.SetEnabled(false) } -func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Watcher plugin received config apply success message") - successMessage, ok := msg.Data.(*model.ConfigApplySuccess) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplySuccess", "payload", - msg.Data, "topic", msg.Topic) - - return - } - - instanceID := successMessage.DataPlaneResponse.GetInstanceId() - - // If the config apply had no changes to any files, it is results in a ConfigApplySuccessfulTopic with an empty - // configContext being sent, there is no need to reparse the config as no change has occurred. - if successMessage.ConfigContext.InstanceID != "" { - w.instanceWatcherService.HandleNginxConfigContextUpdate(ctx, instanceID, successMessage.ConfigContext) - } - - w.watcherMutex.Lock() - w.instancesWithConfigApplyInProgress = slices.DeleteFunc( - w.instancesWithConfigApplyInProgress, - func(element string) bool { - return element == instanceID - }, - ) - - w.fileWatcherService.SetEnabled(true) - w.instanceWatcherService.SetEnabled(true) - w.watcherMutex.Unlock() -} - func (w *Watcher) handleHealthRequest(ctx context.Context) { slog.DebugContext(ctx, "Watcher plugin received health request message") w.messagePipe.Process(ctx, &bus.Message{ @@ -226,31 +224,6 @@ func (w *Watcher) handleHealthRequest(ctx context.Context) { }) } -func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Watcher plugin received config apply complete message") - response, ok := msg.Data.(*mpi.DataPlaneResponse) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", - msg.Data, "topic", msg.Topic) - - return - } - - instanceID := response.GetInstanceId() - - w.watcherMutex.Lock() - defer w.watcherMutex.Unlock() - w.instancesWithConfigApplyInProgress = slices.DeleteFunc( - w.instancesWithConfigApplyInProgress, - func(element string) bool { - return element == instanceID - }, - ) - - w.instanceWatcherService.SetEnabled(true) - w.fileWatcherService.SetEnabled(true) -} - func (w *Watcher) monitorWatchers(ctx context.Context) { for { select { diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 20cd99957..9331d320c 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -17,8 +17,6 @@ import ( "github.com/nginx/agent/v3/internal/watcher/credentials" "github.com/nginx/agent/v3/internal/bus/busfakes" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/nginx/agent/v3/internal/watcher/health" "github.com/nginx/agent/v3/internal/watcher/instance" "github.com/nginx/agent/v3/internal/watcher/watcherfakes" @@ -26,7 +24,6 @@ import ( mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/logger" - "github.com/nginx/agent/v3/pkg/id" testModel "github.com/nginx/agent/v3/test/model" "github.com/nginx/agent/v3/test/protos" "github.com/nginx/agent/v3/test/types" @@ -169,70 +166,78 @@ func TestWatcher_Process_ConfigApplySuccessfulTopic(t *testing.T) { ctx := context.Background() data := protos.NginxOssInstance([]string{}) - response := &model.ConfigApplySuccess{ - ConfigContext: &model.NginxConfigContext{ - InstanceID: data.GetInstanceMeta().GetInstanceId(), - }, - DataPlaneResponse: &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), + tests := []struct { + data *model.EnableWatchers + name string + inProgress []string + callCount int + empty bool + }{ + { + name: "Test 1: Reparse Config", + data: &model.EnableWatchers{ + ConfigContext: &model.NginxConfigContext{ + InstanceID: data.GetInstanceMeta().GetInstanceId(), + }, + InstanceID: data.GetInstanceMeta().GetInstanceId(), }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", + callCount: 1, + empty: true, + inProgress: []string{ + data.GetInstanceMeta().GetInstanceId(), }, - InstanceId: data.GetInstanceMeta().GetInstanceId(), }, - } - - message := &bus.Message{ - Topic: bus.ConfigApplySuccessfulTopic, - Data: response, - } - - fakeWatcherService := &watcherfakes.FakeInstanceWatcherServiceInterface{} - watcherPlugin := NewWatcher(types.AgentConfig()) - watcherPlugin.instanceWatcherService = fakeWatcherService - watcherPlugin.instancesWithConfigApplyInProgress = []string{data.GetInstanceMeta().GetInstanceId()} - - watcherPlugin.Process(ctx, message) - - assert.Equal(t, 1, fakeWatcherService.HandleNginxConfigContextUpdateCallCount()) - assert.Empty(t, watcherPlugin.instancesWithConfigApplyInProgress) -} - -func TestWatcher_Process_RollbackCompleteTopic(t *testing.T) { - ctx := context.Background() - ossInstance := protos.NginxOssInstance([]string{}) - - response := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), + { + name: "Test 2: Don't Reparse Config", + data: &model.EnableWatchers{ + ConfigContext: &model.NginxConfigContext{}, + InstanceID: data.GetInstanceMeta().GetInstanceId(), + }, + callCount: 0, + empty: true, + inProgress: []string{ + data.GetInstanceMeta().GetInstanceId(), + }, }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", + { + name: "Test 3: More than one inProgress Config", + data: &model.EnableWatchers{ + ConfigContext: &model.NginxConfigContext{ + InstanceID: data.GetInstanceMeta().GetInstanceId(), + }, + InstanceID: data.GetInstanceMeta().GetInstanceId(), + }, + callCount: 1, + empty: false, + inProgress: []string{ + data.GetInstanceMeta().GetInstanceId(), + protos.NginxPlusInstance([]string{}).GetInstanceMeta().GetInstanceId(), + }, }, - InstanceId: ossInstance.GetInstanceMeta().GetInstanceId(), } - message := &bus.Message{ - Topic: bus.ConfigApplyCompleteTopic, - Data: response, + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + message := &bus.Message{ + Topic: bus.EnableWatchersTopic, + Data: test.data, + } + + fakeWatcherService := &watcherfakes.FakeInstanceWatcherServiceInterface{} + watcherPlugin := NewWatcher(types.AgentConfig()) + watcherPlugin.instanceWatcherService = fakeWatcherService + watcherPlugin.instancesWithConfigApplyInProgress = test.inProgress + + watcherPlugin.Process(ctx, message) + + assert.Equal(t, test.callCount, fakeWatcherService.HandleNginxConfigContextUpdateCallCount()) + if test.empty { + assert.Empty(t, watcherPlugin.instancesWithConfigApplyInProgress) + } else { + assert.NotEmpty(t, watcherPlugin.instancesWithConfigApplyInProgress) + } + }) } - - watcherPlugin := NewWatcher(types.AgentConfig()) - watcherPlugin.instancesWithConfigApplyInProgress = []string{ossInstance.GetInstanceMeta().GetInstanceId()} - - watcherPlugin.Process(ctx, message) - - assert.Empty(t, watcherPlugin.instancesWithConfigApplyInProgress) } func TestWatcher_Subscriptions(t *testing.T) { @@ -241,9 +246,8 @@ func TestWatcher_Subscriptions(t *testing.T) { t, []string{ bus.ConfigApplyRequestTopic, - bus.ConfigApplySuccessfulTopic, - bus.ConfigApplyCompleteTopic, bus.DataPlaneHealthRequestTopic, + bus.EnableWatchersTopic, }, watcherPlugin.Subscriptions(), ) From 3c933626c42c2606c6810bad91b5ef62357e860b Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 26 Sep 2025 17:19:50 +0100 Subject: [PATCH 2/7] WIP to rollback using temp files --- internal/file/file_manager_service.go | 154 ++++++++++++------ internal/file/file_manager_service_test.go | 9 +- internal/file/file_operator.go | 37 +++-- internal/file/file_plugin.go | 11 +- internal/file/file_service_operator.go | 15 +- .../fake_file_manager_service_interface.go | 72 +++++--- .../credentials/credential_watcher_service.go | 2 +- internal/watcher/file/file_watcher_service.go | 6 +- internal/watcher/watcher_plugin.go | 1 + 9 files changed, 197 insertions(+), 110 deletions(-) diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 6492e8c47..c800e899f 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -71,7 +71,7 @@ type ( fileToUpdate *mpi.File, ) error SetIsConnected(isConnected bool) - MoveFilesFromTempDirectory(ctx context.Context, fileAction *model.FileCache, tempDir string) error + MoveFileFromTempDirectory(ctx context.Context, fileAction *model.FileCache, tempDir string) error UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) } @@ -80,6 +80,7 @@ type ( err error) Rollback(ctx context.Context, instanceID string) error ClearCache() + SetConfigPath(configPath string) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error ConfigUpdate(ctx context.Context, nginxConfigContext *model.NginxConfigContext) UpdateCurrentFilesOnDisk(ctx context.Context, updateFiles map[string]*mpi.File, referenced bool) error @@ -87,7 +88,7 @@ type ( ctx context.Context, currentFiles map[string]*mpi.File, modifiedFiles map[string]*model.FileCache, - ) (map[string]*model.FileCache, map[string][]byte, error) + ) (map[string]*model.FileCache, error) IsConnected() bool SetIsConnected(isConnected bool) ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) @@ -107,6 +108,9 @@ type FileManagerService struct { currentFilesOnDisk map[string]*mpi.File // key is file path previousManifestFiles map[string]*model.ManifestFile manifestFilePath string + tempConfigDir string + tempRollbackDir string + configPath string rollbackManifest bool filesMutex sync.RWMutex } @@ -124,10 +128,15 @@ func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig previousManifestFiles: make(map[string]*model.ManifestFile), rollbackManifest: true, manifestFilePath: agentConfig.LibDir + "/manifest.json", + configPath: "/etc/nginx/", manifestLock: manifestLock, } } +func (fms *FileManagerService) SetConfigPath(configPath string) { + fms.configPath = filepath.Dir(configPath) +} + func (fms *FileManagerService) ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) { fms.fileServiceOperator.UpdateClient(ctx, fileServiceClient) slog.DebugContext(ctx, "File manager service reset client successfully") @@ -144,6 +153,9 @@ func (fms *FileManagerService) SetIsConnected(isConnected bool) { func (fms *FileManagerService) ConfigApply(ctx context.Context, configApplyRequest *mpi.ConfigApplyRequest, ) (status model.WriteStatus, err error) { + var configTempErr error + var rollbackTempErr error + fms.rollbackManifest = true fileOverview := configApplyRequest.GetOverview() @@ -156,7 +168,7 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, return model.Error, allowedErr } - diffFiles, fileContent, compareErr := fms.DetermineFileActions( + diffFiles, compareErr := fms.DetermineFileActions( ctx, fms.currentFilesOnDisk, ConvertToMapOfFileCache(fileOverview.GetFiles()), @@ -170,15 +182,25 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, return model.NoChange, nil } - fms.rollbackFileContents = fileContent + // fms.rollbackFileContents = fileContent fms.fileActions = diffFiles - tempDir, tempDirError := fms.createTempConfigDirectory(ctx) - if tempDirError != nil { - return model.Error, tempDirError + fms.tempConfigDir, configTempErr = fms.createTempConfigDirectory(ctx, "config") + if configTempErr != nil { + return model.Error, configTempErr + } + + fms.tempRollbackDir, rollbackTempErr = fms.createTempConfigDirectory(ctx, "rollback") + if rollbackTempErr != nil { + return model.Error, rollbackTempErr } - fileErr := fms.executeFileActions(ctx, tempDir) + rollbackTempFilesErr := fms.RollbackTempFiles(ctx) + if rollbackTempFilesErr != nil { + return model.Error, rollbackTempFilesErr + } + + fileErr := fms.executeFileActions(ctx) if fileErr != nil { fms.rollbackManifest = false return model.RollbackRequired, fileErr @@ -193,13 +215,54 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, return model.OK, nil } +func (fms *FileManagerService) RollbackTempFiles(ctx context.Context) error { + for _, file := range fms.fileActions { + if file.Action == model.Add || file.Action == model.Unchanged { + continue + } + + filePath := file.File.GetFileMeta().GetName() + + if _, err := os.Stat(filePath); os.IsNotExist(err) { + slog.DebugContext(ctx, "Unable to backup file content since file does not exist", + "file", filePath) + + continue + } + + tempFilePath := filepath.Join(fms.tempRollbackDir, filePath) + slog.DebugContext(ctx, "Attempting to backup file content since file exists", "temp_path", tempFilePath) + + moveErr := fms.fileOperator.MoveFile(ctx, filePath, tempFilePath) + + if moveErr != nil { + return moveErr + } + } + + return nil +} + func (fms *FileManagerService) ClearCache() { + slog.Debug("Clearing cache and temp files after config apply") clear(fms.rollbackFileContents) clear(fms.fileActions) clear(fms.previousManifestFiles) + + configErr := os.RemoveAll(fms.tempConfigDir) + if configErr != nil { + slog.Error("error removing temp config directory", "path", fms.tempConfigDir, "err", configErr) + } + + rollbackErr := os.RemoveAll(fms.tempRollbackDir) + if rollbackErr != nil { + slog.Error("error removing temp rollback directory", "path", fms.tempRollbackDir, "err", rollbackErr) + } + + slog.Info("Cleaned up temp files") } -//nolint:revive // cognitive-complexity of 13 max is 12, loop is needed cant be broken up +//nolint:revive,cyclop // cognitive-complexity of 13 max is 12, loop is needed cant be broken up func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error { slog.InfoContext(ctx, "Rolling back config for instance", "instance_id", instanceID) @@ -218,10 +281,25 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) continue case model.Delete, model.Update: fileMeta := fileAction.File.GetFileMeta() - content := fms.rollbackFileContents[fileMeta.GetName()] - err := fms.fileOperator.Write(ctx, content, fileMeta.GetName(), fileMeta.GetPermissions()) - if err != nil { - return err + fileName := fileMeta.GetName() + + tempFilePath := filepath.Join(fms.tempRollbackDir, fileName) + + // Create parent directories for the target file if they don't exist + if err := os.MkdirAll(filepath.Dir(fileName), dirPerm); err != nil { + return fmt.Errorf("failed to create directories for %s: %w", fileName, err) + } + + slog.InfoContext(ctx, "Moving files during rollback") + moveErr := os.Rename(tempFilePath, fileName) + if moveErr != nil { + return fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr) + } + + content, readErr := os.ReadFile(fileMeta.GetName()) + if readErr != nil { + return fmt.Errorf("error reading file, unable to generate hash: %s error: %w", + fileMeta.GetName(), readErr) } // currentFilesOnDisk needs to be updated after rollback action is performed @@ -308,20 +386,18 @@ func (fms *FileManagerService) DetermineFileActions( modifiedFiles map[string]*model.FileCache, ) ( map[string]*model.FileCache, - map[string][]byte, error, ) { fms.filesMutex.Lock() defer fms.filesMutex.Unlock() fileDiff := make(map[string]*model.FileCache) // Files that have changed, key is file name - fileContents := make(map[string][]byte) // contents of the file, key is file name _, filesMap, manifestFileErr := fms.manifestFile() if manifestFileErr != nil { if !errors.Is(manifestFileErr, os.ErrNotExist) { - return nil, nil, manifestFileErr + return nil, manifestFileErr } filesMap = currentFiles } @@ -332,18 +408,6 @@ func (fms *FileManagerService) DetermineFileActions( _, exists := modifiedFiles[fileName] if !exists { - // Read file contents before marking it deleted - fileContent, readErr := os.ReadFile(fileName) - if readErr != nil { - if errors.Is(readErr, os.ErrNotExist) { - slog.DebugContext(ctx, "Unable to backup file contents since file does not exist", "file", fileName) - continue - } - - return nil, nil, fmt.Errorf("error reading file %s: %w", fileName, readErr) - } - fileContents[fileName] = fileContent - fileDiff[fileName] = &model.FileCache{ File: manifestFile, Action: model.Delete, @@ -353,7 +417,7 @@ func (fms *FileManagerService) DetermineFileActions( for _, modifiedFile := range modifiedFiles { fileName := modifiedFile.File.GetFileMeta().GetName() - currentFile, ok := filesMap[modifiedFile.File.GetFileMeta().GetName()] + currentFile, ok := filesMap[fileName] // default to unchanged action modifiedFile.Action = model.Unchanged @@ -363,25 +427,20 @@ func (fms *FileManagerService) DetermineFileActions( } // if file doesn't exist in the current files, file has been added // set file action - if _, statErr := os.Stat(modifiedFile.File.GetFileMeta().GetName()); errors.Is(statErr, os.ErrNotExist) { + if _, statErr := os.Stat(fileName); errors.Is(statErr, os.ErrNotExist) { modifiedFile.Action = model.Add - fileDiff[modifiedFile.File.GetFileMeta().GetName()] = modifiedFile + fileDiff[fileName] = modifiedFile continue // if file currently exists and file hash is different, file has been updated // copy contents, set file action } else if ok && modifiedFile.File.GetFileMeta().GetHash() != currentFile.GetFileMeta().GetHash() { - fileContent, readErr := os.ReadFile(fileName) - if readErr != nil { - return nil, nil, fmt.Errorf("error reading file %s, error: %w", fileName, readErr) - } modifiedFile.Action = model.Update - fileContents[fileName] = fileContent - fileDiff[modifiedFile.File.GetFileMeta().GetName()] = modifiedFile + fileDiff[fileName] = modifiedFile } } - return fileDiff, fileContents, nil + return fileDiff, nil } // UpdateCurrentFilesOnDisk updates the FileManagerService currentFilesOnDisk slice which contains the files @@ -485,17 +544,17 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m return manifestFiles, fileMap, nil } -func (fms *FileManagerService) executeFileActions(ctx context.Context, tempDir string) (actionError error) { +func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionError error) { // Download files to temporary location - downloadError := fms.downloadUpdatedFilesToTempLocation(ctx, tempDir) + downloadError := fms.downloadUpdatedFilesToTempLocation(ctx, fms.tempConfigDir) if downloadError != nil { return downloadError } // Remove temp files if there is a failure moving or deleting files - actionError = fms.moveOrDeleteFiles(ctx, tempDir, actionError) + actionError = fms.moveOrDeleteFiles(ctx, fms.tempConfigDir, actionError) if actionError != nil { - fms.deleteTempFiles(ctx, tempDir) + fms.deleteTempFiles(ctx, fms.tempConfigDir) } return actionError @@ -522,11 +581,6 @@ func (fms *FileManagerService) downloadUpdatedFilesToTempLocation( } } - // Remove temp files if there is an error downloading any files - if updateError != nil { - fms.deleteTempFiles(ctx, tempDir) - } - return updateError } @@ -545,7 +599,7 @@ actionsLoop: continue case model.Add, model.Update: - err := fms.fileServiceOperator.MoveFilesFromTempDirectory(ctx, fileAction, tempDir) + err := fms.fileServiceOperator.MoveFileFromTempDirectory(ctx, fileAction, tempDir) if err != nil { actionError = err @@ -643,8 +697,8 @@ func (fms *FileManagerService) convertToFile(manifestFile *model.ManifestFile) * } } -func (fms *FileManagerService) createTempConfigDirectory(ctx context.Context) (string, error) { - tempDir, tempDirError := os.MkdirTemp(fms.agentConfig.LibDir, "config") +func (fms *FileManagerService) createTempConfigDirectory(ctx context.Context, pattern string) (string, error) { + tempDir, tempDirError := os.MkdirTemp(fms.configPath, pattern) if tempDirError != nil { return "", fmt.Errorf("failed creating temp config directory: %w", tempDirError) } diff --git a/internal/file/file_manager_service_test.go b/internal/file/file_manager_service_test.go index 5a2644080..e4984d46d 100644 --- a/internal/file/file_manager_service_test.go +++ b/internal/file/file_manager_service_test.go @@ -625,13 +625,12 @@ func TestFileManagerService_DetermineFileActions(t *testing.T) { require.NoError(tt, err) - diff, contents, fileActionErr := fileManagerService.DetermineFileActions( + diff, fileActionErr := fileManagerService.DetermineFileActions( ctx, test.currentFiles, test.modifiedFiles, ) require.NoError(tt, fileActionErr) - assert.Equal(tt, test.expectedContent, contents) assert.Equal(tt, test.expectedCache, diff) }) } @@ -892,7 +891,7 @@ func TestFileManagerService_fileActions(t *testing.T) { fileManagerService.fileActions = filesCache - actionErr := fileManagerService.executeFileActions(ctx, os.TempDir()) + actionErr := fileManagerService.executeFileActions(ctx) require.NoError(t, actionErr) assert.FileExists(t, addFilePath) @@ -1020,14 +1019,14 @@ func TestFileManagerService_createTempConfigDirectory(t *testing.T) { agentConfig: agentConfig, } - dir, err := fileManagerService.createTempConfigDirectory(t.Context()) + dir, err := fileManagerService.createTempConfigDirectory(t.Context(), "config") assert.NotEmpty(t, dir) require.NoError(t, err) // Test for unknown directory path agentConfig.LibDir = "/unknown/" - dir, err = fileManagerService.createTempConfigDirectory(t.Context()) + dir, err = fileManagerService.createTempConfigDirectory(t.Context(), "config") assert.Empty(t, dir) require.Error(t, err) } diff --git a/internal/file/file_operator.go b/internal/file/file_operator.go index d765efc7d..c54482199 100644 --- a/internal/file/file_operator.go +++ b/internal/file/file_operator.go @@ -14,6 +14,7 @@ import ( "log/slog" "os" "path" + "path/filepath" "sync" "github.com/nginx/agent/v3/internal/model" @@ -188,28 +189,34 @@ func (fo *FileOperator) WriteManifestFile( } func (fo *FileOperator) MoveFile(ctx context.Context, sourcePath, destPath string) error { - inputFile, err := os.Open(sourcePath) - if err != nil { - return err + inputFile, openErr := os.Open(sourcePath) + if openErr != nil { + return fmt.Errorf("failed to open source file %s: %w", sourcePath, openErr) } + defer closeFile(ctx, inputFile) - outputFile, err := os.Create(destPath) - if err != nil { - return err + fileInfo, statErr := inputFile.Stat() + if statErr != nil { + return fmt.Errorf("failed to stat source file %s: %w", sourcePath, statErr) } - defer closeFile(ctx, outputFile) - _, err = io.Copy(outputFile, inputFile) - if err != nil { - closeFile(ctx, inputFile) - return err + if dirErr := os.MkdirAll(filepath.Dir(destPath), dirPerm); dirErr != nil { + return fmt.Errorf("failed to create directories for %s: %w", destPath, dirErr) } - closeFile(ctx, inputFile) + outputFile, createErr := os.OpenFile(destPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, fileInfo.Mode()) + if createErr != nil { + return fmt.Errorf("failed to create destination file %s: %w", destPath, createErr) + } + defer closeFile(ctx, outputFile) - err = os.Remove(sourcePath) - if err != nil { - return err + _, copyErr := io.Copy(outputFile, inputFile) + if copyErr != nil { + return fmt.Errorf("failed to copy data from %s to %s: %w", sourcePath, destPath, copyErr) + } + + if err := os.Chmod(outputFile.Name(), fileInfo.Mode()); err != nil { + return fmt.Errorf("failed to change file permissions chmod: %w", err) } return nil diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index a5c79d4ef..0e9f4dab5 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -142,13 +142,14 @@ func (fp *FilePlugin) CleanUpConfigApply(ctx context.Context, configContext *model.NginxConfigContext, instanceID string, ) { + fp.fileManagerService.ClearCache() + + slog.InfoContext(ctx, "Cleaned up temp files") enableWatcher := &model.EnableWatchers{ ConfigContext: configContext, InstanceID: instanceID, } - fp.fileManagerService.ClearCache() - fp.messagePipe.Process(ctx, &bus.Message{ Data: enableWatcher, Topic: bus.EnableWatchersTopic, @@ -235,7 +236,6 @@ func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *b mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback failed", data.InstanceID, data.Error.Error()) - fp.fileManagerService.ClearCache() fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) @@ -283,7 +283,6 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes "", ) - fp.fileManagerService.ClearCache() fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: dpResponse}) return @@ -302,7 +301,6 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes err.Error(), ) - fp.fileManagerService.ClearCache() fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) return @@ -335,7 +333,6 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes instanceID, rollbackErr.Error()) - fp.fileManagerService.ClearCache() fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: rollbackResponse}) return @@ -348,7 +345,6 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes instanceID, err.Error()) - fp.fileManagerService.ClearCache() fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) return @@ -373,6 +369,7 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess return } + fp.fileManagerService.SetConfigPath(nginxConfigContext.ConfigPath) fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext) } diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index dd4b166fc..71fef285f 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -277,7 +277,7 @@ func (fso *FileServiceOperator) UpdateFile( return fso.sendUpdateFileStream(ctx, fileToUpdate, fso.agentConfig.Client.Grpc.FileChunkSize) } -func (fso *FileServiceOperator) MoveFilesFromTempDirectory( +func (fso *FileServiceOperator) MoveFileFromTempDirectory( ctx context.Context, fileAction *model.FileCache, tempDir string, ) error { fileName := fileAction.File.GetFileMeta().GetName() @@ -289,18 +289,9 @@ func (fso *FileServiceOperator) MoveFilesFromTempDirectory( return fmt.Errorf("failed to create directories for %s: %w", fileName, err) } - moveErr := fso.fileOperator.MoveFile(ctx, tempFilePath, fileName) + moveErr := os.Rename(tempFilePath, fileName) if moveErr != nil { - return fmt.Errorf("failed to move file: %w", moveErr) - } - - if removeError := os.Remove(tempFilePath); removeError != nil && !os.IsNotExist(removeError) { - slog.ErrorContext( - ctx, - "Error deleting temp file", - "file", fileName, - "error", removeError, - ) + return fmt.Errorf("failed to rename file: %w", moveErr) } return fso.validateFileHash(fileName, fileAction.File.GetFileMeta().GetHash()) diff --git a/internal/file/filefakes/fake_file_manager_service_interface.go b/internal/file/filefakes/fake_file_manager_service_interface.go index 9d1943659..0a7a551ea 100644 --- a/internal/file/filefakes/fake_file_manager_service_interface.go +++ b/internal/file/filefakes/fake_file_manager_service_interface.go @@ -46,7 +46,7 @@ type FakeFileManagerServiceInterface struct { configUploadReturnsOnCall map[int]struct { result1 error } - DetermineFileActionsStub func(context.Context, map[string]*v1.File, map[string]*model.FileCache) (map[string]*model.FileCache, map[string][]byte, error) + DetermineFileActionsStub func(context.Context, map[string]*v1.File, map[string]*model.FileCache) (map[string]*model.FileCache, error) determineFileActionsMutex sync.RWMutex determineFileActionsArgsForCall []struct { arg1 context.Context @@ -55,13 +55,11 @@ type FakeFileManagerServiceInterface struct { } determineFileActionsReturns struct { result1 map[string]*model.FileCache - result2 map[string][]byte - result3 error + result2 error } determineFileActionsReturnsOnCall map[int]struct { result1 map[string]*model.FileCache - result2 map[string][]byte - result3 error + result2 error } IsConnectedStub func() bool isConnectedMutex sync.RWMutex @@ -91,6 +89,11 @@ type FakeFileManagerServiceInterface struct { rollbackReturnsOnCall map[int]struct { result1 error } + SetConfigPathStub func(string) + setConfigPathMutex sync.RWMutex + setConfigPathArgsForCall []struct { + arg1 string + } SetIsConnectedStub func(bool) setIsConnectedMutex sync.RWMutex setIsConnectedArgsForCall []struct { @@ -297,7 +300,7 @@ func (fake *FakeFileManagerServiceInterface) ConfigUploadReturnsOnCall(i int, re }{result1} } -func (fake *FakeFileManagerServiceInterface) DetermineFileActions(arg1 context.Context, arg2 map[string]*v1.File, arg3 map[string]*model.FileCache) (map[string]*model.FileCache, map[string][]byte, error) { +func (fake *FakeFileManagerServiceInterface) DetermineFileActions(arg1 context.Context, arg2 map[string]*v1.File, arg3 map[string]*model.FileCache) (map[string]*model.FileCache, error) { fake.determineFileActionsMutex.Lock() ret, specificReturn := fake.determineFileActionsReturnsOnCall[len(fake.determineFileActionsArgsForCall)] fake.determineFileActionsArgsForCall = append(fake.determineFileActionsArgsForCall, struct { @@ -313,9 +316,9 @@ func (fake *FakeFileManagerServiceInterface) DetermineFileActions(arg1 context.C return stub(arg1, arg2, arg3) } if specificReturn { - return ret.result1, ret.result2, ret.result3 + return ret.result1, ret.result2 } - return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 + return fakeReturns.result1, fakeReturns.result2 } func (fake *FakeFileManagerServiceInterface) DetermineFileActionsCallCount() int { @@ -324,7 +327,7 @@ func (fake *FakeFileManagerServiceInterface) DetermineFileActionsCallCount() int return len(fake.determineFileActionsArgsForCall) } -func (fake *FakeFileManagerServiceInterface) DetermineFileActionsCalls(stub func(context.Context, map[string]*v1.File, map[string]*model.FileCache) (map[string]*model.FileCache, map[string][]byte, error)) { +func (fake *FakeFileManagerServiceInterface) DetermineFileActionsCalls(stub func(context.Context, map[string]*v1.File, map[string]*model.FileCache) (map[string]*model.FileCache, error)) { fake.determineFileActionsMutex.Lock() defer fake.determineFileActionsMutex.Unlock() fake.DetermineFileActionsStub = stub @@ -337,33 +340,30 @@ func (fake *FakeFileManagerServiceInterface) DetermineFileActionsArgsForCall(i i return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeFileManagerServiceInterface) DetermineFileActionsReturns(result1 map[string]*model.FileCache, result2 map[string][]byte, result3 error) { +func (fake *FakeFileManagerServiceInterface) DetermineFileActionsReturns(result1 map[string]*model.FileCache, result2 error) { fake.determineFileActionsMutex.Lock() defer fake.determineFileActionsMutex.Unlock() fake.DetermineFileActionsStub = nil fake.determineFileActionsReturns = struct { result1 map[string]*model.FileCache - result2 map[string][]byte - result3 error - }{result1, result2, result3} + result2 error + }{result1, result2} } -func (fake *FakeFileManagerServiceInterface) DetermineFileActionsReturnsOnCall(i int, result1 map[string]*model.FileCache, result2 map[string][]byte, result3 error) { +func (fake *FakeFileManagerServiceInterface) DetermineFileActionsReturnsOnCall(i int, result1 map[string]*model.FileCache, result2 error) { fake.determineFileActionsMutex.Lock() defer fake.determineFileActionsMutex.Unlock() fake.DetermineFileActionsStub = nil if fake.determineFileActionsReturnsOnCall == nil { fake.determineFileActionsReturnsOnCall = make(map[int]struct { result1 map[string]*model.FileCache - result2 map[string][]byte - result3 error + result2 error }) } fake.determineFileActionsReturnsOnCall[i] = struct { result1 map[string]*model.FileCache - result2 map[string][]byte - result3 error - }{result1, result2, result3} + result2 error + }{result1, result2} } func (fake *FakeFileManagerServiceInterface) IsConnected() bool { @@ -514,6 +514,38 @@ func (fake *FakeFileManagerServiceInterface) RollbackReturnsOnCall(i int, result }{result1} } +func (fake *FakeFileManagerServiceInterface) SetConfigPath(arg1 string) { + fake.setConfigPathMutex.Lock() + fake.setConfigPathArgsForCall = append(fake.setConfigPathArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.SetConfigPathStub + fake.recordInvocation("SetConfigPath", []interface{}{arg1}) + fake.setConfigPathMutex.Unlock() + if stub != nil { + fake.SetConfigPathStub(arg1) + } +} + +func (fake *FakeFileManagerServiceInterface) SetConfigPathCallCount() int { + fake.setConfigPathMutex.RLock() + defer fake.setConfigPathMutex.RUnlock() + return len(fake.setConfigPathArgsForCall) +} + +func (fake *FakeFileManagerServiceInterface) SetConfigPathCalls(stub func(string)) { + fake.setConfigPathMutex.Lock() + defer fake.setConfigPathMutex.Unlock() + fake.SetConfigPathStub = stub +} + +func (fake *FakeFileManagerServiceInterface) SetConfigPathArgsForCall(i int) string { + fake.setConfigPathMutex.RLock() + defer fake.setConfigPathMutex.RUnlock() + argsForCall := fake.setConfigPathArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeFileManagerServiceInterface) SetIsConnected(arg1 bool) { fake.setIsConnectedMutex.Lock() fake.setIsConnectedArgsForCall = append(fake.setIsConnectedArgsForCall, struct { @@ -628,6 +660,8 @@ func (fake *FakeFileManagerServiceInterface) Invocations() map[string][][]interf defer fake.resetClientMutex.RUnlock() fake.rollbackMutex.RLock() defer fake.rollbackMutex.RUnlock() + fake.setConfigPathMutex.RLock() + defer fake.setConfigPathMutex.RUnlock() fake.setIsConnectedMutex.RLock() defer fake.setIsConnectedMutex.RUnlock() fake.updateCurrentFilesOnDiskMutex.RLock() diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 09434e46e..8e9cd744c 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -146,7 +146,7 @@ func (cws *CredentialWatcherService) handleEvent(ctx context.Context, event fsno return } - slog.DebugContext(ctx, "Processing FSNotify event", "event", event) + slog.InfoContext(ctx, "Credential Processing FSNotify event", "event", event) switch { case event.Has(fsnotify.Remove): diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index 57dfc19d5..8b8cd9ae6 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -89,6 +89,7 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe if fws.watcher != nil { select { case event := <-fws.watcher.Events: + slog.InfoContext(ctx, "--------- Event received", "event", event) fws.handleEvent(ctx, event) case watcherError := <-fws.watcher.Errors: slog.ErrorContext(ctx, "Unexpected error in file watcher", "error", watcherError) @@ -172,13 +173,16 @@ func (fws *FileWatcherService) removeWatchers(ctx context.Context) { } func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.Event) { + slog.InfoContext(ctx, "Is enabled", "bool", fws.enabled.Load()) if fws.enabled.Load() { if fws.isEventSkippable(event) { return } - slog.DebugContext(ctx, "Processing FSNotify event", "event", event) + slog.InfoContext(ctx, "Processing FSNotify event", "event", event) fws.filesChanged.Store(true) + } else { + slog.InfoContext(ctx, "Ignoring FSNotify event", "event", event) } } diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 785fcb2a8..c9cdce2d8 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -186,6 +186,7 @@ func (w *Watcher) handleEnableWatchers(ctx context.Context, msg *bus.Message) { w.fileWatcherService.SetEnabled(true) w.instanceWatcherService.SetEnabled(true) + slog.InfoContext(ctx, "Enabled Watchers") w.watcherMutex.Unlock() } From 4c2a20afdf8a0486df9b7e0300f5b87d0cc3725b Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 29 Sep 2025 17:13:26 +0100 Subject: [PATCH 3/7] Fixed file watcher not disabling correctly --- internal/file/file_manager_service.go | 25 ++---- internal/file/file_manager_service_test.go | 79 ++++++++++++++----- internal/file/file_operator_test.go | 1 - internal/file/file_plugin.go | 4 +- .../credentials/credential_watcher_service.go | 2 +- internal/watcher/file/file_watcher_service.go | 27 +++++-- .../watcher/file/file_watcher_service_test.go | 8 +- internal/watcher/watcher_plugin.go | 5 +- 8 files changed, 98 insertions(+), 53 deletions(-) diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index c800e899f..2f67b2ea9 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -102,8 +102,6 @@ type FileManagerService struct { fileServiceOperator fileServiceOperatorInterface // map of files and the actions performed on them during config apply fileActions map[string]*model.FileCache // key is file path - // map of the contents of files which have been updated or deleted during config apply, used during rollback - rollbackFileContents map[string][]byte // key is file path // map of the files currently on disk, used to determine the file action during config apply currentFilesOnDisk map[string]*mpi.File // key is file path previousManifestFiles map[string]*model.ManifestFile @@ -123,7 +121,6 @@ func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig fileOperator: NewFileOperator(manifestLock), fileServiceOperator: NewFileServiceOperator(agentConfig, fileServiceClient, manifestLock), fileActions: make(map[string]*model.FileCache), - rollbackFileContents: make(map[string][]byte), currentFilesOnDisk: make(map[string]*mpi.File), previousManifestFiles: make(map[string]*model.ManifestFile), rollbackManifest: true, @@ -182,15 +179,14 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, return model.NoChange, nil } - // fms.rollbackFileContents = fileContent fms.fileActions = diffFiles - fms.tempConfigDir, configTempErr = fms.createTempConfigDirectory(ctx, "config") + fms.tempConfigDir, configTempErr = fms.createTempConfigDirectory("config") if configTempErr != nil { return model.Error, configTempErr } - fms.tempRollbackDir, rollbackTempErr = fms.createTempConfigDirectory(ctx, "rollback") + fms.tempRollbackDir, rollbackTempErr = fms.createTempConfigDirectory("rollback") if rollbackTempErr != nil { return model.Error, rollbackTempErr } @@ -245,7 +241,6 @@ func (fms *FileManagerService) RollbackTempFiles(ctx context.Context) error { func (fms *FileManagerService) ClearCache() { slog.Debug("Clearing cache and temp files after config apply") - clear(fms.rollbackFileContents) clear(fms.fileActions) clear(fms.previousManifestFiles) @@ -258,8 +253,6 @@ func (fms *FileManagerService) ClearCache() { if rollbackErr != nil { slog.Error("error removing temp rollback directory", "path", fms.tempRollbackDir, "err", rollbackErr) } - - slog.Info("Cleaned up temp files") } //nolint:revive,cyclop // cognitive-complexity of 13 max is 12, loop is needed cant be broken up @@ -290,7 +283,6 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) return fmt.Errorf("failed to create directories for %s: %w", fileName, err) } - slog.InfoContext(ctx, "Moving files during rollback") moveErr := os.Rename(tempFilePath, fileName) if moveErr != nil { return fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr) @@ -407,6 +399,11 @@ func (fms *FileManagerService) DetermineFileActions( for fileName, manifestFile := range filesMap { _, exists := modifiedFiles[fileName] + if _, err := os.Stat(fileName); os.IsNotExist(err) { + slog.DebugContext(ctx, "File already deleted, skipping", "file", fileName) + continue + } + if !exists { fileDiff[fileName] = &model.FileCache{ File: manifestFile, @@ -697,17 +694,11 @@ func (fms *FileManagerService) convertToFile(manifestFile *model.ManifestFile) * } } -func (fms *FileManagerService) createTempConfigDirectory(ctx context.Context, pattern string) (string, error) { +func (fms *FileManagerService) createTempConfigDirectory(pattern string) (string, error) { tempDir, tempDirError := os.MkdirTemp(fms.configPath, pattern) if tempDirError != nil { return "", fmt.Errorf("failed creating temp config directory: %w", tempDirError) } - defer func(path string) { - err := os.RemoveAll(path) - if err != nil { - slog.ErrorContext(ctx, "error removing temp config directory", "path", path, "err", err) - } - }(tempDir) return tempDir, nil } diff --git a/internal/file/file_manager_service_test.go b/internal/file/file_manager_service_test.go index e4984d46d..c695e4af9 100644 --- a/internal/file/file_manager_service_test.go +++ b/internal/file/file_manager_service_test.go @@ -59,6 +59,7 @@ func TestFileManagerService_ConfigApply_Add(t *testing.T) { agentConfig.AllowedDirectories = []string{tempDir} fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) + fileManagerService.configPath = filepath.Dir(filePath) fileManagerService.agentConfig.LibDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath @@ -108,6 +109,7 @@ func TestFileManagerService_ConfigApply_Add_LargeFile(t *testing.T) { agentConfig.AllowedDirectories = []string{tempDir} fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.LibDir = manifestDirPath + fileManagerService.configPath = filepath.Dir(filePath) fileManagerService.manifestFilePath = manifestFilePath request := protos.CreateConfigApplyRequest(overview) @@ -168,6 +170,7 @@ func TestFileManagerService_ConfigApply_Update(t *testing.T) { fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.LibDir = manifestDirPath + fileManagerService.configPath = filepath.Dir(tempFile.Name()) fileManagerService.manifestFilePath = manifestFilePath err := fileManagerService.UpdateCurrentFilesOnDisk(ctx, filesOnDisk, false) require.NoError(t, err) @@ -179,7 +182,11 @@ func TestFileManagerService_ConfigApply_Update(t *testing.T) { data, readErr := os.ReadFile(tempFile.Name()) require.NoError(t, readErr) assert.Equal(t, fileContent, data) - assert.Equal(t, fileManagerService.rollbackFileContents[tempFile.Name()], previousFileContent) + + content, err := os.ReadFile(fileManagerService.tempRollbackDir + tempFile.Name()) + require.NoError(t, err) + assert.Equal(t, previousFileContent, content) + assert.Equal(t, fileManagerService.fileActions[tempFile.Name()].File, overview.GetFiles()[0]) assert.True(t, fileManagerService.rollbackManifest) } @@ -219,6 +226,7 @@ func TestFileManagerService_ConfigApply_Delete(t *testing.T) { fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.LibDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath + fileManagerService.configPath = filepath.Dir(tempFile.Name()) err := fileManagerService.UpdateCurrentFilesOnDisk(ctx, filesOnDisk, false) require.NoError(t, err) @@ -236,7 +244,11 @@ func TestFileManagerService_ConfigApply_Delete(t *testing.T) { writeStatus, err := fileManagerService.ConfigApply(ctx, request) require.NoError(t, err) assert.NoFileExists(t, tempFile.Name()) - assert.Equal(t, fileManagerService.rollbackFileContents[tempFile.Name()], fileContent) + + content, err := os.ReadFile(fileManagerService.tempRollbackDir + tempFile.Name()) + require.NoError(t, err) + assert.Equal(t, fileContent, content) + assert.Equal(t, fileManagerService.fileActions[tempFile.Name()].File.GetFileMeta().GetName(), filesOnDisk[tempFile.Name()].GetFileMeta().GetName(), @@ -278,6 +290,7 @@ func TestFileManagerService_ConfigApply_Failed(t *testing.T) { fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.LibDir = manifestDirPath + fileManagerService.configPath = filepath.Dir(filePath) fileManagerService.manifestFilePath = manifestFilePath request := protos.CreateConfigApplyRequest(overview) @@ -322,9 +335,18 @@ func TestFileManagerService_checkAllowedDirectory(t *testing.T) { require.Error(t, err) } +//nolint:usetesting // need to use MkDirTemp instead of t.tempDir for rollback as t.tempDir does not accept a pattern func TestFileManagerService_ClearCache(t *testing.T) { + tempDir := t.TempDir() + rollbackDir, err := os.MkdirTemp(tempDir, "rollback") + require.NoError(t, err) + configDir, err := os.MkdirTemp(tempDir, "config") + require.NoError(t, err) + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) + fileManagerService.tempConfigDir = configDir + fileManagerService.tempRollbackDir = rollbackDir filesCache := map[string]*model.FileCache{ "file/path/test.conf": { @@ -340,25 +362,27 @@ func TestFileManagerService_ClearCache(t *testing.T) { }, } - contentsCache := map[string][]byte{ - "file/path/test.conf": []byte("some test data"), - } - fileManagerService.fileActions = filesCache - fileManagerService.rollbackFileContents = contentsCache assert.NotEmpty(t, fileManagerService.fileActions) - assert.NotEmpty(t, fileManagerService.rollbackFileContents) fileManagerService.ClearCache() assert.Empty(t, fileManagerService.fileActions) - assert.Empty(t, fileManagerService.rollbackFileContents) + + _, statErr := os.Stat(fileManagerService.tempRollbackDir) + assert.True(t, os.IsNotExist(statErr)) + _, statConfigErr := os.Stat(fileManagerService.tempConfigDir) + assert.True(t, os.IsNotExist(statConfigErr)) } +//nolint:usetesting // need to use MkDirTemp instead of t.tempDir for rollback as t.tempDir does not accept a pattern func TestFileManagerService_Rollback(t *testing.T) { ctx := context.Background() tempDir := t.TempDir() + rollbackDir, mkdirErr := os.MkdirTemp(tempDir, "rollback") + require.NoError(t, mkdirErr) + deleteFilePath := filepath.Join(tempDir, "nginx_delete.conf") newFileContent := []byte("location /test {\n return 200 \"This config needs to be rolled back\\n\";\n}") @@ -373,6 +397,25 @@ func TestFileManagerService_Rollback(t *testing.T) { _, writeErr = updateFile.Write(newFileContent) require.NoError(t, writeErr) + helpers.CreateDirWithErrorCheck(t, rollbackDir+tempDir) + + tempAddFile, createErr := os.Create(rollbackDir + addFile.Name()) + require.NoError(t, createErr) + _, writeErr = tempAddFile.Write(oldFileContent) + require.NoError(t, writeErr) + + tempUpdateFile, createErr := os.Create(rollbackDir + updateFile.Name()) + require.NoError(t, createErr) + _, writeErr = tempUpdateFile.Write(oldFileContent) + require.NoError(t, writeErr) + t.Log(tempUpdateFile.Name()) + + tempDeleteFile, createErr := os.Create(rollbackDir + tempDir + "/nginx_delete.conf") + require.NoError(t, createErr) + _, writeErr = tempDeleteFile.Write(oldFileContent) + require.NoError(t, writeErr) + t.Log(tempDeleteFile.Name()) + manifestDirPath := tempDir manifestFilePath := manifestDirPath + "/manifest.json" helpers.CreateFileWithErrorCheck(t, manifestDirPath, "manifest.json") @@ -430,17 +473,14 @@ func TestFileManagerService_Rollback(t *testing.T) { }, }, } - fileContentCache := map[string][]byte{ - deleteFilePath: oldFileContent, - updateFile.Name(): oldFileContent, - } instanceID := protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId() fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) - fileManagerService.rollbackFileContents = fileContentCache fileManagerService.fileActions = filesCache fileManagerService.agentConfig.LibDir = manifestDirPath + fileManagerService.tempRollbackDir = rollbackDir + fileManagerService.configPath = filepath.Dir(updateFile.Name()) fileManagerService.manifestFilePath = manifestFilePath err := fileManagerService.Rollback(ctx, instanceID) @@ -622,6 +662,7 @@ func TestFileManagerService_DetermineFileActions(t *testing.T) { fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) fileManagerService.agentConfig.LibDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath + fileManagerService.configPath = filepath.Dir(updateTestFile.Name()) require.NoError(tt, err) @@ -1013,20 +1054,22 @@ func TestFileManagerService_deleteTempFiles(t *testing.T) { func TestFileManagerService_createTempConfigDirectory(t *testing.T) { agentConfig := types.AgentConfig() - agentConfig.LibDir = t.TempDir() + tempDir := t.TempDir() + configPath := tempDir fileManagerService := FileManagerService{ agentConfig: agentConfig, + configPath: configPath, } - dir, err := fileManagerService.createTempConfigDirectory(t.Context(), "config") + dir, err := fileManagerService.createTempConfigDirectory("config") assert.NotEmpty(t, dir) require.NoError(t, err) // Test for unknown directory path - agentConfig.LibDir = "/unknown/" + fileManagerService.configPath = "/unknown/" - dir, err = fileManagerService.createTempConfigDirectory(t.Context(), "config") + dir, err = fileManagerService.createTempConfigDirectory("config") assert.Empty(t, dir) require.Error(t, err) } diff --git a/internal/file/file_operator_test.go b/internal/file/file_operator_test.go index 4a49fcdd1..bf1a00d44 100644 --- a/internal/file/file_operator_test.go +++ b/internal/file/file_operator_test.go @@ -68,7 +68,6 @@ func TestFileOperator_MoveFile_fileExists(t *testing.T) { err = fileOperator.MoveFile(t.Context(), tempFile, newFile) require.NoError(t, err) - assert.NoFileExists(t, tempFile) assert.FileExists(t, newFile) } diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 0e9f4dab5..cb3e2a548 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -144,7 +144,6 @@ func (fp *FilePlugin) CleanUpConfigApply(ctx context.Context, ) { fp.fileManagerService.ClearCache() - slog.InfoContext(ctx, "Cleaned up temp files") enableWatcher := &model.EnableWatchers{ ConfigContext: configContext, InstanceID: instanceID, @@ -188,7 +187,7 @@ func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Me } func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) { - slog.InfoContext(ctx, "File plugin received reload success message", "data", msg.Data) + slog.DebugContext(ctx, "File plugin received reload success message", "data", msg.Data) successMessage, ok := msg.Data.(*model.ReloadSuccess) @@ -210,7 +209,6 @@ func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) } } - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: successMessage.DataPlaneResponse}) } diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 8e9cd744c..09434e46e 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -146,7 +146,7 @@ func (cws *CredentialWatcherService) handleEvent(ctx context.Context, event fsno return } - slog.InfoContext(ctx, "Credential Processing FSNotify event", "event", event) + slog.DebugContext(ctx, "Processing FSNotify event", "event", event) switch { case event.Has(fsnotify.Remove): diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index 8b8cd9ae6..58418857d 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -89,7 +89,6 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe if fws.watcher != nil { select { case event := <-fws.watcher.Events: - slog.InfoContext(ctx, "--------- Event received", "event", event) fws.handleEvent(ctx, event) case watcherError := <-fws.watcher.Errors: slog.ErrorContext(ctx, "Unexpected error in file watcher", "error", watcherError) @@ -98,8 +97,25 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe } } -func (fws *FileWatcherService) SetEnabled(enabled bool) { - fws.enabled.Store(enabled) +func (fws *FileWatcherService) DisableWatcher(ctx context.Context) { + if fws.watcher != nil && fws.watcher.WatchList() != nil { + paths := fws.watcher.WatchList() + slog.DebugContext(ctx, "Removing watchers", "paths", paths) + for _, filePath := range paths { + err := fws.watcher.Remove(filePath) + if err != nil { + slog.ErrorContext(ctx, "Unable to remove watcher file", "path", filePath, "error", err) + } + } + } + fws.enabled.Store(false) +} + +func (fws *FileWatcherService) EnableWatcher(ctx context.Context) { + if fws.watcher != nil && fws.watcher.WatchList() != nil && len(fws.watcher.WatchList()) == 0 { + fws.addWatchers(ctx) + } + fws.enabled.Store(true) } func (fws *FileWatcherService) Update(ctx context.Context, nginxConfigContext *model.NginxConfigContext) { @@ -173,16 +189,13 @@ func (fws *FileWatcherService) removeWatchers(ctx context.Context) { } func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.Event) { - slog.InfoContext(ctx, "Is enabled", "bool", fws.enabled.Load()) if fws.enabled.Load() { if fws.isEventSkippable(event) { return } - slog.InfoContext(ctx, "Processing FSNotify event", "event", event) + slog.DebugContext(ctx, "Processing FSNotify event", "event", event) fws.filesChanged.Store(true) - } else { - slog.InfoContext(ctx, "Ignoring FSNotify event", "event", event) } } diff --git a/internal/watcher/file/file_watcher_service_test.go b/internal/watcher/file/file_watcher_service_test.go index 6af8c267f..922625d13 100644 --- a/internal/watcher/file/file_watcher_service_test.go +++ b/internal/watcher/file/file_watcher_service_test.go @@ -8,6 +8,7 @@ package file import ( "bytes" "context" + "log/slog" "os" "path" "path/filepath" @@ -42,10 +43,10 @@ func TestFileWatcherService_SetEnabled(t *testing.T) { fileWatcherService := NewFileWatcherService(types.AgentConfig()) assert.True(t, fileWatcherService.enabled.Load()) - fileWatcherService.SetEnabled(false) + fileWatcherService.DisableWatcher(t.Context()) assert.False(t, fileWatcherService.enabled.Load()) - fileWatcherService.SetEnabled(true) + fileWatcherService.EnableWatcher(t.Context()) assert.True(t, fileWatcherService.enabled.Load()) } @@ -265,7 +266,8 @@ func TestFileWatcherService_Watch(t *testing.T) { defer os.Remove(skippableFile.Name()) select { - case <-channel: + case file := <-channel: + slog.Info("Skippable file updated", "", file) t.Fatalf("Expected file to be skipped: %v", skippableFile.Name()) case <-time.After(150 * time.Millisecond): return diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index c9cdce2d8..5769c45c7 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -184,9 +184,8 @@ func (w *Watcher) handleEnableWatchers(ctx context.Context, msg *bus.Message) { }, ) - w.fileWatcherService.SetEnabled(true) + w.fileWatcherService.EnableWatcher(ctx) w.instanceWatcherService.SetEnabled(true) - slog.InfoContext(ctx, "Enabled Watchers") w.watcherMutex.Unlock() } @@ -214,7 +213,7 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message defer w.watcherMutex.Unlock() w.instancesWithConfigApplyInProgress = append(w.instancesWithConfigApplyInProgress, instanceID) - w.fileWatcherService.SetEnabled(false) + w.fileWatcherService.DisableWatcher(ctx) w.instanceWatcherService.SetEnabled(false) } From 810d10617ffe1275559652d059e68a6698a21c09 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Tue, 30 Sep 2025 11:31:21 +0100 Subject: [PATCH 4/7] PR feedback --- internal/file/file_manager_service.go | 118 ++++++++++-------- internal/file/file_plugin.go | 10 +- internal/file/file_service_operator.go | 13 +- internal/watcher/file/file_watcher_service.go | 2 + .../watcher/file/file_watcher_service_test.go | 4 +- internal/watcher/watcher_plugin.go | 2 +- 6 files changed, 79 insertions(+), 70 deletions(-) diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 2f67b2ea9..be6cdf0ae 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -71,7 +71,7 @@ type ( fileToUpdate *mpi.File, ) error SetIsConnected(isConnected bool) - MoveFileFromTempDirectory(ctx context.Context, fileAction *model.FileCache, tempDir string) error + renameFile(ctx context.Context, hash, fileName, tempDir string) error UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) } @@ -191,7 +191,7 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, return model.Error, rollbackTempErr } - rollbackTempFilesErr := fms.RollbackTempFiles(ctx) + rollbackTempFilesErr := fms.backupFiles(ctx) if rollbackTempFilesErr != nil { return model.Error, rollbackTempFilesErr } @@ -211,34 +211,6 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, return model.OK, nil } -func (fms *FileManagerService) RollbackTempFiles(ctx context.Context) error { - for _, file := range fms.fileActions { - if file.Action == model.Add || file.Action == model.Unchanged { - continue - } - - filePath := file.File.GetFileMeta().GetName() - - if _, err := os.Stat(filePath); os.IsNotExist(err) { - slog.DebugContext(ctx, "Unable to backup file content since file does not exist", - "file", filePath) - - continue - } - - tempFilePath := filepath.Join(fms.tempRollbackDir, filePath) - slog.DebugContext(ctx, "Attempting to backup file content since file exists", "temp_path", tempFilePath) - - moveErr := fms.fileOperator.MoveFile(ctx, filePath, tempFilePath) - - if moveErr != nil { - return moveErr - } - } - - return nil -} - func (fms *FileManagerService) ClearCache() { slog.Debug("Clearing cache and temp files after config apply") clear(fms.fileActions) @@ -246,12 +218,12 @@ func (fms *FileManagerService) ClearCache() { configErr := os.RemoveAll(fms.tempConfigDir) if configErr != nil { - slog.Error("error removing temp config directory", "path", fms.tempConfigDir, "err", configErr) + slog.Error("Error removing temp config directory", "path", fms.tempConfigDir, "err", configErr) } rollbackErr := os.RemoveAll(fms.tempRollbackDir) if rollbackErr != nil { - slog.Error("error removing temp rollback directory", "path", fms.tempRollbackDir, "err", rollbackErr) + slog.Error("Error removing temp rollback directory", "path", fms.tempRollbackDir, "err", rollbackErr) } } @@ -273,30 +245,14 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) continue case model.Delete, model.Update: - fileMeta := fileAction.File.GetFileMeta() - fileName := fileMeta.GetName() - - tempFilePath := filepath.Join(fms.tempRollbackDir, fileName) - - // Create parent directories for the target file if they don't exist - if err := os.MkdirAll(filepath.Dir(fileName), dirPerm); err != nil { - return fmt.Errorf("failed to create directories for %s: %w", fileName, err) - } - - moveErr := os.Rename(tempFilePath, fileName) - if moveErr != nil { - return fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr) - } - - content, readErr := os.ReadFile(fileMeta.GetName()) - if readErr != nil { - return fmt.Errorf("error reading file, unable to generate hash: %s error: %w", - fileMeta.GetName(), readErr) + content, err := fms.restoreFiles(fileAction) + if err != nil { + return err } // currentFilesOnDisk needs to be updated after rollback action is performed - fileMeta.Hash = files.GenerateHash(content) - fms.currentFilesOnDisk[fileMeta.GetName()] = fileAction.File + fileAction.File.FileMeta.Hash = files.GenerateHash(content) + fms.currentFilesOnDisk[fileAction.File.GetFileMeta().GetName()] = fileAction.File case model.Unchanged: fallthrough default: @@ -513,6 +469,59 @@ func (fms *FileManagerService) UpdateManifestFile(ctx context.Context, return fms.fileOperator.WriteManifestFile(ctx, updatedFiles, fms.agentConfig.LibDir, fms.manifestFilePath) } +func (fms *FileManagerService) backupFiles(ctx context.Context) error { + for _, file := range fms.fileActions { + if file.Action == model.Add || file.Action == model.Unchanged { + continue + } + + filePath := file.File.GetFileMeta().GetName() + + if _, err := os.Stat(filePath); os.IsNotExist(err) { + slog.DebugContext(ctx, "Unable to backup file content since file does not exist", + "file", filePath) + + continue + } + + tempFilePath := filepath.Join(fms.tempRollbackDir, filePath) + slog.DebugContext(ctx, "Attempting to backup file content since file exists", "temp_path", tempFilePath) + + moveErr := fms.fileOperator.MoveFile(ctx, filePath, tempFilePath) + + if moveErr != nil { + return moveErr + } + } + + return nil +} + +func (fms *FileManagerService) restoreFiles(fileAction *model.FileCache) ([]byte, error) { + fileMeta := fileAction.File.GetFileMeta() + fileName := fileMeta.GetName() + + tempFilePath := filepath.Join(fms.tempRollbackDir, fileName) + + // Create parent directories for the target file if they don't exist + if err := os.MkdirAll(filepath.Dir(fileName), dirPerm); err != nil { + return nil, fmt.Errorf("failed to create directories for %s: %w", fileName, err) + } + + moveErr := os.Rename(tempFilePath, fileName) + if moveErr != nil { + return nil, fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr) + } + + content, readErr := os.ReadFile(fileMeta.GetName()) + if readErr != nil { + return nil, fmt.Errorf("error reading file, unable to generate hash: %s error: %w", + fileMeta.GetName(), readErr) + } + + return content, nil +} + func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, map[string]*mpi.File, error) { if _, err := os.Stat(fms.manifestFilePath); err != nil { return nil, nil, err @@ -596,7 +605,8 @@ actionsLoop: continue case model.Add, model.Update: - err := fms.fileServiceOperator.MoveFileFromTempDirectory(ctx, fileAction, tempDir) + fileMeta := fileAction.File.GetFileMeta() + err := fms.fileServiceOperator.renameFile(ctx, fileMeta.GetHash(), fileMeta.GetName(), tempDir) if err != nil { actionError = err diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index cb3e2a548..58c8b799f 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -138,12 +138,10 @@ func (fp *FilePlugin) Subscriptions() []string { } } -func (fp *FilePlugin) CleanUpConfigApply(ctx context.Context, +func (fp *FilePlugin) enableWatchers(ctx context.Context, configContext *model.NginxConfigContext, instanceID string, ) { - fp.fileManagerService.ClearCache() - enableWatcher := &model.EnableWatchers{ ConfigContext: configContext, InstanceID: instanceID, @@ -183,7 +181,8 @@ func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Me } fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) - fp.CleanUpConfigApply(ctx, &model.NginxConfigContext{}, response.GetInstanceId()) + fp.fileManagerService.ClearCache() + fp.enableWatchers(ctx, &model.NginxConfigContext{}, response.GetInstanceId()) } func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) { @@ -196,7 +195,8 @@ func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) return } - fp.CleanUpConfigApply(ctx, successMessage.ConfigContext, successMessage.DataPlaneResponse.GetInstanceId()) + fp.fileManagerService.ClearCache() + fp.enableWatchers(ctx, successMessage.ConfigContext, successMessage.DataPlaneResponse.GetInstanceId()) if successMessage.ConfigContext.Files != nil { slog.DebugContext(ctx, "Changes made during config apply, update files on disk") diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index 71fef285f..78c9b8fd0 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -25,7 +25,6 @@ import ( "github.com/nginx/agent/v3/internal/config" internalgrpc "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/logger" - "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/pkg/files" "github.com/nginx/agent/v3/pkg/id" "google.golang.org/grpc" @@ -277,12 +276,12 @@ func (fso *FileServiceOperator) UpdateFile( return fso.sendUpdateFileStream(ctx, fileToUpdate, fso.agentConfig.Client.Grpc.FileChunkSize) } -func (fso *FileServiceOperator) MoveFileFromTempDirectory( - ctx context.Context, fileAction *model.FileCache, tempDir string, +// renameFile, renames (moves) file from tempDir to new location to update file. +func (fso *FileServiceOperator) renameFile( + ctx context.Context, hash, fileName, dir string, ) error { - fileName := fileAction.File.GetFileMeta().GetName() - slog.DebugContext(ctx, "Updating file", "file", fileName) - tempFilePath := filepath.Join(tempDir, fileName) + slog.DebugContext(ctx, "Renaming file", "file", fileName) + tempFilePath := filepath.Join(dir, fileName) // Create parent directories for the target file if they don't exist if err := os.MkdirAll(filepath.Dir(fileName), dirPerm); err != nil { @@ -294,7 +293,7 @@ func (fso *FileServiceOperator) MoveFileFromTempDirectory( return fmt.Errorf("failed to rename file: %w", moveErr) } - return fso.validateFileHash(fileName, fileAction.File.GetFileMeta().GetHash()) + return fso.validateFileHash(fileName, hash) } func (fso *FileServiceOperator) validateFileHash(filePath, expectedHash string) error { diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index 58418857d..3779d8c99 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -98,6 +98,7 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe } func (fws *FileWatcherService) DisableWatcher(ctx context.Context) { + slog.DebugContext(ctx, "Disabling file watcher") if fws.watcher != nil && fws.watcher.WatchList() != nil { paths := fws.watcher.WatchList() slog.DebugContext(ctx, "Removing watchers", "paths", paths) @@ -112,6 +113,7 @@ func (fws *FileWatcherService) DisableWatcher(ctx context.Context) { } func (fws *FileWatcherService) EnableWatcher(ctx context.Context) { + slog.DebugContext(ctx, "Enabling file watcher") if fws.watcher != nil && fws.watcher.WatchList() != nil && len(fws.watcher.WatchList()) == 0 { fws.addWatchers(ctx) } diff --git a/internal/watcher/file/file_watcher_service_test.go b/internal/watcher/file/file_watcher_service_test.go index 922625d13..5802018ec 100644 --- a/internal/watcher/file/file_watcher_service_test.go +++ b/internal/watcher/file/file_watcher_service_test.go @@ -8,7 +8,6 @@ package file import ( "bytes" "context" - "log/slog" "os" "path" "path/filepath" @@ -266,8 +265,7 @@ func TestFileWatcherService_Watch(t *testing.T) { defer os.Remove(skippableFile.Name()) select { - case file := <-channel: - slog.Info("Skippable file updated", "", file) + case <-channel: t.Fatalf("Expected file to be skipped: %v", skippableFile.Name()) case <-time.After(150 * time.Millisecond): return diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 5769c45c7..834c7283f 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -161,7 +161,7 @@ func (w *Watcher) handleEnableWatchers(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Watcher plugin received enable watchers message") enableWatchersMessage, ok := msg.Data.(*model.EnableWatchers) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.EnableWatchers", "payload", + slog.ErrorContext(ctx, "Unable to cast message payload to *model.enableWatchers", "payload", msg.Data, "topic", msg.Topic) return From c328a78007cf9989967fb5d816e340065fbea9e3 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Tue, 30 Sep 2025 16:16:47 +0100 Subject: [PATCH 5/7] merge main --- internal/file/file_manager_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index e1d88c3bf..89fa7670b 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -358,7 +358,7 @@ func (fms *FileManagerService) DetermineFileActions( if !fms.agentConfig.IsDirectoryAllowed(fileName) { return nil, fmt.Errorf("error deleting file %s: file not in allowed directories", fileName) } - + if _, err := os.Stat(fileName); os.IsNotExist(err) { slog.DebugContext(ctx, "File already deleted, skipping", "file", fileName) continue From 4db3fb963b5f7156762f8f0e92cbb5ce6b0c32dc Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Tue, 30 Sep 2025 16:42:53 +0100 Subject: [PATCH 6/7] fix test --- internal/file/file_manager_service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/file/file_manager_service_test.go b/internal/file/file_manager_service_test.go index 5023536d9..bdf992b01 100644 --- a/internal/file/file_manager_service_test.go +++ b/internal/file/file_manager_service_test.go @@ -641,7 +641,7 @@ func TestFileManagerService_DetermineFileActions(t *testing.T) { }, { name: "Test 3: File being deleted already doesn't exist", - allowedDirs: []string{tempDir}, + allowedDirs: []string{tempDir, "/unknown"}, modifiedFiles: make(map[string]*model.FileCache), currentFiles: map[string]*mpi.File{ "/unknown/file.conf": { From e69e5779bffae4c7e52346f132b828d13ec2e352 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Wed, 1 Oct 2025 14:24:13 +0100 Subject: [PATCH 7/7] pr feedback --- internal/file/file_manager_service.go | 4 ++-- internal/file/file_service_operator.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 89fa7670b..859007628 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -71,7 +71,7 @@ type ( fileToUpdate *mpi.File, ) error SetIsConnected(isConnected bool) - renameFile(ctx context.Context, hash, fileName, tempDir string) error + RenameFile(ctx context.Context, hash, fileName, tempDir string) error UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) } @@ -610,7 +610,7 @@ actionsLoop: continue case model.Add, model.Update: fileMeta := fileAction.File.GetFileMeta() - err := fms.fileServiceOperator.renameFile(ctx, fileMeta.GetHash(), fileMeta.GetName(), tempDir) + err := fms.fileServiceOperator.RenameFile(ctx, fileMeta.GetHash(), fileMeta.GetName(), tempDir) if err != nil { actionError = err diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index 78c9b8fd0..9994af64c 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -277,7 +277,7 @@ func (fso *FileServiceOperator) UpdateFile( } // renameFile, renames (moves) file from tempDir to new location to update file. -func (fso *FileServiceOperator) renameFile( +func (fso *FileServiceOperator) RenameFile( ctx context.Context, hash, fileName, dir string, ) error { slog.DebugContext(ctx, "Renaming file", "file", fileName)