replace atomic state guarding with lock.

This commit is contained in:
Raúl Kripalani 2020-12-09 14:54:09 +00:00
parent 4558d98653
commit b35cdf0c7d

View File

@ -6,7 +6,6 @@ import (
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/elastic/gosigar"
@ -79,13 +78,12 @@ const (
stateUnstarted int32 = iota
// stateRunning represents an operational state.
stateRunning
// stateClosing represents a temporary closing state.
stateClosing
)
// _watchdog is a global singleton watchdog.
var _watchdog struct {
state int32 // guarded by atomic, one of state* constants.
lk sync.Mutex
state int32
scope UtilizationType
@ -131,7 +129,10 @@ type Policy interface {
//
// A limit value of 0 will error.
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) {
if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateUnstarted {
return ErrAlreadyStarted, nil
}
@ -144,6 +145,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
}
_watchdog.state = stateRunning
_watchdog.scope = UtilizationHeap
_watchdog.closing = make(chan struct{})
@ -231,7 +233,10 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
// This threshold is calculated by querying the policy every time that GC runs,
// either triggered by the runtime, or forced by us.
func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateUnstarted {
return ErrAlreadyStarted, nil
}
@ -247,6 +252,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor)
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
}
_watchdog.state = stateRunning
_watchdog.scope = UtilizationSystem
_watchdog.closing = make(chan struct{})
@ -360,7 +366,10 @@ func setupGCSentinel(gcTriggered chan struct{}) {
type sentinel struct{ a *int }
var finalizer func(o *sentinel)
finalizer = func(o *sentinel) {
if atomic.LoadInt32(&_watchdog.state) != stateRunning {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateRunning {
// this GC triggered after the watchdog was stopped; ignore
// and do not reset the finalizer.
return
@ -380,12 +389,14 @@ func setupGCSentinel(gcTriggered chan struct{}) {
}
func stop() {
if !atomic.CompareAndSwapInt32(&_watchdog.state, stateRunning, stateClosing) {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateRunning {
return
}
close(_watchdog.closing)
_watchdog.wg.Wait()
atomic.StoreInt32(&_watchdog.state, stateUnstarted)
_watchdog.state = stateUnstarted
}