Skip to content

Commit

Permalink
Channel is not drained by ForEach (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Mar 19, 2024
1 parent df77820 commit f14a30b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 44 deletions.
1 change: 1 addition & 0 deletions chans/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func ForEach[A any](in <-chan A, n int, f func(A) bool) {
if n == 1 {
for a := range in {
if !f(a) {
defer DrainNB(in)
break
}
}
Expand Down
17 changes: 10 additions & 7 deletions chans/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,27 @@ func TestForEach(t *testing.T) {

t.Run(th.Name("early exit", n), func(t *testing.T) {
th.ExpectNotHang(t, 10*time.Second, func() {
done := make(chan struct{})
defer close(done)
cnt := int64(0)

sum := int64(0)

in := th.InfiniteChan(done)
in := th.FromRange(0, 1000)

ForEach(in, n, func(x int) bool {
if x == 100 {
return false
}
atomic.AddInt64(&sum, int64(x))
atomic.AddInt64(&cnt, 1)
return true
})

if sum < 99*100/2 {
if cnt < 100 {
t.Errorf("expected at least 100 iterations to complete")
}
if cnt > 150 {
t.Errorf("early exit did not happen")
}

time.Sleep(1 * time.Second)
th.ExpectDrainedChan(t, in)
})
})

Expand Down
38 changes: 21 additions & 17 deletions echans/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,48 +335,52 @@ func TestForEach(t *testing.T) {

t.Run(th.Name("error in input", n), func(t *testing.T) {
th.ExpectNotHang(t, 10*time.Second, func() {
done := make(chan struct{})
defer close(done)

in := Wrap(th.InfiniteChan(done), nil)
in := Wrap(th.FromRange(0, 1000), nil)
in = replaceWithError(in, 100, fmt.Errorf("err100"))

sum := int64(0)
cnt := int64(0)
err := ForEach(in, n, func(x int) error {
atomic.AddInt64(&sum, int64(x))
atomic.AddInt64(&cnt, 1)
return nil
})

th.ExpectError(t, err, "err100")

fmt.Println(sum, 99*100/2)

if sum < 99*100/2 {
if cnt < 100 {
t.Errorf("expected at least 100 iterations to complete")
}
if cnt > 150 {
t.Errorf("early exit did not happen")
}

time.Sleep(1 * time.Second)
th.ExpectDrainedChan(t, in)
})
})

t.Run(th.Name("error in func", n), func(t *testing.T) {
th.ExpectNotHang(t, 10*time.Second, func() {
done := make(chan struct{})
defer close(done)

in := Wrap(th.InfiniteChan(done), nil)
in := Wrap(th.FromRange(0, 1000), nil)

sum := int64(0)
cnt := int64(0)
err := ForEach(in, n, func(x int) error {
if x == 100 {
return fmt.Errorf("err100")
}
atomic.AddInt64(&sum, int64(x))
atomic.AddInt64(&cnt, 1)
return nil
})

th.ExpectError(t, err, "err100")
if sum < 99*100/2 {
if cnt < 100 {
t.Errorf("expected at least 100 iterations to complete")
}
if cnt > 150 {
t.Errorf("early exit did not happen")
}

// wait until it drained
time.Sleep(1 * time.Second)
th.ExpectDrainedChan(t, in)
})
})

Expand Down
2 changes: 1 addition & 1 deletion echans/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func TestFromToSlice(t *testing.T) {
th.ExpectError(t, err, "err15")

time.Sleep(1 * time.Second)
th.ExpectClosedChan(t, in)
th.ExpectDrainedChan(t, in)
})
}
4 changes: 2 additions & 2 deletions internal/common/loops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestBreakable(t *testing.T) {
}

th.ExpectValue(t, maxSeen, 9999)
th.ExpectClosedChan(t, in)
th.ExpectDrainedChan(t, in)
})

t.Run("early exit", func(t *testing.T) {
Expand All @@ -120,7 +120,7 @@ func TestBreakable(t *testing.T) {

}

th.ExpectClosedChan(t, in)
th.ExpectDrainedChan(t, in)
})

}
2 changes: 1 addition & 1 deletion internal/th/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func ExpectUnsorted[T ordered](t *testing.T, arr []T) {
}
}

func ExpectClosedChan[A any](t *testing.T, ch <-chan A) {
func ExpectDrainedChan[A any](t *testing.T, ch <-chan A) {
t.Helper()
select {
case x, ok := <-ch:
Expand Down
16 changes: 0 additions & 16 deletions internal/th/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,6 @@ func FromRange(start, end int) <-chan int {
return ch
}

// InfiniteChan generates infinite sequence of natural numbers. It stops when stop channel is closed.
func InfiniteChan(stop <-chan struct{}) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; ; i++ {
select {
case <-stop:
return
case ch <- i:
}
}
}()
return ch
}

func Send[T any](ch chan<- T, items ...T) {
for _, item := range items {
ch <- item
Expand Down

0 comments on commit f14a30b

Please sign in to comment.