package goccm

import "sync/atomic"

type (
	// ConcurrencyManager is the interface for limiting the number of concurrently running goroutines.
	ConcurrencyManager interface {
		// Wait until a slot is available for the new goroutine.
		Wait()

		// Mark a goroutine as finished
		Done()

		// Close the manager manually
		Close()

		// Wait until all goroutines are done
		WaitAllDone()

		// Returns the number of goroutines which are running
		RunningCount() int32
	}

	concurrencyManager struct {
		// The number of goroutines that are allowed to run concurrently
		max int

		// The manager channel to coordinate the number of concurrent goroutines.
		managerCh chan interface{}

		// The done channel indicates when a single goroutine has finished its job.
		doneCh chan bool

		// This channel indicates when all goroutines have finished their job.
		allDoneCh chan bool

		// The closed flag lets us know when the manager can be closed
		closed bool

		// The running count tracks how many goroutines are currently running
		runningCount int32
	}
)

// New creates a concurrency manager that allows up to maxGoRoutines goroutines to run concurrently.
func New(maxGoRoutines int) *concurrencyManager {
	// Initialize the manager object
	c := concurrencyManager{
		max:       maxGoRoutines,
		managerCh: make(chan interface{}, maxGoRoutines),
		doneCh:    make(chan bool),
		allDoneCh: make(chan bool),
	}

	// Fill the manager channel with placeholder values
	for i := 0; i < c.max; i++ {
		c.managerCh <- nil
	}

	// Start the controller to collect all the jobs
	go c.controller()

	return &c
}

// controller collects all the finished jobs.
// When a goroutine is finished, we can release a slot for another goroutine.
func (c *concurrencyManager) controller() {
	for {
		// This will block until a goroutine is finished
		<-c.doneCh

		// Say that another goroutine can now start
		c.managerCh <- nil

		// When the closed flag is set, we can stop the controller
		// as soon as no goroutines are running anymore
		if c.closed && atomic.LoadInt32(&c.runningCount) == 0 {
			break
		}
	}

	// Say that all goroutines are finished, so the manager can be closed
	c.allDoneCh <- true
}

// Wait until a slot is available for the new goroutine.
// A goroutine has to be started after this function returns.
func (c *concurrencyManager) Wait() {
	// Try to receive from the manager channel. When we receive something,
	// it means a slot is available and we can start a new goroutine.
	// Otherwise, this blocks until a slot is available.
	<-c.managerCh

	// Increase the running count so we know how many goroutines are running.
	atomic.AddInt32(&c.runningCount, 1)
}

// Done marks a goroutine as finished.
func (c *concurrencyManager) Done() {
	// Decrease the running count
	atomic.AddInt32(&c.runningCount, -1)
	c.doneCh <- true
}

// Close the manager manually.
func (c *concurrencyManager) Close() {
	c.closed = true
}

// WaitAllDone waits until all goroutines are done.
func (c *concurrencyManager) WaitAllDone() {
	// Close the manager automatically
	c.Close()

	// This will block until allDoneCh receives a value
	<-c.allDoneCh
}

// RunningCount returns the number of goroutines which are currently running.
func (c *concurrencyManager) RunningCount() int32 {
	// Read the counter atomically to avoid a data race with Wait and Done.
	return atomic.LoadInt32(&c.runningCount)
}
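
// exampleUsage is a minimal, illustrative sketch of how a caller might use this
// package; the function name and the placeholder job body are assumptions added
// for illustration and are not part of the original file. It limits the number
// of concurrently running jobs to three and waits for all of them to finish.
func exampleUsage() {
	// Allow at most three goroutines to run at the same time.
	c := New(3)

	for i := 0; i < 10; i++ {
		// Blocks until a slot is available.
		c.Wait()

		go func() {
			// Release the slot when the job is finished.
			defer c.Done()
			// ... do the actual work of the job here ...
		}()
	}

	// Close the manager and block until every started goroutine has called Done.
	c.WaitAllDone()
}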