Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,29 @@ package main

import (
"fmt"
"goccm"
"github.com/zenthangplus/goccm"
"time"
)

func main() {
// Limit 3 goroutines to run concurrently.
c := goccm.New(3)

for i := 1; i <= 10; i++ {

cm := goccm.New(3)
for i := 0; i < 10; i++ {
// This function has to be called before starting any goroutine
c.Wait()

cm.Wait()
go func(i int) {
fmt.Printf("Job %d is running\n", i)
time.Sleep(2 * time.Second)

// This function has to be called when a goroutine has finished.
// Or you can use `defer c.Done()` at the top of the goroutine.
c.Done()
cm.Done()
}(i)
}

// This function has to be called to ensure all goroutines have finished
// before the main program exits.
c.WaitAllDone()
cm.WaitAllDone()
}
```

Expand All @@ -51,26 +48,23 @@ func main() {
```go
package main

import "goccm"
import "github.com/zenthangplus/goccm"

func main() {
// Create the concurrency manager
// The first argument is the maximum number of goroutines to run concurrently.
c := goccm.New(10)
cm := goccm.New(10)

// Wait until a slot is available for the new goroutine.
c.Wait()
cm.Wait()

// Mark a goroutine as finished
c.Done()
cm.Done()

// Wait until all goroutines are done
c.WaitAllDone()

// Close the manager manually
c.Close()
cm.WaitAllDone()

// Returns the number of goroutines which are running
c.RunningCount()
cm.RunningCount()
}
```
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/zenthangplus/goccm

go 1.15
137 changes: 30 additions & 107 deletions goccm.go
Original file line number Diff line number Diff line change
@@ -1,124 +1,47 @@
package goccm

import "sync/atomic"
// ConcurrencyManager Interface
type ConcurrencyManager interface {
// Wait until a slot is available for the new goroutine.
Wait()

type (
// ConcurrencyManager Interface
ConcurrencyManager interface {
// Wait until a slot is available for the new goroutine.
Wait()
// Mark a goroutine as finished
Done()

// Mark a goroutine as finished
Done()
// Wait for all goroutines are done
WaitAllDone()

// Close the manager manually
Close()

// Wait for all goroutines are done
WaitAllDone()

// Returns the number of goroutines which are running
RunningCount() int32
}

concurrencyManager struct {
// The number of goroutines that are allowed to run concurrently
max int

// The manager channel to coordinate the number of concurrent goroutines.
managerCh chan interface{}

// The done channel indicates when a single goroutine has finished its job.
doneCh chan bool

// This channel indicates when all goroutines have finished their job.
allDoneCh chan bool

// The close flag allows we know when we can close the manager
closed bool

// The running count allows we know the number of goroutines are running
runningCount int32
}
)

// New concurrencyManager
func New(maxGoRoutines int) *concurrencyManager {
// Initiate the manager object
c := concurrencyManager{
max: maxGoRoutines,
managerCh: make(chan interface{}, maxGoRoutines),
doneCh: make(chan bool),
allDoneCh: make(chan bool),
}

// Fill the manager channel by placeholder values
for i := 0; i < c.max; i++ {
c.managerCh <- nil
}

// Start the controller to collect all the jobs
go c.controller()

return &c
// Returns the number of goroutines which are running
RunningCount() int32
}

// Create the controller to collect all the jobs.
// When a goroutine is finished, we can release a slot for another goroutine.
func (c *concurrencyManager) controller() {
for {
// This will block until a goroutine is finished
<-c.doneCh

// Say that another goroutine can now start
c.managerCh <- nil
// concurrencyManager limits how many goroutines may run at once.
// The channel's capacity is the concurrency limit; each occupied slot
// (an element sitting in the channel) represents one running goroutine.
type concurrencyManager chan struct{}

// When the closed flag is set,
// we need to close the manager if it doesn't have any running goroutine
if c.closed == true && c.runningCount == 0 {
break
}
}

// Say that all goroutines are finished, we can close the manager
c.allDoneCh <- true
}

// Wait until a slot is available for the new goroutine.
// A goroutine have to start after this function.
func (c *concurrencyManager) Wait() {

// Try to receive from the manager channel. When we have something,
// it means a slot is available and we can start a new goroutine.
// Otherwise, it will block until a slot is available.
<-c.managerCh

// Increase the running count to help we know how many goroutines are running.
atomic.AddInt32(&c.runningCount, 1)
// New returns a concurrency manager that allows at most maxGoRoutines
// goroutines to run at the same time.
func New(maxGoRoutines int) *concurrencyManager {
	// The buffered channel's capacity is the concurrency limit.
	slots := make(chan struct{}, maxGoRoutines)
	cm := concurrencyManager(slots)
	return &cm
}

// Mark a goroutine as finished
func (c *concurrencyManager) Done() {
// Decrease the number of running count
atomic.AddInt32(&c.runningCount, -1)
c.doneCh <- true
// Wait acquires one slot from the manager, blocking while the number of
// running goroutines is at the configured limit.
func (cm *concurrencyManager) Wait() {
	slots := *cm
	slots <- struct{}{}
}

// Close the manager manually
func (c *concurrencyManager) Close() {
c.closed = true
// Done releases one slot back to the manager, marking a goroutine as
// finished so another Wait call can proceed.
func (cm *concurrencyManager) Done() {
	slots := *cm
	<-slots
}

// Wait for all goroutines are done
func (c *concurrencyManager) WaitAllDone() {
// Close the manager automatic
c.Close()

// This will block until allDoneCh was marked
<-c.allDoneCh
// Wait for all goroutines to finish.
// NOTE(review): this is a busy-wait — the empty loop body spins at full
// CPU on one core until every slot has been released via Done. Consider
// runtime.Gosched() inside the loop, or a sync.WaitGroup-based design,
// instead of polling channel length.
// NOTE(review): len() on a channel is a racy completion signal; it can
// transiently read 0 while a Wait/Done pair is in flight — confirm the
// intended guarantee.
func (cm *concurrencyManager) WaitAllDone() {
for len(*cm) > 0 {
}
}

// Returns the number of goroutines which are running
func (c *concurrencyManager) RunningCount() int32 {
return c.runningCount
// RunningCount returns the number of running goroutines (the number of
// occupied slots in the manager channel).
// NOTE(review): the ConcurrencyManager interface in this file declares
// RunningCount() int32, but this method returns int — *concurrencyManager
// therefore does not satisfy that interface. One of the two signatures
// needs to change; confirm which is intended (the test code compares
// against an int).
func (cm *concurrencyManager) RunningCount() int {
return len(*cm)
}
71 changes: 32 additions & 39 deletions goccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,52 @@ package goccm

import (
"fmt"
"sync/atomic"
"testing"
"time"
)

func TestExample(t *testing.T) {
c := New(3)
for i := 1; i <= 10; i++ {
c.Wait()
cm := New(3)
for i := 0; i < 10; i++ {
cm.Wait()
go func(i int) {
fmt.Printf("Job %d is running\n", i)
time.Sleep(2 * time.Second)
c.Done()
time.Sleep(200 * time.Millisecond)
cm.Done()
}(i)
}
c.WaitAllDone()
cm.WaitAllDone()
}

func TestManuallyClose(t *testing.T) {
executedJobs := 0
c := New(3)
for i := 1; i <= 1000; i++ {
c.Wait()
func TestConcurrency(t *testing.T) {
var testMaxRunningJobs int
var counter int32
incrementTasks := 1000
maxRunningJobs := 3
cm := New(maxRunningJobs)
for i := 0; i < incrementTasks; i++ {
cm.Wait()
go func() {
executedJobs++
fmt.Printf("Executed jobs %d\n", executedJobs)
time.Sleep(2 * time.Second)
c.Done()
if cm.RunningCount() > testMaxRunningJobs {
testMaxRunningJobs = cm.RunningCount()
}
atomic.AddInt32(&counter, 1)
time.Sleep(5 * time.Millisecond)
cm.Done()
}()
if i == 5 {
c.Close()
break
}
}
c.WaitAllDone()
}

func TestConcurrency(t *testing.T) {
var maxRunningJobs = 3
var testMaxRunningJobs int32
c := New(maxRunningJobs)
for i := 1; i <= 10; i++ {
c.Wait()
go func(i int) {
fmt.Printf("Current running jobs %d\n", c.RunningCount())
if c.RunningCount() > testMaxRunningJobs {
testMaxRunningJobs = c.RunningCount()
}
time.Sleep(2 * time.Second)
c.Done()
}(i)
cm.WaitAllDone()
if testMaxRunningJobs > maxRunningJobs {
t.Errorf("The number of concurrency jobs has exceeded %d. Real result %d.\n", maxRunningJobs, testMaxRunningJobs)
} else {
fmt.Printf("max number of goroutines spawned: %d, expected: %d\n", testMaxRunningJobs, maxRunningJobs)
}
c.WaitAllDone()
if testMaxRunningJobs > int32(maxRunningJobs) {
t.Errorf("The number of concurrency jobs has exceeded %d. Real result %d.", maxRunningJobs, testMaxRunningJobs)
if counter != int32(incrementTasks) {
t.Errorf("counter value expected %d. Real result %d.", incrementTasks, counter)
} else {
fmt.Printf("tasks executed: %d, expected: %d\n", counter, incrementTasks)
}
}

// todo: add benchmark tests