272 lines
6.0 KiB
Go
272 lines
6.0 KiB
Go
|
package goprocess
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// process implements Process
|
||
|
type process struct {
|
||
|
children map[*processLink]struct{} // process to close with us
|
||
|
waitfors map[*processLink]struct{} // process to only wait for
|
||
|
waiters []*processLink // processes that wait for us. for gc.
|
||
|
|
||
|
teardown TeardownFunc // called to run the teardown logic.
|
||
|
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
|
||
|
closing chan struct{} // closed once close starts.
|
||
|
closed chan struct{} // closed once close is done.
|
||
|
closeErr error // error to return to clients of Close()
|
||
|
|
||
|
sync.Mutex
|
||
|
}
|
||
|
|
||
|
// newProcess constructs and returns a Process.
|
||
|
// It will call tf TeardownFunc exactly once:
|
||
|
// **after** all children have fully Closed,
|
||
|
// **after** entering <-Closing(), and
|
||
|
// **before** <-Closed().
|
||
|
func newProcess(tf TeardownFunc) *process {
|
||
|
return &process{
|
||
|
teardown: tf,
|
||
|
closed: make(chan struct{}),
|
||
|
closing: make(chan struct{}),
|
||
|
waitfors: make(map[*processLink]struct{}),
|
||
|
children: make(map[*processLink]struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (p *process) WaitFor(q Process) {
|
||
|
if q == nil {
|
||
|
panic("waiting for nil process")
|
||
|
}
|
||
|
|
||
|
p.Lock()
|
||
|
|
||
|
select {
|
||
|
case <-p.Closed():
|
||
|
panic("Process cannot wait after being closed")
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
pl := newProcessLink(p, q)
|
||
|
p.waitfors[pl] = struct{}{}
|
||
|
p.Unlock()
|
||
|
go pl.AddToChild()
|
||
|
}
|
||
|
|
||
|
func (p *process) AddChildNoWait(child Process) {
|
||
|
if child == nil {
|
||
|
panic("adding nil child process")
|
||
|
}
|
||
|
|
||
|
p.Lock()
|
||
|
|
||
|
select {
|
||
|
case <-p.Closed():
|
||
|
panic("Process cannot add children after being closed")
|
||
|
case <-p.Closing():
|
||
|
go child.Close()
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
pl := newProcessLink(p, child)
|
||
|
p.children[pl] = struct{}{}
|
||
|
p.Unlock()
|
||
|
go pl.AddToChild()
|
||
|
}
|
||
|
|
||
|
func (p *process) AddChild(child Process) {
|
||
|
if child == nil {
|
||
|
panic("adding nil child process")
|
||
|
}
|
||
|
|
||
|
p.Lock()
|
||
|
|
||
|
select {
|
||
|
case <-p.Closed():
|
||
|
panic("Process cannot add children after being closed")
|
||
|
case <-p.Closing():
|
||
|
go child.Close()
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
pl := newProcessLink(p, child)
|
||
|
if p.waitfors != nil { // if p.waitfors hasn't been set nil
|
||
|
p.waitfors[pl] = struct{}{}
|
||
|
}
|
||
|
if p.children != nil { // if p.children hasn't been set nil
|
||
|
p.children[pl] = struct{}{}
|
||
|
}
|
||
|
p.Unlock()
|
||
|
go pl.AddToChild()
|
||
|
}
|
||
|
|
||
|
func (p *process) Go(f ProcessFunc) Process {
|
||
|
child := newProcess(nil)
|
||
|
waitFor := newProcess(nil)
|
||
|
child.WaitFor(waitFor) // prevent child from closing
|
||
|
|
||
|
// add child last, to prevent a closing parent from
|
||
|
// closing all of them prematurely, before running the func.
|
||
|
p.AddChild(child)
|
||
|
go func() {
|
||
|
f(child)
|
||
|
waitFor.Close() // allow child to close.
|
||
|
child.CloseAfterChildren() // close to tear down.
|
||
|
}()
|
||
|
return child
|
||
|
}
|
||
|
|
||
|
// SetTeardown to assign a teardown function
|
||
|
func (p *process) SetTeardown(tf TeardownFunc) {
|
||
|
if tf == nil {
|
||
|
panic("cannot set nil TeardownFunc")
|
||
|
}
|
||
|
|
||
|
p.Lock()
|
||
|
if p.teardown != nil {
|
||
|
panic("cannot SetTeardown twice")
|
||
|
}
|
||
|
|
||
|
p.teardown = tf
|
||
|
select {
|
||
|
case <-p.Closed():
|
||
|
p.closeErr = tf()
|
||
|
default:
|
||
|
}
|
||
|
p.Unlock()
|
||
|
}
|
||
|
|
||
|
// Close is the external close function.
|
||
|
// it's a wrapper around internalClose that waits on Closed()
|
||
|
func (p *process) Close() error {
|
||
|
p.Lock()
|
||
|
|
||
|
// if already closing, or closed, get out. (but wait!)
|
||
|
select {
|
||
|
case <-p.Closing():
|
||
|
p.Unlock()
|
||
|
<-p.Closed()
|
||
|
return p.closeErr
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
p.doClose()
|
||
|
p.Unlock()
|
||
|
return p.closeErr
|
||
|
}
|
||
|
|
||
|
func (p *process) Closing() <-chan struct{} {
|
||
|
return p.closing
|
||
|
}
|
||
|
|
||
|
func (p *process) Closed() <-chan struct{} {
|
||
|
return p.closed
|
||
|
}
|
||
|
|
||
|
func (p *process) Err() error {
|
||
|
<-p.Closed()
|
||
|
return p.closeErr
|
||
|
}
|
||
|
|
||
|
// the _actual_ close process.
|
||
|
func (p *process) doClose() {
|
||
|
// this function is only be called once (protected by p.Lock()).
|
||
|
// and it will panic (on closing channels) otherwise.
|
||
|
|
||
|
close(p.closing) // signal that we're shutting down (Closing)
|
||
|
|
||
|
for len(p.children) > 0 || len(p.waitfors) > 0 {
|
||
|
for plc, _ := range p.children {
|
||
|
child := plc.Child()
|
||
|
if child != nil { // check because child may already have been removed.
|
||
|
go child.Close() // force all children to shut down
|
||
|
}
|
||
|
plc.ParentClear()
|
||
|
}
|
||
|
p.children = nil // clear them. release memory.
|
||
|
|
||
|
// we must be careful not to iterate over waitfors directly, as it may
|
||
|
// change under our feet.
|
||
|
wf := p.waitfors
|
||
|
p.waitfors = nil // clear them. release memory.
|
||
|
for w, _ := range wf {
|
||
|
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
|
||
|
// adding a child to us can finish. we will immediately close the child.
|
||
|
p.Unlock()
|
||
|
<-w.ChildClosed() // wait till all waitfors are fully closed (before teardown)
|
||
|
p.Lock()
|
||
|
w.ParentClear()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if p.teardown != nil {
|
||
|
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
|
||
|
}
|
||
|
close(p.closed) // signal that we're shut down (Closed)
|
||
|
|
||
|
// go remove all the parents from the process links. optimization.
|
||
|
go func(waiters []*processLink) {
|
||
|
for _, pl := range waiters {
|
||
|
pl.ClearChild()
|
||
|
pr, ok := pl.Parent().(*process)
|
||
|
if !ok {
|
||
|
// parent has already been called to close
|
||
|
continue
|
||
|
}
|
||
|
pr.Lock()
|
||
|
delete(pr.waitfors, pl)
|
||
|
delete(pr.children, pl)
|
||
|
pr.Unlock()
|
||
|
}
|
||
|
}(p.waiters) // pass in so
|
||
|
p.waiters = nil // clear them. release memory.
|
||
|
}
|
||
|
|
||
|
// We will only wait on the children we have now.
|
||
|
// We will not wait on children added subsequently.
|
||
|
// this may change in the future.
|
||
|
func (p *process) CloseAfterChildren() error {
|
||
|
p.Lock()
|
||
|
select {
|
||
|
case <-p.Closed():
|
||
|
p.Unlock()
|
||
|
return p.Close() // get error. safe, after p.Closed()
|
||
|
case <-p.waiting: // already called it.
|
||
|
p.Unlock()
|
||
|
<-p.Closed()
|
||
|
return p.Close() // get error. safe, after p.Closed()
|
||
|
default:
|
||
|
}
|
||
|
p.Unlock()
|
||
|
|
||
|
// here only from one goroutine.
|
||
|
|
||
|
nextToWaitFor := func() Process {
|
||
|
p.Lock()
|
||
|
defer p.Unlock()
|
||
|
for e, _ := range p.waitfors {
|
||
|
c := e.Child()
|
||
|
if c == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-c.Closed():
|
||
|
default:
|
||
|
return c
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// wait for all processes we're waiting for are closed.
|
||
|
// the semantics here are simple: we will _only_ close
|
||
|
// if there are no processes currently waiting for.
|
||
|
for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
|
||
|
<-next.Closed()
|
||
|
}
|
||
|
|
||
|
// YAY! we're done. close
|
||
|
return p.Close()
|
||
|
}
|