Skip to content

Commit

Permalink
Removed non-concurrent case from MapReduce. It didn't align with nm=n…
Browse files Browse the repository at this point in the history
…r=1 semantics. In that case correct behavior is to have one mapper and one reducer working concurrently
  • Loading branch information
destel committed Jun 12, 2024
1 parent 90f22e3 commit 58b0b52
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 24 deletions.
14 changes: 6 additions & 8 deletions internal/core/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func nonConcurrentReduce[A any](in <-chan A, f func(A, A) A) (A, bool) {
// Reduce reduces the input channel into a single value using the provided function,
// using n goroutines for concurrency
func Reduce[A any](in <-chan A, n int, f func(A, A) A) (A, bool) {
if in == nil {
<-in
}

// Phase 0: Optimized non-concurrent case
if n == 1 {
return nonConcurrentReduce(in, f)
Expand Down Expand Up @@ -83,14 +87,8 @@ func reduceIntoMap[K comparable, V any](m map[K]V, k K, v V, f func(V, V) V) {
// If there are multiple values for the same key, they are reduced into a single value using the reducer function and nr goroutines.
// The result is a map where each key is associated with a single value.
func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (K, V), nr int, reducer func(V, V) V) map[K]V {
// Phase 0: Optimized non-concurrent case
if nm == 1 && nr == 1 {
res := make(map[K]V)
for a := range in {
k, v := mapper(a)
reduceIntoMap(res, k, v, reducer)
}
return res
if in == nil {
<-in
}

// Phase 1: Map
Expand Down
32 changes: 16 additions & 16 deletions internal/th/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ func ExpectHang(t *testing.T, waitFor time.Duration, f func()) {
}
}

func ExpectNotHang(t *testing.T, waitFor time.Duration, f func()) {
t.Helper()
done := make(chan struct{})

go func() {
defer close(done)
f()
}()

select {
case <-done:
case <-time.After(waitFor):
t.Errorf("test hanged")
}
}

func ExpectError(t *testing.T, err error, message string) {
t.Helper()
if err == nil {
Expand All @@ -167,22 +183,6 @@ func ExpectNoError(t *testing.T, err error) {
}
}

func ExpectNotHang(t *testing.T, waitFor time.Duration, f func()) {
t.Helper()
done := make(chan struct{})

go func() {
defer close(done)
f()
}()

select {
case <-done:
case <-time.After(waitFor):
t.Errorf("test hanged")
}
}

func ExpectNotPanic(t *testing.T, f func()) {
t.Helper()
defer func() {
Expand Down

0 comments on commit 58b0b52

Please sign in to comment.