Skip to content

Commit 4b44737

Browse files
committed
fix: change cache key to integrate pid Creation Time
1 parent b0ee366 commit 4b44737

File tree

2 files changed

+50
-56
lines changed

2 files changed

+50
-56
lines changed

extjvm/java_process/java_process_watcher.go

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package java_process
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/procyon-projects/chrono"
67
"github.com/rs/zerolog/log"
78
"github.com/shirou/gopsutil/process"
@@ -23,12 +24,11 @@ type DiscoveryWork struct {
2324
}
2425

2526
var (
26-
discoveredPidsMutex sync.Mutex
27-
discoveredPids []int32
28-
ignoredPidsMutex sync.Mutex
29-
ignoredPids []int32
30-
listeners []Listener
31-
RunningStates = []string{"R", "W", "S", "I", "L"} // Running, Waiting, Sleeping
27+
discoveredPids sync.Map
28+
ignoredPidsMutex sync.Mutex
29+
ignoredPids []string
30+
listeners []Listener
31+
RunningStates = []string{"R", "W", "S", "I", "L"} // Running, Waiting, Sleeping
3232

3333
discoveryJobs = make(chan DiscoveryWork)
3434
)
@@ -56,7 +56,7 @@ func Start() {
5656
_, err = taskScheduler.ScheduleWithFixedDelay(func(ctx context.Context) {
5757
// Clear ignored pids because they might be reused
5858
ignoredPidsMutex.Lock()
59-
ignoredPids = []int32{}
59+
ignoredPids = []string{}
6060
ignoredPidsMutex.Unlock()
6161
}, 1*time.Hour)
6262

@@ -88,16 +88,32 @@ func updatePids() {
8888
return
8989
}
9090
for _, p := range processes {
91-
if utils.Contains(ignoredPids, p.Pid) {
91+
createTime, err := p.CreateTime()
92+
if err != nil {
93+
log.Err(err).Msgf("Failed to get process creation time. Pid: %d. Error: %s", p.Pid, err.Error())
94+
return
95+
}
96+
cacheId := createCacheId(p.Pid, createTime)
97+
if utils.ContainsString(ignoredPids, cacheId) {
9298
log.Trace().Msgf("Process %d is ignored", p.Pid)
9399
continue
94100
}
95101
discover(p, initialRetries)
96102
}
97103
}
98104

105+
func createCacheId(pid int32, createTime int64) string {
106+
return fmt.Sprintf("%d-%d", pid, createTime)
107+
}
108+
99109
func discoverProcessJVM(job DiscoveryWork) {
100-
if !utils.Contains(discoveredPids, job.p.Pid) {
110+
createTime, err := job.p.CreateTime()
111+
if err != nil {
112+
log.Err(err).Msgf("Failed to get process creation time. Pid: %d. Error: %s", job.p.Pid, err.Error())
113+
return
114+
}
115+
cacheId := createCacheId(job.p.Pid, createTime)
116+
if _, ok := discoveredPids.Load(cacheId); !ok {
101117
if IsRunning(job.p) {
102118
if !checkProcessPathAvailable(job.p) {
103119
log.Debug().Msgf("Process %d is running but path is not available yet.", job.p.Pid)
@@ -129,25 +145,34 @@ func discoverProcessJVM(job DiscoveryWork) {
129145
}
130146

131147
func addPidToDiscoveredPids(job DiscoveryWork) {
132-
discoveredPidsMutex.Lock()
133-
discoveredPids = append(discoveredPids, job.p.Pid)
134-
discoveredPidsMutex.Unlock()
148+
createTime, err := job.p.CreateTime()
149+
if err != nil {
150+
log.Err(err).Msgf("Failed to get process creation time. Pid: %d. Error: %s", job.p.Pid, err.Error())
151+
return
152+
}
153+
cacheId := createCacheId(job.p.Pid, createTime)
154+
discoveredPids.Store(cacheId, job.p.Pid)
135155
}
136156

137157
func RemovePidFromDiscoveredPids(pid int32) {
138-
discoveredPidsMutex.Lock()
139-
for i, p := range discoveredPids {
140-
if p == pid {
141-
discoveredPids = append(discoveredPids[:i], discoveredPids[i+1:]...)
142-
break
158+
discoveredPids.Range(func(key, value interface{}) bool {
159+
if value.(int32) == pid {
160+
discoveredPids.Delete(key)
161+
return false
143162
}
144-
}
145-
discoveredPidsMutex.Unlock()
163+
return true
164+
})
146165
}
147166

148167
func addPidToIgnoredPids(job DiscoveryWork) {
168+
createTime, err := job.p.CreateTime()
169+
if err != nil {
170+
log.Err(err).Msgf("Failed to get process creation time. Pid: %d. Error: %s", job.p.Pid, err.Error())
171+
return
172+
}
173+
cacheId := createCacheId(job.p.Pid, createTime)
149174
ignoredPidsMutex.Lock()
150-
ignoredPids = append(ignoredPids, job.p.Pid)
175+
ignoredPids = append(ignoredPids, cacheId)
151176
ignoredPidsMutex.Unlock()
152177
}
153178

@@ -166,16 +191,15 @@ func notifyListenersForNewProcess(p *process.Process) bool {
166191

167192
func AddListener(listener Listener) {
168193
listeners = append(listeners, listener)
169-
for _, pid := range discoveredPids {
170-
p, err := process.NewProcess(pid)
194+
discoveredPids.Range(func(key, pid interface{}) bool {
195+
p, err := process.NewProcess(pid.(int32))
171196
if err != nil {
172197
log.Warn().Err(err).Msg("Error in listener for notifyListenersForNewProcess")
173-
continue
174-
}
175-
if isJava(p) {
198+
} else if isJava(p) {
176199
listener.NewJavaProcess(p)
177200
}
178-
}
201+
return true
202+
})
179203
}
180204

181205
func RemoveListener(listener Listener) {

extjvm/java_process/java_process_watcher_test.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)