diff --git a/README.md b/README.md index e86ab25..0577d32 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,17 @@ [![GoDoc](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog) -go-watchdog runs a singleton memory watchdog. It takes system and heap memory -readings periodically, and feeds them to a user-defined policy to determine -whether GC needs to run immediately. +go-watchdog runs a singleton memory watchdog in the process, which watches +memory utilization and forces Go GC in accordance with a user-defined policy. + +There are two kinds of watchdog so far: + +* **heap-driven:** applies a limit to the heap, and obtains current usage through + `runtime.ReadMemStats()`. +* **system-driven:** applies a limit to the total system memory used, and obtains + current usage through [`elastic/go-sigar`](https://github.com/elastic/gosigar). + +A third process-driven watchdog that uses cgroups is underway. This library ships with two policies out of the box: @@ -18,12 +26,6 @@ This library ships with two policies out of the box: You can easily build a custom policy tailored to the allocation patterns of your program. -It is recommended that you set both (a) a memory limit and (b) a scope of -application of that limit (system or heap) when you start the watchdog. -Otherwise, go-watchdog will use the system scope, and will default to the -total system memory as the limit. [elastic/go-sigar](https://github.com/elastic/gosigar) -is used to make the discovery. - ## Why is this even needed? The garbage collector that ships with the go runtime is pretty good in some diff --git a/adaptive.go b/adaptive.go index b8e3a5b..b7bdcd6 100644 --- a/adaptive.go +++ b/adaptive.go @@ -1,54 +1,32 @@ package watchdog -// AdaptivePolicy is a policy that forces GC when the usage surpasses a -// user-configured percentage (Factor) of the available memory that remained -// after the last GC run. +// NewAdaptivePolicy creates a policy that forces GC when the usage surpasses a +// user-configured percentage (factor) of the available memory. // -// TODO tests -type AdaptivePolicy struct { - // Factor determines how much this policy will let the heap expand - // before it triggers. - // - // On every GC run, this policy recalculates the next target as - // (limit-currentHeap)*Factor (i.e. available*Factor). - // - // If the GC target calculated by the runtime is lower than the one - // calculated by this policy, this policy will set the new target, but the - // effect will be nil, since the the go runtime will run GC sooner than us - // anyway. - Factor float64 - - active bool - target uint64 - initialized bool +// This policy recalculates the next target as usage+(limit-usage)*factor, and +// forces immediate GC when used >= limit. +func NewAdaptivePolicy(factor float64) PolicyCtor { + return func(limit uint64) (Policy, error) { + return &adaptivePolicy{ + factor: factor, + limit: limit, + }, nil + } } -var _ Policy = (*AdaptivePolicy)(nil) - -func (a *AdaptivePolicy) Evaluate(input PolicyInput) (trigger bool) { - if !a.initialized { - // when initializing, set the target to the limit; it will be reset - // when the first GC happens. - a.target = input.Limit - a.initialized = true - } - - // determine the value to compare utilisation against. 
- var actual uint64 - switch input.Scope { - case ScopeSystem: - actual = input.SysStats.ActualUsed - case ScopeHeap: - actual = input.MemStats.HeapAlloc - } - - if input.GCTrigger { - available := float64(input.Limit) - float64(actual) - calc := uint64(available * a.Factor) - a.target = calc - } - if actual >= a.target { - return true - } - return false +type adaptivePolicy struct { + factor float64 + limit uint64 +} + +var _ Policy = (*adaptivePolicy)(nil) + +func (p *adaptivePolicy) Evaluate(_ UtilizationType, used uint64) (next uint64, immediate bool) { + if used >= p.limit { + return used, true + } + + available := float64(p.limit) - float64(used) + next = used + uint64(available*p.factor) + return next, false } diff --git a/adaptive_test.go b/adaptive_test.go new file mode 100644 index 0000000..b30d3b5 --- /dev/null +++ b/adaptive_test.go @@ -0,0 +1,31 @@ +package watchdog + +import ( + "testing" + + "github.com/raulk/clock" + "github.com/stretchr/testify/require" +) + +func TestAdaptivePolicy(t *testing.T) { + clk := clock.NewMock() + Clock = clk + + p, err := NewAdaptivePolicy(0.5)(limit) + require.NoError(t, err) + + // at zero; next = 50%. + next, immediate := p.Evaluate(UtilizationSystem, 0) + require.False(t, immediate) + require.EqualValues(t, limit/2, next) + + // at half; next = 75%. + next, immediate = p.Evaluate(UtilizationSystem, limit/2) + require.False(t, immediate) + require.EqualValues(t, 3*(limit/4), next) + + // at limit; immediate = true. + next, immediate = p.Evaluate(UtilizationSystem, limit) + require.True(t, immediate) + require.EqualValues(t, limit, next) +} diff --git a/go.mod b/go.mod index fd95880..e2c1eaf 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,10 @@ module github.com/raulk/go-watchdog go 1.15 require ( - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 github.com/elastic/gosigar v0.12.0 github.com/kr/pretty v0.1.0 // indirect github.com/raulk/clock v1.1.0 github.com/stretchr/testify v1.4.0 - golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index b9232ed..5d43f30 100644 --- a/go.sum +++ b/go.sum @@ -1,27 +1,54 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs= +github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 h1:7grrpcfCtbZLsjtB0DgMuzs1umsJmpzaHMZ6cO6iAWw= +github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= +github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg= +github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod 
h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/elastic/gosigar v0.12.0 h1:AsdhYCJlTudhfOYQyFNgx+fIVTfrDO0V1ST0vHgiapU= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/godbus/dbus/v5 v5.0.3 h1:ZqHaoEF7TBzh4jzPmqVhE/5A1z9of6orkAe5uHoAeME= +github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= +github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y= github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0 h1:8H8QZJ30plJyIVj60H3lr8TZGIq2Fh3Cyrs/ZNg1foU= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 h1:1/DFK4b7JH8DmkqhUk48onnSfrPzImPoVxuomtbT2nk= +golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors 
v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
diff --git a/log.go b/log.go
index b6977cc..ced2020 100644
--- a/log.go
+++ b/log.go
@@ -2,17 +2,17 @@ package watchdog
 
 import "log"
 
-// Logger is an interface to be implemented by custom loggers.
-type Logger interface {
+// logger is an interface to be implemented by custom loggers.
+type logger interface {
 	Debugf(template string, args ...interface{})
 	Infof(template string, args ...interface{})
 	Warnf(template string, args ...interface{})
 	Errorf(template string, args ...interface{})
 }
 
-var _ Logger = (*stdlog)(nil)
+var _ logger = (*stdlog)(nil)
 
-// stdlog is a Logger that proxies to a standard log.Logger.
+// stdlog is a logger that proxies to a standard log.Logger.
 type stdlog struct {
 	log   *log.Logger
 	debug bool
diff --git a/sys_linux.go b/sys_linux.go
new file mode 100644
index 0000000..ff0f919
--- /dev/null
+++ b/sys_linux.go
@@ -0,0 +1,25 @@
+package watchdog
+
+import (
+	"os"
+	"github.com/containerd/cgroups"
+)
+
+func ProcessMemoryLimit() uint64 {
+	var (
+		pid          = os.Getpid()
+		memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory)
+	)
+	cgroup, err := cgroups.Load(memSubsystem, cgroups.PidPath(pid))
+	if err != nil {
+		return 0
+	}
+	metrics, err := cgroup.Stat()
+	if err != nil {
+		return 0
+	}
+	if metrics.Memory == nil {
+		return 0
+	}
+	return metrics.Memory.HierarchicalMemoryLimit
+}
diff --git a/sys_other.go b/sys_other.go
new file mode 100644
index 0000000..fb469f2
--- /dev/null
+++ b/sys_other.go
@@ -0,0 +1,7 @@
+// +build !linux
+
+package watchdog
+
+func ProcessMemoryLimit() uint64 {
+	return 0
+}
diff --git a/watchdog.go b/watchdog.go
index 7fc9266..739fe99 100644
--- a/watchdog.go
+++ b/watchdog.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"runtime"
+	"runtime/debug"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -12,12 +13,61 @@ import (
 	"github.com/raulk/clock"
 )
 
-// DecimalPrecision is the rounding precision that float calculations will use.
-// By default, 4 decimal places.
-var DecimalPrecision = 1e4
+// The watchdog is designed to be used as a singleton; global vars are OK for
+// that reason.
+var (
+	// Logger is the logger to use. If nil, it will default to a logger that
+	// proxies to a standard logger using the "[watchdog]" prefix.
+	Logger logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)}
 
-// Clock can be used to inject a mock clock for testing.
-var Clock = clock.New()
+	// Clock can be used to inject a mock clock for testing.
+	Clock = clock.New()
+
+	// NotifyFired, if set, will be called when the policy has fired,
+	// prior to forcing GC. It defaults to a no-op.
+	NotifyFired func() = func() {}
+)
+
+var (
+	// ReadMemStats stops the world. But as of go1.9, it should only
+	// take ~25µs to complete.
+	//
+	// Before go1.15, calls to ReadMemStats during an ongoing GC would
+	// block due to the worldsema lock. As of go1.15, this was optimized
+	// and the runtime holds on to worldsema less during GC (only during
+	// sweep termination and mark termination).
+	//
+	// For users using go1.14 and earlier, if this call happens during
+	// GC, it will just block for longer until serviced, but it will not
+	// take longer in itself.
No harm done. + // + // Actual benchmarks + // ----------------- + // + // In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16 + // i9 busy with another million things. During GC, it takes an + // average of less than 175µs per op. + // + // goos: darwin + // goarch: amd64 + // pkg: github.com/filecoin-project/lotus/api + // BenchmarkReadMemStats-16 44530 27523 ns/op + // BenchmarkReadMemStats-16 43743 26879 ns/op + // BenchmarkReadMemStats-16 45627 26791 ns/op + // BenchmarkReadMemStats-16 44538 26219 ns/op + // BenchmarkReadMemStats-16 44958 26757 ns/op + // BenchmarkReadMemStatsWithGCContention-16 10 183733 p50-ns 211859 p90-ns 211859 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 7 198765 p50-ns 314873 p90-ns 314873 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 10 195151 p50-ns 311408 p90-ns 311408 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 10 217279 p50-ns 295308 p90-ns 295308 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 10 167054 p50-ns 327072 p90-ns 327072 p99-ns + // PASS + // + // See: https://github.com/golang/go/issues/19812 + // See: https://github.com/prometheus/client_golang/issues/403 + memstatsFn = runtime.ReadMemStats + sysmemFn = (*gosigar.Mem).Get +) var ( // ErrAlreadyStarted is returned when the user tries to start the watchdog more than once. @@ -26,239 +76,316 @@ var ( const ( // stateUnstarted represents an unstarted state. - stateUnstarted int64 = iota - // stateStarted represents a started state. - stateStarted + stateUnstarted int32 = iota + // stateRunning represents an operational state. + stateRunning + // stateClosing represents a temporary closing state. + stateClosing ) -// watchdog is a global singleton watchdog. -var watchdog struct { - state int64 - config MemConfig +// _watchdog is a global singleton watchdog. +var _watchdog struct { + state int32 // guarded by atomic, one of state* constants. + + scope UtilizationType closing chan struct{} wg sync.WaitGroup } -// ScopeType defines the scope of the utilisation that we'll apply the limit to. -type ScopeType int +// UtilizationType is the utilization metric in use. +type UtilizationType int const ( - // ScopeSystem specifies that the policy compares against actual used + // UtilizationSystem specifies that the policy compares against actual used // system memory. - ScopeSystem ScopeType = iota - // ScopeHeap specifies that the policy compares against heap used. - ScopeHeap + UtilizationSystem UtilizationType = iota + // UtilizationHeap specifies that the policy compares against heap used. + UtilizationHeap ) -// PolicyInput is the object that's passed to a policy when evaluating it. -type PolicyInput struct { - Scope ScopeType - Limit uint64 - MemStats *runtime.MemStats - SysStats *gosigar.Mem - GCTrigger bool // is this a GC trigger? - Logger Logger -} +// PolicyCtor is a policy constructor. +type PolicyCtor func(limit uint64) (Policy, error) -// Policy encapsulates the logic that the watchdog will run on every tick. +// Policy is polled by the watchdog to determine the next utilisation at which +// a GC should be forced. type Policy interface { - // Evaluate determines whether the policy should fire. It receives the - // limit (either guessed or manually set), go runtime memory stats, and - // system memory stats, amongst other things. It returns whether the policy - // has fired or not. - Evaluate(input PolicyInput) (trigger bool) + // Evaluate determines when the next GC should take place. 
It receives the + // current usage, and it returns the next usage at which to trigger GC. + // + // The policy can request immediate GC, in which case next should match the + // used memory. + Evaluate(scope UtilizationType, used uint64) (next uint64, immediate bool) } -type MemConfig struct { - // Scope is the scope at which the limit will be applied. - Scope ScopeType - - // Limit is the memory available to this process. If zero, we will fall - // back to querying the system total memory via SIGAR. - Limit uint64 - - // Resolution is the interval at which the watchdog will retrieve memory - // stats and invoke the Policy. - Resolution time.Duration - - // Policy sets the firing policy of this watchdog. - Policy Policy - - // NotifyFired, if non-nil, will be called when the policy has fired, - // prior to calling GC, even if GC is disabled. - NotifyFired func() - - // NotifyOnly, if true, will cause the watchdog to only notify via the - // callbacks, without triggering actual GC. - NotifyOnly bool - - // Logger is the logger to use. If nil, it will default to a logger that - // proxies to a standard logger using the "[watchdog]" prefix. - Logger Logger -} - -// Memory starts the singleton memory watchdog with the provided configuration. -func Memory(config MemConfig) (err error, stop func()) { - if !atomic.CompareAndSwapInt64(&watchdog.state, stateUnstarted, stateStarted) { +// HeapDriven starts a singleton heap-driven watchdog. +// +// The heap-driven watchdog adjusts GOGC dynamically after every GC, to honour +// the policy. When an immediate GC is requested, runtime.GC() is called, and +// the policy is re-evaluated at the end of GC. +// +// It is entirely possible for the policy to keep requesting immediate GC +// repeateadly. This usually signals an emergency situation, and won't prevent +// the program from making progress, since the Go's garbage collection is not +// stop-the-world (for the major part). +// +// A limit value of 0 will error. +func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) { + if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) { return ErrAlreadyStarted, nil } - if config.Logger == nil { - config.Logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)} + if limit == 0 { + return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil } - // if the user didn't provide a limit, get the total memory. - if config.Limit == 0 { - var mem gosigar.Mem - if err := mem.Get(); err != nil { - return fmt.Errorf("failed to get system memory limit via SIGAR: %w", err), nil + policy, err := policyCtor(limit) + if err != nil { + return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil + } + + _watchdog.scope = UtilizationHeap + _watchdog.closing = make(chan struct{}) + + var gcTriggered = make(chan struct{}, 16) + setupGCSentinel(gcTriggered) + + _watchdog.wg.Add(1) + go func() { + defer _watchdog.wg.Done() + + // get the initial effective GOGC; guess it's 100 (default), and restore + // it to whatever it actually was. This works because SetGCPercent + // returns the previous value. + originalGOGC := debug.SetGCPercent(debug.SetGCPercent(100)) + currGOGC := originalGOGC + + var memstats runtime.MemStats + for { + select { + case <-gcTriggered: + NotifyFired() + + case <-_watchdog.closing: + return + } + + // recompute the next trigger. + memstatsFn(&memstats) + + // heapMarked is the amount of heap that was marked as live by GC. 
+ // it is inferred from our current GOGC and the new target picked. + heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100)) + if heapMarked == 0 { + // this shouldn't happen, but just in case; avoiding a div by 0. + continue + } + + // evaluate the policy. + next, immediate := policy.Evaluate(UtilizationHeap, memstats.HeapAlloc) + + if immediate { + // trigger a forced GC; because we're not making the finalizer + // skip sending to the trigger channel, we will get fired again. + // at this stage, the program is under significant pressure, and + // given that Go GC is not STW for the largest part, the worse + // thing that could happen from infinitely GC'ing is that the + // program will run in a degrated state for longer, possibly + // long enough for an operator to intervene. + Logger.Warnf("heap-driven watchdog requested immediate GC; " + + "system is probably under significant pressure; " + + "performance compromised") + forceGC(&memstats) + continue + } + + // calculate how much to set GOGC to honour the next trigger point. + currGOGC = int(((float64(next) / float64(heapMarked)) - float64(1)) * 100) + if currGOGC >= originalGOGC { + Logger.Infof("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC) + currGOGC = originalGOGC + } else { + if currGOGC < 1 { + currGOGC = 1 + } + Logger.Infof("heap watchdog: setting GOGC percent: %d", currGOGC) + } + + debug.SetGCPercent(currGOGC) + + memstatsFn(&memstats) + Logger.Infof("heap watchdog stats: heap_alloc: %d, heap_marked: %d, next_gc: %d, policy_next_gc: %d, gogc: %d", + memstats.HeapAlloc, heapMarked, memstats.NextGC, next, currGOGC) } - config.Limit = mem.Total - } + }() - watchdog.config = config - watchdog.closing = make(chan struct{}) - - watchdog.wg.Add(1) - go watch() - - return nil, stopMemory + return nil, stop } -func watch() { - var ( - lk sync.Mutex // guards gcTriggered channel, which is drained and flipped to nil on closure. - gcTriggered = make(chan struct{}, 16) +// SystemDriven starts a singleton system-driven watchdog. +// +// The system-driven watchdog keeps a threshold, above which GC will be forced. +// The watchdog polls the system utilization at the specified frequency. When +// the actual utilization exceeds the threshold, a GC is forced. +// +// This threshold is calculated by querying the policy every time that GC runs, +// either triggered by the runtime, or forced by us. +func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { + if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) { + return ErrAlreadyStarted, nil + } - memstats runtime.MemStats - sysmem gosigar.Mem - config = watchdog.config - ) + if limit == 0 { + limit, err = determineLimit(false) + if err != nil { + return err, nil + } + } + + policy, err := policyCtor(limit) + if err != nil { + return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil + } + + _watchdog.scope = UtilizationSystem + _watchdog.closing = make(chan struct{}) + + var gcTriggered = make(chan struct{}, 16) + setupGCSentinel(gcTriggered) + + _watchdog.wg.Add(1) + go func() { + defer _watchdog.wg.Done() + + var ( + memstats runtime.MemStats + sysmem gosigar.Mem + threshold uint64 + ) + + // initialize the threshold. 
+ threshold, immediate := policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) + if immediate { + Logger.Warnf("system-driven watchdog requested immediate GC upon startup; " + + "policy is probably misconfigured; " + + "performance compromised") + forceGC(&memstats) + } + + for { + select { + case <-Clock.After(frequency): + // get the current usage. + if err := sysmemFn(&sysmem); err != nil { + Logger.Warnf("failed to obtain system memory stats; err: %s", err) + continue + } + actual := sysmem.ActualUsed + if actual < threshold { + // nothing to do. + continue + } + // trigger GC; this will emit a gcTriggered event which we'll + // consume next to readjust the threshold. + Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", actual, threshold) + forceGC(&memstats) + + case <-gcTriggered: + NotifyFired() + + // get the current usage. + if err := sysmemFn(&sysmem); err != nil { + Logger.Warnf("failed to obtain system memory stats; err: %s", err) + continue + } + + // adjust the threshold. + threshold, immediate = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) + if immediate { + Logger.Warnf("system-driven watchdog triggering immediate GC; %d used bytes", sysmem.ActualUsed) + forceGC(&memstats) + } + + case <-_watchdog.closing: + return + } + } + }() + + return nil, stop +} + +func determineLimit(restrictByProcess bool) (uint64, error) { + // TODO. + // if restrictByProcess { + // if pmem := ProcessMemoryLimit(); pmem > 0 { + // Logger.Infof("watchdog using process limit: %d bytes", pmem) + // return pmem, nil + // } + // Logger.Infof("watchdog was unable to determine process limit; falling back to total system memory") + // } + + // populate initial utilisation and system stats. + var sysmem gosigar.Mem + if err := sysmemFn(&sysmem); err != nil { + return 0, fmt.Errorf("failed to get system memory stats: %w", err) + } + return sysmem.Total, nil +} + +// forceGC forces a manual GC. +func forceGC(memstats *runtime.MemStats) { + Logger.Infof("watchdog is forcing GC") + + // it's safe to assume that the finalizer will attempt to run before + // runtime.GC() returns because runtime.GC() waits for the sweep phase to + // finish before returning. + // finalizers are run in the sweep phase. + start := time.Now() + runtime.GC() + took := time.Since(start) + + memstatsFn(memstats) + Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", took, memstats.HeapAlloc) +} + +func setupGCSentinel(gcTriggered chan struct{}) { + logger := Logger // this non-zero sized struct is used as a sentinel to detect when a GC // run has finished, by setting and resetting a finalizer on it. + // it essentially creates a GC notification "flywheel"; every GC will + // trigger this finalizer, which will reset itself so it gets notified + // of the next GC, breaking the cycle when the watchdog is stopped. type sentinel struct{ a *int } - var sentinelObj sentinel var finalizer func(o *sentinel) finalizer = func(o *sentinel) { - lk.Lock() - defer lk.Unlock() - select { - case gcTriggered <- struct{}{}: - default: - config.Logger.Warnf("failed to queue gc trigger; channel backlogged") - } - runtime.SetFinalizer(o, finalizer) - } - finalizer(&sentinelObj) - - defer watchdog.wg.Done() - for { - var eventIsGc bool - select { - case <-Clock.After(config.Resolution): - // exit select. - - case <-gcTriggered: - eventIsGc = true - // exit select. - - case <-watchdog.closing: - runtime.SetFinalizer(&sentinelObj, nil) // clear the sentinel finalizer. 
- - lk.Lock() - ch := gcTriggered - gcTriggered = nil - lk.Unlock() - - // close and drain - close(ch) - for range ch { - } + if atomic.LoadInt32(&_watchdog.state) != stateRunning { + // this GC triggered after the watchdog was stopped; ignore + // and do not reset the finalizer. return } - // ReadMemStats stops the world. But as of go1.9, it should only - // take ~25µs to complete. - // - // Before go1.15, calls to ReadMemStats during an ongoing GC would - // block due to the worldsema lock. As of go1.15, this was optimized - // and the runtime holds on to worldsema less during GC (only during - // sweep termination and mark termination). - // - // For users using go1.14 and earlier, if this call happens during - // GC, it will just block for longer until serviced, but it will not - // take longer in itself. No harm done. - // - // Actual benchmarks - // ----------------- - // - // In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16 - // i9 busy with another million things. During GC, it takes an - // average of less than 175µs per op. - // - // goos: darwin - // goarch: amd64 - // pkg: github.com/filecoin-project/lotus/api - // BenchmarkReadMemStats-16 44530 27523 ns/op - // BenchmarkReadMemStats-16 43743 26879 ns/op - // BenchmarkReadMemStats-16 45627 26791 ns/op - // BenchmarkReadMemStats-16 44538 26219 ns/op - // BenchmarkReadMemStats-16 44958 26757 ns/op - // BenchmarkReadMemStatsWithGCContention-16 10 183733 p50-ns 211859 p90-ns 211859 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 7 198765 p50-ns 314873 p90-ns 314873 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 10 195151 p50-ns 311408 p90-ns 311408 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 10 217279 p50-ns 295308 p90-ns 295308 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 10 167054 p50-ns 327072 p90-ns 327072 p99-ns - // PASS - // - // See: https://github.com/golang/go/issues/19812 - // See: https://github.com/prometheus/client_golang/issues/403 + // reset so it triggers on the next GC. + runtime.SetFinalizer(o, finalizer) - if eventIsGc { - config.Logger.Infof("watchdog after GC") - } - - runtime.ReadMemStats(&memstats) - - if err := sysmem.Get(); err != nil { - config.Logger.Warnf("failed to obtain system memory stats; err: %s", err) - } - - trigger := config.Policy.Evaluate(PolicyInput{ - Scope: config.Scope, - Limit: config.Limit, - MemStats: &memstats, - SysStats: &sysmem, - GCTrigger: eventIsGc, - Logger: config.Logger, - }) - - if !trigger { - continue - } - - config.Logger.Infof("watchdog policy fired") - - if f := config.NotifyFired; f != nil { - f() - } - - if !config.NotifyOnly { - config.Logger.Infof("watchdog is triggering GC") - start := time.Now() - runtime.GC() - runtime.ReadMemStats(&memstats) - config.Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", time.Since(start), memstats.HeapAlloc) + select { + case gcTriggered <- struct{}{}: + default: + logger.Warnf("failed to queue gc trigger; channel backlogged") } } + + runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel. 
} -func stopMemory() { - if !atomic.CompareAndSwapInt64(&watchdog.state, stateStarted, stateUnstarted) { +func stop() { + if !atomic.CompareAndSwapInt32(&_watchdog.state, stateRunning, stateClosing) { return } - close(watchdog.closing) - watchdog.wg.Wait() + + close(_watchdog.closing) + _watchdog.wg.Wait() + + atomic.StoreInt32(&_watchdog.state, stateUnstarted) } diff --git a/watchdog_test.go b/watchdog_test.go index ce00e3e..89b8d75 100644 --- a/watchdog_test.go +++ b/watchdog_test.go @@ -1,92 +1,151 @@ package watchdog import ( + "fmt" + "log" + "os" "runtime" + "runtime/debug" "testing" "time" + "github.com/elastic/gosigar" "github.com/raulk/clock" "github.com/stretchr/testify/require" ) -type testPolicy struct { - ch chan *PolicyInput - trigger bool +// These integration tests are a hugely non-deterministic, but necessary to get +// good coverage and confidence. The Go runtime makes its own pacing decisions, +// and those may vary based on machine, OS, kernel memory management, other +// running programs, exogenous memory pressure, and Go runtime versions. +// +// The assertions we use here are lax, but should be sufficient to serve as a +// reasonable litmus test of whether the watchdog is doing what it's supposed +// to or not. + +var ( + limit uint64 = 64 << 20 // 64MiB. +) + +func init() { + Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} } -var _ Policy = (*testPolicy)(nil) +func TestControl(t *testing.T) { + debug.SetGCPercent(100) -func (t *testPolicy) Evaluate(input PolicyInput) (trigger bool) { - t.ch <- &input - return t.trigger -} + // retain 1MiB every iteration, up to 100MiB (beyond heap limit!). + var retained [][]byte + for i := 0; i < 100; i++ { + b := make([]byte, 1*1024*1024) + for i := range b { + b[i] = byte(i) + } + retained = append(retained, b) + } -func TestWatchdog(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - tp := &testPolicy{ch: make(chan *PolicyInput, 1)} - notifyCh := make(chan struct{}, 1) - - err, stop := Memory(MemConfig{ - Scope: ScopeHeap, - Limit: 100, - Resolution: 10 * time.Second, - Policy: tp, - NotifyFired: func() { notifyCh <- struct{}{} }, - }) - require.NoError(t, err) - defer stop() - - time.Sleep(500 * time.Millisecond) // wait til the watchdog goroutine starts. - - clk.Add(10 * time.Second) - pi := <-tp.ch - require.EqualValues(t, 100, pi.Limit) - require.NotNil(t, pi.MemStats) - require.NotNil(t, pi.SysStats) - require.Equal(t, ScopeHeap, pi.Scope) - - require.Len(t, notifyCh, 0) + for _, b := range retained { + for i := range b { + b[i] = byte(i) + } + } var ms runtime.MemStats runtime.ReadMemStats(&ms) - require.EqualValues(t, 0, ms.NumGC) - - // now fire. - tp.trigger = true - - clk.Add(10 * time.Second) - pi = <-tp.ch - require.EqualValues(t, 100, pi.Limit) - require.NotNil(t, pi.MemStats) - require.NotNil(t, pi.SysStats) - require.Equal(t, ScopeHeap, pi.Scope) - - time.Sleep(500 * time.Millisecond) // wait until the watchdog runs. - - require.Len(t, notifyCh, 1) - - runtime.ReadMemStats(&ms) - require.EqualValues(t, 1, ms.NumGC) + require.LessOrEqual(t, ms.NumGC, uint32(8)) // a maximum of 8 GCs should've happened. + require.Zero(t, ms.NumForcedGC) // no forced GCs. } -func TestDoubleClose(t *testing.T) { - defer func() { - if r := recover(); r != nil { - t.Fatal(r) - } - }() +func TestHeapDriven(t *testing.T) { + // we can't mock ReadMemStats, because we're relying on the go runtime to + // enforce the GC run, and the go runtime won't use our mock. 
Therefore, we + // need to do the actual thing. + debug.SetGCPercent(100) - tp := &testPolicy{ch: make(chan *PolicyInput, 1)} + clk := clock.NewMock() + Clock = clk - err, stop := Memory(MemConfig{ - Scope: ScopeHeap, - Limit: 100, - Resolution: 10 * time.Second, - Policy: tp, - }) + observations := make([]*runtime.MemStats, 0, 100) + NotifyFired = func() { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + observations = append(observations, &ms) + } + + // limit is 64MiB. + err, stopFn := HeapDriven(limit, NewAdaptivePolicy(0.5)) require.NoError(t, err) - stop() - stop() + defer stopFn() + + time.Sleep(500 * time.Millisecond) // give time for the watchdog to init. + + // retain 1MiB every iteration, up to 100MiB (beyond heap limit!). + var retained [][]byte + for i := 0; i < 100; i++ { + retained = append(retained, make([]byte, 1*1024*1024)) + } + + for _, o := range observations { + fmt.Println("heap alloc:", o.HeapAlloc, "next gc:", o.NextGC, "gc count:", o.NumGC, "forced gc:", o.NumForcedGC) + } + + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + require.GreaterOrEqual(t, ms.NumGC, uint32(12)) // over 12 GCs should've taken place. + require.GreaterOrEqual(t, ms.NumForcedGC, uint32(5)) // at least 5 forced GCs. +} + +func TestSystemDriven(t *testing.T) { + debug.SetGCPercent(100) + + clk := clock.NewMock() + Clock = clk + + // mock the system reporting. + var actualUsed uint64 + sysmemFn = func(g *gosigar.Mem) error { + g.ActualUsed = actualUsed + return nil + } + + // limit is 64MiB. + err, stopFn := SystemDriven(limit, 5*time.Second, NewAdaptivePolicy(0.5)) + require.NoError(t, err) + defer stopFn() + + time.Sleep(200 * time.Millisecond) // give time for the watchdog to init. + + notifyCh := make(chan struct{}, 1) + NotifyFired = func() { + notifyCh <- struct{}{} + } + + // first tick; used = 0. + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 0) // no GC has taken place. + + // second tick; used = just over 50%; will trigger GC. + actualUsed = (limit / 2) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 1) + <-notifyCh + + // third tick; just below 75%; no GC. + actualUsed = uint64(float64(limit)*0.75) - 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 0) + + // fourth tick; 75% exactly; will trigger GC. + actualUsed = uint64(float64(limit)*0.75) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 1) + <-notifyCh + + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + require.GreaterOrEqual(t, ms.NumForcedGC, uint32(2)) } diff --git a/watermarks.go b/watermarks.go index 54c986e..b193fd1 100644 --- a/watermarks.go +++ b/watermarks.go @@ -1,142 +1,41 @@ package watchdog -import ( - "math" - "time" -) - -// WatermarkPolicy is a watchdog firing policy that triggers when watermarks are -// surpassed in the increasing direction. -// -// For example, a policy configured with the watermarks 0.50, 0.75, 0.80, and -// 0.99 will trigger at most once, and only once, each time that a watermark -// is surpassed upwards. -// -// Even if utilisation pierces through several watermarks at once between two -// subsequent calls to Evaluate, the policy will fire only once. -// -// It is possible to suppress the watermark policy from firing too often by -// setting a Silence period. 
When non-zero, if a watermark is surpassed within -// the Silence window (using the last GC timestamp as basis), that event will -// not immediately trigger firing. Instead, the policy will wait until the -// silence period is over, and only then will it fire if utilisation is still -// beyond that watermark. -// -// At last, if an EmergencyWatermark is set, when utilisation is above that -// level, the Silence period will be ignored and the policy will fire -// persistenly, as long as utilisation stays above that watermark. -type WatermarkPolicy struct { - // Watermarks are the percentual amounts of limit. The policy will panic if - // Watermarks is zero length. - Watermarks []float64 - - // EmergencyWatermark is a watermark that, when surpassed, puts this - // watchdog in emergency mode. During emergency mode, the system is - // considered to be under significant memory pressure, and the Quiesce - // period is not honoured. - EmergencyWatermark float64 - - // Silence is the quiet period the watchdog will honour except when in - // emergency mode. - Silence time.Duration - - // internal state. - thresholds []uint64 - currIdx int // idx of the current watermark. - lastIdx int - firedLast bool - silenceNs int64 - initialized bool +// NewWatermarkPolicy creates a watchdog policy that schedules GC at concrete +// watermarks. When queried, it will determine the next trigger point based +// on the current utilisation. If the last watermark is surpassed, +// the policy will request immediate GC. +func NewWatermarkPolicy(watermarks ...float64) PolicyCtor { + return func(limit uint64) (Policy, error) { + p := new(watermarkPolicy) + p.limit = limit + p.thresholds = make([]uint64, 0, len(watermarks)) + for _, m := range watermarks { + p.thresholds = append(p.thresholds, uint64(float64(limit)*m)) + } + Logger.Infof("initialized watermark watchdog policy; watermarks: %v; thresholds: %v", p.watermarks, p.thresholds) + return p, nil + } } -var _ Policy = (*WatermarkPolicy)(nil) - -func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { - if !w.initialized { - w.thresholds = make([]uint64, 0, len(w.Watermarks)) - for _, m := range w.Watermarks { - w.thresholds = append(w.thresholds, uint64(float64(input.Limit)*m)) - } - w.silenceNs = w.Silence.Nanoseconds() - w.lastIdx = len(w.Watermarks) - 1 - w.initialized = true - input.Logger.Infof("initialized watermark watchdog policy; watermarks: %v; emergency watermark: %f, thresholds: %v; silence period: %s", - w.Watermarks, w.EmergencyWatermark, w.thresholds, w.Silence) - } - - // determine the value to compare utilisation against. - var actual uint64 - switch input.Scope { - case ScopeSystem: - actual = input.SysStats.ActualUsed - case ScopeHeap: - actual = input.MemStats.HeapAlloc - } - - input.Logger.Debugf("watermark policy: evaluating; curr_watermark: %f, utilization: %d/%d/%d (used/curr_threshold/limit)", - w.Watermarks[w.currIdx], actual, w.thresholds[w.currIdx], input.Limit) - - // determine whether we're past the emergency watermark; if we are, we fire - // unconditionally. Disabled if 0. - if w.EmergencyWatermark > 0 { - currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float. 
- if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency { - emergencyThreshold := uint64(float64(input.Limit) / w.EmergencyWatermark) - input.Logger.Infof("watermark policy: emergency trigger; perc: %f/%f (%% used/%% emergency), utilization: %d/%d/%d (used/emergency/limit), used: %d, limit: %d, ", - currPercentage, w.EmergencyWatermark, actual, emergencyThreshold, input.Limit) - return true - } - } - - // short-circuit if within the silencing period. - if silencing := w.silenceNs > 0; silencing { - now := Clock.Now().UnixNano() - if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs { - input.Logger.Debugf("watermark policy: silenced") - return false - } - } - - // check if the the utilisation is below our current threshold; try - // to downscale before returning false. - if actual < w.thresholds[w.currIdx] { - for w.currIdx > 0 { - if actual >= w.thresholds[w.currIdx-1] { - break - } - w.firedLast = false - w.currIdx-- - } - return false - } - - // we are above our current watermark, but if this is the last watermark and - // we've already fired, suppress the firing. - if w.currIdx == w.lastIdx && w.firedLast { - input.Logger.Debugf("watermark policy: last watermark already fired; skipping") - return false - } - - // if our value is above the last threshold, record the last threshold as - // our current watermark, and also the fact that we've already fired for - // it. - if actual >= w.thresholds[w.lastIdx] { - w.currIdx = w.lastIdx - w.firedLast = true - input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx]) - return true - } - - input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx]) - - // we are triggering; update the current watermark, upscaling it to the - // next threshold. - for w.currIdx < w.lastIdx { - w.currIdx++ - if actual < w.thresholds[w.currIdx] { - break - } - } - - return true +type watermarkPolicy struct { + // watermarks are the percentual amounts of limit. + watermarks []float64 + // thresholds are the absolute trigger points of this policy. + thresholds []uint64 + limit uint64 +} + +var _ Policy = (*watermarkPolicy)(nil) + +func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64, immediate bool) { + Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit)", used, w.limit) + var i int + for ; i < len(w.thresholds); i++ { + t := w.thresholds[i] + if used < t { + return t, false + } + } + // we reached the maximum threshold, so fire immediately. + return used, true } diff --git a/watermarks_test.go b/watermarks_test.go index 1d855e7..72d6108 100644 --- a/watermarks_test.go +++ b/watermarks_test.go @@ -1,288 +1,61 @@ package watchdog import ( - "log" - "os" - "runtime" "testing" - "time" - "github.com/elastic/gosigar" "github.com/raulk/clock" "github.com/stretchr/testify/require" ) var ( - limit uint64 = 64 << 20 // 64MiB. 
- firstWatermark = 0.50 - secondWatermark = 0.75 - thirdWatermark = 0.80 - emergencyWatermark = 0.90 - - logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} + watermarks = []float64{0.50, 0.75, 0.80} + thresholds = func() []uint64 { + var ret []uint64 + for _, w := range watermarks { + ret = append(ret, uint64(float64(limit)*w)) + } + return ret + }() ) -func TestProgressiveWatermarksSystem(t *testing.T) { +func TestProgressiveWatermarks(t *testing.T) { clk := clock.NewMock() Clock = clk - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - EmergencyWatermark: emergencyWatermark, - } + p, err := NewWatermarkPolicy(watermarks...)(limit) + require.NoError(t, err) - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1}, - })) + // at zero + next, immediate := p.Evaluate(UtilizationSystem, uint64(0)) + require.False(t, immediate) + require.EqualValues(t, thresholds[0], next) - // trigger the first watermark. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) + // before the watermark. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1) + require.False(t, immediate) + require.EqualValues(t, thresholds[0], next) - // this won't fire because we're still on the same watermark. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - } + // exactly at the watermark; gives us the next watermark, as the watchdodg would've + // taken care of triggering the first watermark. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])) + require.False(t, immediate) + require.EqualValues(t, thresholds[1], next) - // now let's move to the second watermark. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) + // after the watermark gives us the next watermark. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1) + require.False(t, immediate) + require.EqualValues(t, thresholds[1], next) - // this won't fire because we're still on the same watermark. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) - } + // last watermark; always triggers. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2])) + require.True(t, immediate) + require.EqualValues(t, uint64(float64(limit)*watermarks[2]), next) - // now let's move to the third and last watermark. 
- require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)}, - })) + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1)) + require.True(t, immediate) + require.EqualValues(t, uint64(float64(limit)*watermarks[2])+1, next) - // this won't fire because we're still on the same watermark. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)}, - })) - } -} - -func TestProgressiveWatermarksHeap(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - } - - // now back up step by step, check that all of them fire. - for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeHeap, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)}, - SysStats: &gosigar.Mem{ActualUsed: 0}, - })) - } -} - -func TestDownscalingWatermarks_Reentrancy(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - } - - // crank all the way to the top. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - - // now back down, checking that none of the boundaries fire. - for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm - 1)}, - })) - } - - // now back up step by step, check that all of them fire. - for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm + 1)}, - })) - } - - // check the top does not fire because it already fired. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - } -} - -func TestEmergencyWatermark(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark}, - EmergencyWatermark: emergencyWatermark, - Silence: 1 * time.Minute, - } - - // every tick triggers, even within the silence period. 
- for i := 0; i < 100; i++ { - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * emergencyWatermark)}, - })) - } -} - -func TestJumpWatermark(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - } - - // check that jumping to the top only fires once. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - } - -} - -func TestSilencePeriod(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - var ( - limit uint64 = 64 << 20 // 64MiB. - firstWatermark = 0.50 - secondWatermark = 0.75 - thirdWatermark = 0.80 - ) - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - Silence: 1 * time.Minute, - } - - // going above the first threshold, but within silencing period, so nothing happens. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - } - - // now outside the silencing period, we do fire. - clk.Add(time.Minute) - - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: 0}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - - // but not the second time. - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: 0}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - - // now let's go up inside the silencing period, nothing happens. - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) - - // same thing, outside the silencing period should trigger. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) + next, immediate = p.Evaluate(UtilizationSystem, limit) + require.True(t, immediate) + require.EqualValues(t, limit, next) }