2019-06-12 13:12:00 +02:00

298 lines
6.9 KiB
Go

package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children map[*processLink]struct{} // process to close with us
waitfors map[*processLink]struct{} // process to only wait for
waiters []*processLink // processes that wait for us. for gc.
teardown TeardownFunc // called to run the teardown logic.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeErr error // error to return to clients of Close()
sync.Mutex
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
waitfors: make(map[*processLink]struct{}),
children: make(map[*processLink]struct{}),
}
}
func (p *process) WaitFor(q Process) {
if q == nil {
panic("waiting for nil process")
}
p.Lock()
defer p.Unlock()
select {
case <-p.Closed():
panic("Process cannot wait after being closed")
default:
}
pl := newProcessLink(p, q)
if p.waitfors == nil {
// This may be nil when we're closing. In close, we'll keep
// reading this map till it stays nil.
p.waitfors = make(map[*processLink]struct{}, 1)
}
p.waitfors[pl] = struct{}{}
go pl.AddToChild()
}
func (p *process) AddChildNoWait(child Process) {
if child == nil {
panic("adding nil child process")
}
p.Lock()
defer p.Unlock()
select {
case <-p.Closing():
// Either closed or closing, close child immediately. This is
// correct because we aren't asked to _wait_ on this child.
go child.Close()
// Wait for the child to start closing so the child is in the
// "correct" state after this function finishes (see #17).
<-child.Closing()
return
default:
}
pl := newProcessLink(p, child)
p.children[pl] = struct{}{}
go pl.AddToChild()
}
func (p *process) AddChild(child Process) {
if child == nil {
panic("adding nil child process")
}
p.Lock()
defer p.Unlock()
pl := newProcessLink(p, child)
select {
case <-p.Closed():
// AddChild must not be called on a dead process. Maybe that's
// too strict?
panic("Process cannot add children after being closed")
default:
}
select {
case <-p.Closing():
// Already closing, close child in background.
go child.Close()
// Wait for the child to start closing so the child is in the
// "correct" state after this function finishes (see #17).
<-child.Closing()
default:
// Only add the child when not closing. When closing, just add
// it to the "waitfors" list.
p.children[pl] = struct{}{}
}
if p.waitfors == nil {
// This may be be nil when we're closing. In close, we'll keep
// reading this map till it stays nil.
p.waitfors = make(map[*processLink]struct{}, 1)
}
p.waitfors[pl] = struct{}{}
go pl.AddToChild()
}
func (p *process) Go(f ProcessFunc) Process {
child := newProcess(nil)
waitFor := newProcess(nil)
child.WaitFor(waitFor) // prevent child from closing
// add child last, to prevent a closing parent from
// closing all of them prematurely, before running the func.
p.AddChild(child)
go func() {
f(child)
waitFor.Close() // allow child to close.
child.CloseAfterChildren() // close to tear down.
}()
return child
}
// SetTeardown to assign a teardown function
func (p *process) SetTeardown(tf TeardownFunc) {
if tf == nil {
panic("cannot set nil TeardownFunc")
}
p.Lock()
if p.teardown != nil {
panic("cannot SetTeardown twice")
}
p.teardown = tf
select {
case <-p.Closed():
p.closeErr = tf()
default:
}
p.Unlock()
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.Lock()
// if already closing, or closed, get out. (but wait!)
select {
case <-p.Closing():
p.Unlock()
<-p.Closed()
return p.closeErr
default:
}
p.doClose()
p.Unlock()
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
func (p *process) Err() error {
<-p.Closed()
return p.closeErr
}
// the _actual_ close process.
func (p *process) doClose() {
// this function is only be called once (protected by p.Lock()).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
// We won't add any children after we start closing so we can do this
// once.
for plc, _ := range p.children {
child := plc.Child()
if child != nil { // check because child may already have been removed.
go child.Close() // force all children to shut down
}
// safe to call multiple times per link
plc.ParentClear()
}
p.children = nil // clear them. release memory.
// We may repeatedly continue to add waiters while we wait to close so
// we have to do this in a loop.
for len(p.waitfors) > 0 {
// we must be careful not to iterate over waitfors directly, as it may
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them. release memory.
for w, _ := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
<-w.ChildClosed() // wait till all waitfors are fully closed (before teardown)
p.Lock()
// safe to call multiple times per link
w.ParentClear()
}
}
if p.teardown != nil {
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
}
close(p.closed) // signal that we're shut down (Closed)
// go remove all the parents from the process links. optimization.
go func(waiters []*processLink) {
for _, pl := range waiters {
pl.ClearChild()
pr, ok := pl.Parent().(*process)
if !ok {
// parent has already been called to close
continue
}
pr.Lock()
delete(pr.waitfors, pl)
delete(pr.children, pl)
pr.Unlock()
}
}(p.waiters) // pass in so
p.waiters = nil // clear them. release memory.
}
// We will only wait on the children we have now.
// We will not wait on children added subsequently.
// this may change in the future.
func (p *process) CloseAfterChildren() error {
p.Lock()
select {
case <-p.Closed():
p.Unlock()
return p.Close() // get error. safe, after p.Closed()
default:
}
p.Unlock()
// here only from one goroutine.
nextToWaitFor := func() Process {
p.Lock()
defer p.Unlock()
for e, _ := range p.waitfors {
c := e.Child()
if c == nil {
continue
}
select {
case <-c.Closed():
default:
return c
}
}
return nil
}
// wait for all processes we're waiting for are closed.
// the semantics here are simple: we will _only_ close
// if there are no processes currently waiting for.
for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
<-next.Closed()
}
// YAY! we're done. close
return p.Close()
}