From c54e898e733920803c3ea28ff730b61d57aecf98 Mon Sep 17 00:00:00 2001 From: Nikolay Martyanov Date: Sat, 17 Aug 2024 01:55:55 +0200 Subject: [PATCH] pillar/agentlog: Add mutex locks for safe PSI data handling and test synchronization. This commit introduces two mutex locks to ensure thread-safe handling of PSI data and synchronization during testing. The first lock (PsiMutex) is added to protect access to the PSI files, preventing race conditions between the PSI data producer in the tests and the consumer in memprofile. The second lock (psiProducerMutex) ensures that only one PSI data producer runs during tests, avoiding conflicts and ensuring consistent test results. Signed-off-by: Nikolay Martyanov --- pkg/pillar/agentlog/agentlog_test.go | 78 +++++++++++++++------------- pkg/pillar/agentlog/memprofile.go | 7 +++ 2 files changed, 50 insertions(+), 35 deletions(-) diff --git a/pkg/pillar/agentlog/agentlog_test.go b/pkg/pillar/agentlog/agentlog_test.go index 956137a105..843b71c54f 100644 --- a/pkg/pillar/agentlog/agentlog_test.go +++ b/pkg/pillar/agentlog/agentlog_test.go @@ -285,10 +285,16 @@ func matchPsiStats(line string) bool { return re.MatchString(line) } +// Mutex for PSI stats producer - let's avoid running multiple producers at the same time +var psiProducerMutex sync.Mutex + func emulateMemoryPressureStats() (cancel context.CancelFunc, err error) { + // Take the mutex on the producer creation and release it when the producer is done + psiProducerMutex.Lock() // Create a new file for memory pressure stats fakePSIFileHandler, err := os.CreateTemp("", "memory-pressure") if err != nil { + psiProducerMutex.Unlock() return nil, err } @@ -302,12 +308,6 @@ func emulateMemoryPressureStats() (cancel context.CancelFunc, err error) { ctx, cancel := context.WithCancel(context.Background()) go func() { - defer ticker.Stop() - defer fakePSIFileHandler.Close() - defer os.Remove(fakePSIFileName) - defer func() { - agentlog.PressureMemoryFile = originalPressureMemoryFile - }() PsiStats := agentlog.PressureStallInfo{ SomeAvg10: 0.00, SomeAvg60: 0.00, @@ -321,6 +321,7 @@ func emulateMemoryPressureStats() (cancel context.CancelFunc, err error) { for { select { case <-ticker.C: + agentlog.PsiMutex.Lock() PsiStats.SomeAvg10 = generateRandomAvgValue() PsiStats.SomeAvg60 = generateRandomAvgValue() PsiStats.SomeAvg300 = generateRandomAvgValue() @@ -337,7 +338,16 @@ func emulateMemoryPressureStats() (cancel context.CancelFunc, err error) { if err := os.WriteFile(fakePSIFileName, []byte(content), 0644); err != nil { panic(err) } + agentlog.PsiMutex.Unlock() case <-ctx.Done(): + ticker.Stop() + agentlog.PsiMutex.Lock() + fakePSIFileHandler.Close() + os.Remove(fakePSIFileName) + agentlog.PressureMemoryFile = originalPressureMemoryFile + agentlog.PsiMutex.Unlock() + // We destroy this producer, so release the mutex + psiProducerMutex.Unlock() return } } @@ -354,6 +364,8 @@ full avg10=2.00 avg60=0.20 avg300=0.02 total=2000` ) func createFakePSIStatsFile() (cleanupFunc context.CancelFunc, err error) { + // Take the mutex on the producer creation and release it when the producer is done + psiProducerMutex.Lock() // Create a new file for memory pressure stats fakePSIFileHandler, err := os.CreateTemp("", "memory-pressure") if err != nil { @@ -365,23 +377,24 @@ func createFakePSIStatsFile() (cleanupFunc context.CancelFunc, err error) { agentlog.PressureMemoryFile = fakePSIFileName // Write some content to the file + agentlog.PsiMutex.Lock() if err := os.WriteFile(fakePSIFileName, []byte(staticPSIStatsContent), 0644); err != nil { + agentlog.PsiMutex.Unlock() return nil, err } + agentlog.PsiMutex.Unlock() ctx, cancel := context.WithCancel(context.Background()) go func() { - defer func() { - fakePSIFileHandler.Close() - os.Remove(fakePSIFileName) - agentlog.PressureMemoryFile = originalPressureMemoryFile - - }() - select { - case <-ctx.Done(): - return - } + <-ctx.Done() + agentlog.PsiMutex.Lock() + fakePSIFileHandler.Close() + os.Remove(fakePSIFileName) + agentlog.PressureMemoryFile = originalPressureMemoryFile + agentlog.PsiMutex.Unlock() + // We destroy this producer, so release the mutex + psiProducerMutex.Unlock() }() return cancel, nil @@ -413,27 +426,22 @@ func startIntegratedPSICollectorAPI() (cancel context.CancelFunc, err error) { } ctx, cancel := context.WithCancel(context.Background()) go func() { - defer func() { - if started { - http.Post("http://127.0.0.1:6543/stop", "", nil) - // Wait for the server to stop, check if it is still running - for i := 0; i < 100; i++ { - _, err := http.Get("http://127.0.0.1:6543") - if err != nil && strings.Contains(err.Error(), "connection refused") { - started = false - break - } - time.Sleep(100 * time.Millisecond) + <-ctx.Done() + if started { + http.Post("http://127.0.0.1:6543/stop", "", nil) + // Wait for the server to stop, check if it is still running + for i := 0; i < 100; i++ { + _, err := http.Get("http://127.0.0.1:6543") + if err != nil && strings.Contains(err.Error(), "connection refused") { + started = false + break } - if started { - panic("could not stop the server in 10 seconds") - } - psiServerMutex.Unlock() + time.Sleep(100 * time.Millisecond) + } + if started { + panic("could not stop the server in 10 seconds") } - }() - select { - case <-ctx.Done(): - return + psiServerMutex.Unlock() } }() return cancel, nil diff --git a/pkg/pillar/agentlog/memprofile.go b/pkg/pillar/agentlog/memprofile.go index 182461b811..168164a8b3 100644 --- a/pkg/pillar/agentlog/memprofile.go +++ b/pkg/pillar/agentlog/memprofile.go @@ -12,6 +12,7 @@ import ( "os" "runtime" "strings" + "sync" "time" "github.com/lf-edge/eve/pkg/pillar/types" @@ -128,6 +129,10 @@ func GetMemAllocationSites(reportZeroInUse bool) (int, []MemAllocationSite) { return n, sites } +// PsiMutex is the mutex to protect the access to the PSI files. +// We need it to avoid a race condition with the PSI data emulator in tests. +var PsiMutex sync.Mutex + func isPSISupported() bool { _, err := os.Stat(PressureMemoryFile) if err != nil { @@ -138,6 +143,8 @@ func isPSISupported() bool { } func collectMemoryPSI() (*PressureStallInfo, error) { + PsiMutex.Lock() + defer PsiMutex.Unlock() if !isPSISupported() { return nil, fmt.Errorf("PSI is not supported") }