// Source snapshot metadata: 2022-03-02 17:46:16 -04:00, 125 lines, 3.0 KiB, Go.

package goccm
import "sync/atomic"
// ConcurrencyManager is the public contract of the goroutine limiter.
type ConcurrencyManager interface {
	// Wait blocks until a slot is free for a new goroutine.
	Wait()

	// Done reports that one goroutine has finished.
	Done()

	// Close marks the manager as closed.
	Close()

	// WaitAllDone blocks until all goroutines are done.
	WaitAllDone()

	// RunningCount reports how many goroutines are currently running.
	RunningCount() int32
}

// concurrencyManager holds the channels and counters used to cap the number
// of goroutines that run at the same time.
type concurrencyManager struct {
	// max is the number of goroutines allowed to run concurrently.
	max int

	// managerCh coordinates the concurrent goroutines: New fills it with
	// placeholder values and Wait takes one of them per started goroutine.
	managerCh chan interface{}

	// doneCh gets a value every time a single goroutine finishes its job.
	doneCh chan bool

	// allDoneCh gets a value once all goroutines have finished their jobs.
	allDoneCh chan bool

	// closed is set by Close; the controller only shuts down once it is true.
	closed bool

	// runningCount tracks how many goroutines are running. Wait and Done
	// update it through sync/atomic.
	runningCount int32
}
// New creates a concurrency manager that lets at most maxGoRoutines
// goroutines run at the same time and starts its controller goroutine.
//
// Values below 1 are raised to 1: a limit of 0 would make the slot channel
// unbuffered, so the very first Wait could never return, and a negative limit
// would panic while allocating the channel.
func New(maxGoRoutines int) *concurrencyManager {
	if maxGoRoutines < 1 {
		maxGoRoutines = 1
	}

	c := &concurrencyManager{
		max:       maxGoRoutines,
		managerCh: make(chan interface{}, maxGoRoutines),
		doneCh:    make(chan bool),
		allDoneCh: make(chan bool),
	}

	// Pre-fill the slot channel: every placeholder is one free slot that a
	// call to Wait can take.
	for i := 0; i < c.max; i++ {
		c.managerCh <- nil
	}

	// The controller hands a slot back each time a goroutine reports Done.
	go c.controller()

	return c
}
// controller is the loop New starts in its own goroutine to recycle slots.
//
// Every value received on doneCh means one goroutine called Done, so one
// placeholder is put back on managerCh for the next Wait. After each such
// hand-back the loop ends if the manager is flagged closed and no goroutine is
// counted as running; a single value is then sent on allDoneCh, which blocks
// until someone receives it.
func (c *concurrencyManager) controller() {
	for {
		// Blocks until a goroutine reports completion.
		<-c.doneCh

		// Release the freed slot so another goroutine can start.
		c.managerCh <- nil

		// runningCount is updated with atomic.AddInt32 in Wait and Done, so
		// it has to be loaded atomically here too; a plain read is a data race.
		// NOTE(review): closed is a plain bool written by Close from another
		// goroutine; this read is not synchronised with that write.
		if c.closed && atomic.LoadInt32(&c.runningCount) == 0 {
			break
		}
	}

	// All goroutines are finished: tell the waiter the manager can shut down.
	c.allDoneCh <- true
}
// Wait reserves a slot for a new goroutine, blocking while none is free.
// Start the goroutine only after Wait has returned.
func (c *concurrencyManager) Wait() {
	// Each value sitting on managerCh stands for one free slot. Receiving
	// takes that slot; with an empty channel the receive blocks until a slot
	// is handed back.
	<-c.managerCh

	// The slot is taken, so count one more running goroutine.
	atomic.AddInt32(&c.runningCount, 1)
}
// Done reports that a goroutine has finished its job.
func (c *concurrencyManager) Done() {
	// Take the goroutine out of the running count before signalling, so the
	// count is already lowered when the signal is received.
	atomic.AddInt32(&c.runningCount, -1)

	// Signal the completion; the controller receives from doneCh and releases
	// a slot in response.
	c.doneCh <- true
}
// Close flags the manager as closed. It only sets the flag and returns right
// away; the controller checks the flag after the next Done it processes.
// NOTE(review): the flag is a plain bool that the controller goroutine reads
// without synchronisation.
func (c *concurrencyManager) Close() {
	c.closed = true
}
// WaitAllDone closes the manager and blocks until all goroutines are done.
//
// The controller re-checks the closed flag only after it has processed a Done
// call. If nothing is running when WaitAllDone is called, no further Done will
// arrive and allDoneCh would never be signalled, so in that case the method
// returns immediately instead of blocking forever.
func (c *concurrencyManager) WaitAllDone() {
	// Close the manager automatically.
	c.Close()

	if atomic.LoadInt32(&c.runningCount) > 0 {
		// At least one goroutine still has to call Done; the controller
		// signals allDoneCh after the last one.
		<-c.allDoneCh
		return
	}

	// Nothing is running. If the controller has already finished and is
	// waiting to deliver its signal, take it so that goroutine can exit;
	// otherwise there is nothing to wait for.
	select {
	case <-c.allDoneCh:
	default:
	}
}
// RunningCount returns the number of goroutines that are currently running,
// i.e. that have passed Wait and not yet called Done.
func (c *concurrencyManager) RunningCount() int32 {
	// Wait and Done modify the counter with atomic.AddInt32 from arbitrary
	// goroutines, so it must be read atomically as well; a plain read would
	// be a data race.
	return atomic.LoadInt32(&c.runningCount)
}