Skip to content

Commit

Permalink
Make it safe to call desktop runner Interrupt multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
RebeccaMahany committed Sep 7, 2023
1 parent 640846b commit 0c8a6da
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
31 changes: 27 additions & 4 deletions ee/desktop/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ type DesktopUsersProcessesRunner struct {
menuRefreshInterval time.Duration
interrupt chan struct{}
// uidProcs is a map of uid to desktop process
uidProcs map[string]processRecord
uidProcs map[string]processRecord
uidProcsMutex sync.Mutex
// procsWg is a WaitGroup to wait for all desktop processes to finish during an interrupt
procsWg *sync.WaitGroup
// interruptTimeout how long to wait for desktop proccesses to finish on interrupt
Expand Down Expand Up @@ -150,8 +151,9 @@ type processRecord struct {
func New(k types.Knapsack, opts ...desktopUsersProcessesRunnerOption) (*DesktopUsersProcessesRunner, error) {
runner := &DesktopUsersProcessesRunner{
logger: log.NewNopLogger(),
interrupt: make(chan struct{}),
interrupt: make(chan struct{}, 1),
uidProcs: make(map[string]processRecord),
uidProcsMutex: sync.Mutex{},
updateInterval: k.DesktopUpdateInterval(),
menuRefreshInterval: k.DesktopMenuRefreshInterval(),
procsWg: &sync.WaitGroup{},
Expand Down Expand Up @@ -222,8 +224,15 @@ func (r *DesktopUsersProcessesRunner) Execute() error {
// Interrupt stops creating launcher desktop processes and kills any existing ones.
// It also signals the execute loop to exit, so new desktop processes cease to spawn.
func (r *DesktopUsersProcessesRunner) Interrupt(_ error) {
// Tell the execute loop to stop checking, and exit
r.interrupt <- struct{}{}
// Non-blocking send to interrupt channel
select {
case r.interrupt <- struct{}{}:
// First time we've received an interrupt, so we've notified r.Execute.
default:
// Execute loop is no longer running, so there's nothing to interrupt
}

time.Sleep(3 * time.Second)

// Kill any desktop processes that may exist
r.killDesktopProcesses()
Expand All @@ -241,6 +250,9 @@ func (r *DesktopUsersProcessesRunner) Interrupt(_ error) {

// killDesktopProcesses kills any existing desktop processes
func (r *DesktopUsersProcessesRunner) killDesktopProcesses() {
r.uidProcsMutex.Lock()
defer r.uidProcsMutex.Unlock()

wgDone := make(chan struct{})
go func() {
defer close(wgDone)
Expand Down Expand Up @@ -297,6 +309,9 @@ func (r *DesktopUsersProcessesRunner) killDesktopProcesses() {
}

func (r *DesktopUsersProcessesRunner) SendNotification(n notify.Notification) error {
r.uidProcsMutex.Lock()
defer r.uidProcsMutex.Unlock()

if len(r.uidProcs) == 0 {
return errors.New("cannot send notification, no child desktop processes")
}
Expand Down Expand Up @@ -380,6 +395,8 @@ func (r *DesktopUsersProcessesRunner) refreshMenu() {
}

// Tell any running desktop user processes that they should refresh the latest menu data
r.uidProcsMutex.Lock()
defer r.uidProcsMutex.Unlock()
for uid, proc := range r.uidProcs {
client := client.New(r.userServerAuthToken, proc.socketPath)
if err := client.Refresh(); err != nil {
Expand Down Expand Up @@ -531,6 +548,9 @@ func (r *DesktopUsersProcessesRunner) runConsoleUserDesktop() error {

// addProcessTrackingRecordForUser adds process information to the internal tracking state
func (r *DesktopUsersProcessesRunner) addProcessTrackingRecordForUser(uid string, socketPath string, osProcess *os.Process) error {
r.uidProcsMutex.Lock()
defer r.uidProcsMutex.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

Expand Down Expand Up @@ -592,6 +612,9 @@ func (r *DesktopUsersProcessesRunner) determineExecutablePath() (string, error)
}

func (r *DesktopUsersProcessesRunner) userHasDesktopProcess(uid string) bool {
r.uidProcsMutex.Lock()
defer r.uidProcsMutex.Unlock()

// have no record of process
proc, ok := r.uidProcs[uid]
if !ok {
Expand Down
29 changes: 28 additions & 1 deletion ee/desktop/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestDesktopUserProcessRunner_Execute(t *testing.T) {
assert.NoError(t, r.Execute())
}()

// let is run a few interval
// let it run a few intervals
time.Sleep(r.updateInterval * 3)
r.Interrupt(nil)

Expand Down Expand Up @@ -185,6 +185,33 @@ func TestDesktopUserProcessRunner_Execute(t *testing.T) {
p.Process.Wait()
}
})

// Confirm we can call Interrupt multiple times without blocking
interruptComplete := make(chan struct{})
expectedInterrupts := 3
for i := 0; i < expectedInterrupts; i += 1 {
go func() {
r.Interrupt(nil)
interruptComplete <- struct{}{}
}()
}

receivedInterrupts := 0
for {
if receivedInterrupts >= expectedInterrupts {
break
}

select {
case <-interruptComplete:
receivedInterrupts += 1
continue
case <-time.After(5 * time.Second):
t.Error("could not call interrupt multiple times and return within 5 seconds")
}
}

require.Equal(t, expectedInterrupts, receivedInterrupts)
})
}
}
Expand Down

0 comments on commit 0c8a6da

Please sign in to comment.