Skip to content

Commit

Permalink
fix: make actor system GetOrElseTimeout actually timeout. (#4499)
Browse files Browse the repository at this point in the history
(cherry picked from commit 28c7b89)
  • Loading branch information
ioga authored and mackrorysd committed Jul 8, 2022
1 parent 250aca7 commit 6dba678
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
21 changes: 14 additions & 7 deletions master/pkg/actor/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,19 @@ func (r *response) Source() *Ref {
return r.source
}

func (r *response) get() Message {
func (r *response) get(cancel <-chan bool) Message {
if r.fetched {
return r.result
}
r.lock.Lock()
defer r.lock.Unlock()
r.fetched = true
r.result = <-r.future
return r.result
select {
case r.result = <-r.future:
return r.result
case <-cancel:
return nil
}
}

func (r *response) Get() Message {
Expand All @@ -76,20 +80,23 @@ func (r *response) GetOrElse(defaultValue Message) Message {
if r.Empty() {
return defaultValue
}
return r.get()
return r.get(nil)
}

func (r *response) GetOrElseTimeout(defaultValue Message, timeout time.Duration) (Message, bool) {
future := make(chan Message, 1)
cancel := make(chan bool, 1)

go func() {
future <- r.get()
future <- r.get(cancel)
}()
t := time.NewTimer(timeout)
defer t.Stop()
select {
case result := <-future:
return result, true
case <-t.C:
cancel <- true
r.lock.Lock()
defer r.lock.Unlock()
r.fetched = true
Expand All @@ -99,11 +106,11 @@ func (r *response) GetOrElseTimeout(defaultValue Message, timeout time.Duration)
}

func (r *response) Empty() bool {
return r.get() == errNoResponse
return r.get(nil) == errNoResponse
}

func (r *response) Error() error {
err, ok := r.get().(error)
err, ok := r.get(nil).(error)
if r.Empty() || !ok {
return nil
}
Expand Down
23 changes: 22 additions & 1 deletion master/pkg/actor/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,35 @@ import (

func TestResponseTimeout(t *testing.T) {
system := NewSystem(t.Name())
sleepDuration := 1 * time.Second
ref, _ := system.ActorOf(Addr("test"), ActorFunc(func(context *Context) error {
if context.ExpectingResponse() {
time.Sleep(1 * time.Second)
time.Sleep(sleepDuration)
context.Respond(false)
}
return nil
}))
start := time.Now()
result, ok := system.Ask(ref, "").GetOrElseTimeout(true, 1*time.Millisecond)
duration := time.Now().Sub(start)
assert.Assert(t, duration < sleepDuration, "test duration: %d ms", duration.Milliseconds())
assert.Assert(t, result.(bool))
assert.Assert(t, !ok)
}

func TestResponseTimeoutOk(t *testing.T) {
system := NewSystem(t.Name())
ref, _ := system.ActorOf(Addr("test"), ActorFunc(func(context *Context) error {
if context.ExpectingResponse() {
context.Respond(false)
}
return nil
}))
start := time.Now()
timeoutDuration := 1 * time.Second
result, ok := system.Ask(ref, "").GetOrElseTimeout(true, timeoutDuration)
duration := time.Now().Sub(start)
assert.Assert(t, duration < timeoutDuration, "test duration: %d ms", duration.Milliseconds())
assert.Assert(t, result.(bool) == false)
assert.Assert(t, ok)
}

0 comments on commit 6dba678

Please sign in to comment.