From 4558d98653dc08fa997f5c919c0a051c259e6083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 4 Dec 2020 13:08:00 +0000 Subject: [PATCH 1/7] major rewrite of go-watchdog. This commit introduces a major rewrite of go-watchdog. * HeapDriven and SystemDriven are now distinct run modes. * WIP ProcessDriven that uses cgroups. * Policies are now stateless, pure and greatly simplified. * Policies now return the next utilization at which GC should run. The watchdog enforces that value differently depending on the run mode. * The heap-driven run mode adjusts GOGC dynamically. This places the responsibility on the Go runtime to honour the trigger point, and results in more robust logic that is not vulnerable to very quick bursts within sampling periods. * The heap-driven run mode is no longer polling (interval-driven). Instead, it relies entirely on GC signals. * The Silence and Emergency features of the watermark policy have been removed. If utilization is above the last watermark, the policy will request immediate GC. * Races removed. --- README.md | 20 +- adaptive.go | 74 +++---- adaptive_test.go | 31 +++ go.mod | 3 +- go.sum | 31 ++- log.go | 8 +- sys_linux.go | 24 +++ sys_other.go | 7 + watchdog.go | 523 ++++++++++++++++++++++++++++----------------- watchdog_test.go | 195 +++++++++++------ watermarks.go | 173 ++++----------- watermarks_test.go | 303 ++++---------------------- 12 files changed, 659 insertions(+), 733 deletions(-) create mode 100644 adaptive_test.go create mode 100644 sys_linux.go create mode 100644 sys_other.go diff --git a/README.md b/README.md index e86ab25..0577d32 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,17 @@ [![GoDoc](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog) -go-watchdog runs a singleton memory watchdog. It takes system and heap memory -readings periodically, and feeds them to a user-defined policy to determine -whether GC needs to run immediately. +go-watchdog runs a singleton memory watchdog in the process, which watches +memory utilization and forces Go GC in accordance with a user-defined policy. + +There are two kinds of watchdog so far: + +* **heap-driven:** applies a limit to the heap, and obtains current usage through + `runtime.ReadMemStats()`. +* **system-driven:** applies a limit to the total system memory used, and obtains + current usage through [`elastic/go-sigar`](https://github.com/elastic/gosigar). + +A third process-driven watchdog that uses cgroups is underway. This library ships with two policies out of the box: @@ -18,12 +26,6 @@ This library ships with two policies out of the box: You can easily build a custom policy tailored to the allocation patterns of your program. -It is recommended that you set both (a) a memory limit and (b) a scope of -application of that limit (system or heap) when you start the watchdog. -Otherwise, go-watchdog will use the system scope, and will default to the -total system memory as the limit. [elastic/go-sigar](https://github.com/elastic/gosigar) -is used to make the discovery. - ## Why is this even needed? The garbage collector that ships with the go runtime is pretty good in some diff --git a/adaptive.go b/adaptive.go index b8e3a5b..b7bdcd6 100644 --- a/adaptive.go +++ b/adaptive.go @@ -1,54 +1,32 @@ package watchdog -// AdaptivePolicy is a policy that forces GC when the usage surpasses a -// user-configured percentage (Factor) of the available memory that remained -// after the last GC run. +// NewAdaptivePolicy creates a policy that forces GC when the usage surpasses a +// user-configured percentage (factor) of the available memory. // -// TODO tests -type AdaptivePolicy struct { - // Factor determines how much this policy will let the heap expand - // before it triggers. - // - // On every GC run, this policy recalculates the next target as - // (limit-currentHeap)*Factor (i.e. available*Factor). - // - // If the GC target calculated by the runtime is lower than the one - // calculated by this policy, this policy will set the new target, but the - // effect will be nil, since the the go runtime will run GC sooner than us - // anyway. - Factor float64 - - active bool - target uint64 - initialized bool +// This policy recalculates the next target as usage+(limit-usage)*factor, and +// forces immediate GC when used >= limit. +func NewAdaptivePolicy(factor float64) PolicyCtor { + return func(limit uint64) (Policy, error) { + return &adaptivePolicy{ + factor: factor, + limit: limit, + }, nil + } } -var _ Policy = (*AdaptivePolicy)(nil) - -func (a *AdaptivePolicy) Evaluate(input PolicyInput) (trigger bool) { - if !a.initialized { - // when initializing, set the target to the limit; it will be reset - // when the first GC happens. - a.target = input.Limit - a.initialized = true - } - - // determine the value to compare utilisation against. - var actual uint64 - switch input.Scope { - case ScopeSystem: - actual = input.SysStats.ActualUsed - case ScopeHeap: - actual = input.MemStats.HeapAlloc - } - - if input.GCTrigger { - available := float64(input.Limit) - float64(actual) - calc := uint64(available * a.Factor) - a.target = calc - } - if actual >= a.target { - return true - } - return false +type adaptivePolicy struct { + factor float64 + limit uint64 +} + +var _ Policy = (*adaptivePolicy)(nil) + +func (p *adaptivePolicy) Evaluate(_ UtilizationType, used uint64) (next uint64, immediate bool) { + if used >= p.limit { + return used, true + } + + available := float64(p.limit) - float64(used) + next = used + uint64(available*p.factor) + return next, false } diff --git a/adaptive_test.go b/adaptive_test.go new file mode 100644 index 0000000..b30d3b5 --- /dev/null +++ b/adaptive_test.go @@ -0,0 +1,31 @@ +package watchdog + +import ( + "testing" + + "github.com/raulk/clock" + "github.com/stretchr/testify/require" +) + +func TestAdaptivePolicy(t *testing.T) { + clk := clock.NewMock() + Clock = clk + + p, err := NewAdaptivePolicy(0.5)(limit) + require.NoError(t, err) + + // at zero; next = 50%. + next, immediate := p.Evaluate(UtilizationSystem, 0) + require.False(t, immediate) + require.EqualValues(t, limit/2, next) + + // at half; next = 75%. + next, immediate = p.Evaluate(UtilizationSystem, limit/2) + require.False(t, immediate) + require.EqualValues(t, 3*(limit/4), next) + + // at limit; immediate = true. + next, immediate = p.Evaluate(UtilizationSystem, limit) + require.True(t, immediate) + require.EqualValues(t, limit, next) +} diff --git a/go.mod b/go.mod index fd95880..e2c1eaf 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,10 @@ module github.com/raulk/go-watchdog go 1.15 require ( - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 github.com/elastic/gosigar v0.12.0 github.com/kr/pretty v0.1.0 // indirect github.com/raulk/clock v1.1.0 github.com/stretchr/testify v1.4.0 - golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index b9232ed..5d43f30 100644 --- a/go.sum +++ b/go.sum @@ -1,27 +1,54 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs= +github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 h1:7grrpcfCtbZLsjtB0DgMuzs1umsJmpzaHMZ6cO6iAWw= +github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= +github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg= +github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/elastic/gosigar v0.12.0 h1:AsdhYCJlTudhfOYQyFNgx+fIVTfrDO0V1ST0vHgiapU= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/godbus/dbus/v5 v5.0.3 h1:ZqHaoEF7TBzh4jzPmqVhE/5A1z9of6orkAe5uHoAeME= +github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= +github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y= github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0 h1:8H8QZJ30plJyIVj60H3lr8TZGIq2Fh3Cyrs/ZNg1foU= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 h1:1/DFK4b7JH8DmkqhUk48onnSfrPzImPoVxuomtbT2nk= +golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/log.go b/log.go index b6977cc..ced2020 100644 --- a/log.go +++ b/log.go @@ -2,17 +2,17 @@ package watchdog import "log" -// Logger is an interface to be implemented by custom loggers. -type Logger interface { +// logger is an interface to be implemented by custom loggers. +type logger interface { Debugf(template string, args ...interface{}) Infof(template string, args ...interface{}) Warnf(template string, args ...interface{}) Errorf(template string, args ...interface{}) } -var _ Logger = (*stdlog)(nil) +var _ logger = (*stdlog)(nil) -// stdlog is a Logger that proxies to a standard log.Logger. +// stdlog is a logger that proxies to a standard log.logger. type stdlog struct { log *log.Logger debug bool diff --git a/sys_linux.go b/sys_linux.go new file mode 100644 index 0000000..ff0f919 --- /dev/null +++ b/sys_linux.go @@ -0,0 +1,24 @@ +package watchdog + +import ( + "github.com/containerd/cgroups" +) + +func ProcessMemoryLimit() uint64 { + var ( + pid = os.Getpid() + memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory) + ) + cgroup, err := cgroups.Load(memSubsystem, cgroups.PidPath(pid)) + if err != nil { + return 0 + } + metrics, err := cgroup.Stat() + if err != nil { + return 0 + } + if metrics.Memory == nil { + return 0 + } + return metrics.Memory.HierarchicalMemoryLimit +} diff --git a/sys_other.go b/sys_other.go new file mode 100644 index 0000000..fb469f2 --- /dev/null +++ b/sys_other.go @@ -0,0 +1,7 @@ +// +build !linux + +package watchdog + +func ProcessMemoryLimit() uint64 { + return 0 +} diff --git a/watchdog.go b/watchdog.go index 7fc9266..739fe99 100644 --- a/watchdog.go +++ b/watchdog.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "runtime" + "runtime/debug" "sync" "sync/atomic" "time" @@ -12,12 +13,61 @@ import ( "github.com/raulk/clock" ) -// DecimalPrecision is the rounding precision that float calculations will use. -// By default, 4 decimal places. -var DecimalPrecision = 1e4 +// The watchdog is designed to be used as a singleton; global vars are OK for +// that reason. +var ( + // Logger is the logger to use. If nil, it will default to a logger that + // proxies to a standard logger using the "[watchdog]" prefix. + Logger logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)} -// Clock can be used to inject a mock clock for testing. -var Clock = clock.New() + // Clock can be used to inject a mock clock for testing. + Clock = clock.New() + + // NotifyFired, if non-nil, will be called when the policy has fired, + // prior to calling GC, even if GC is disabled. + NotifyFired func() = func() {} +) + +var ( + // ReadMemStats stops the world. But as of go1.9, it should only + // take ~25µs to complete. + // + // Before go1.15, calls to ReadMemStats during an ongoing GC would + // block due to the worldsema lock. As of go1.15, this was optimized + // and the runtime holds on to worldsema less during GC (only during + // sweep termination and mark termination). + // + // For users using go1.14 and earlier, if this call happens during + // GC, it will just block for longer until serviced, but it will not + // take longer in itself. No harm done. + // + // Actual benchmarks + // ----------------- + // + // In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16 + // i9 busy with another million things. During GC, it takes an + // average of less than 175µs per op. + // + // goos: darwin + // goarch: amd64 + // pkg: github.com/filecoin-project/lotus/api + // BenchmarkReadMemStats-16 44530 27523 ns/op + // BenchmarkReadMemStats-16 43743 26879 ns/op + // BenchmarkReadMemStats-16 45627 26791 ns/op + // BenchmarkReadMemStats-16 44538 26219 ns/op + // BenchmarkReadMemStats-16 44958 26757 ns/op + // BenchmarkReadMemStatsWithGCContention-16 10 183733 p50-ns 211859 p90-ns 211859 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 7 198765 p50-ns 314873 p90-ns 314873 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 10 195151 p50-ns 311408 p90-ns 311408 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 10 217279 p50-ns 295308 p90-ns 295308 p99-ns + // BenchmarkReadMemStatsWithGCContention-16 10 167054 p50-ns 327072 p90-ns 327072 p99-ns + // PASS + // + // See: https://github.com/golang/go/issues/19812 + // See: https://github.com/prometheus/client_golang/issues/403 + memstatsFn = runtime.ReadMemStats + sysmemFn = (*gosigar.Mem).Get +) var ( // ErrAlreadyStarted is returned when the user tries to start the watchdog more than once. @@ -26,239 +76,316 @@ var ( const ( // stateUnstarted represents an unstarted state. - stateUnstarted int64 = iota - // stateStarted represents a started state. - stateStarted + stateUnstarted int32 = iota + // stateRunning represents an operational state. + stateRunning + // stateClosing represents a temporary closing state. + stateClosing ) -// watchdog is a global singleton watchdog. -var watchdog struct { - state int64 - config MemConfig +// _watchdog is a global singleton watchdog. +var _watchdog struct { + state int32 // guarded by atomic, one of state* constants. + + scope UtilizationType closing chan struct{} wg sync.WaitGroup } -// ScopeType defines the scope of the utilisation that we'll apply the limit to. -type ScopeType int +// UtilizationType is the utilization metric in use. +type UtilizationType int const ( - // ScopeSystem specifies that the policy compares against actual used + // UtilizationSystem specifies that the policy compares against actual used // system memory. - ScopeSystem ScopeType = iota - // ScopeHeap specifies that the policy compares against heap used. - ScopeHeap + UtilizationSystem UtilizationType = iota + // UtilizationHeap specifies that the policy compares against heap used. + UtilizationHeap ) -// PolicyInput is the object that's passed to a policy when evaluating it. -type PolicyInput struct { - Scope ScopeType - Limit uint64 - MemStats *runtime.MemStats - SysStats *gosigar.Mem - GCTrigger bool // is this a GC trigger? - Logger Logger -} +// PolicyCtor is a policy constructor. +type PolicyCtor func(limit uint64) (Policy, error) -// Policy encapsulates the logic that the watchdog will run on every tick. +// Policy is polled by the watchdog to determine the next utilisation at which +// a GC should be forced. type Policy interface { - // Evaluate determines whether the policy should fire. It receives the - // limit (either guessed or manually set), go runtime memory stats, and - // system memory stats, amongst other things. It returns whether the policy - // has fired or not. - Evaluate(input PolicyInput) (trigger bool) + // Evaluate determines when the next GC should take place. It receives the + // current usage, and it returns the next usage at which to trigger GC. + // + // The policy can request immediate GC, in which case next should match the + // used memory. + Evaluate(scope UtilizationType, used uint64) (next uint64, immediate bool) } -type MemConfig struct { - // Scope is the scope at which the limit will be applied. - Scope ScopeType - - // Limit is the memory available to this process. If zero, we will fall - // back to querying the system total memory via SIGAR. - Limit uint64 - - // Resolution is the interval at which the watchdog will retrieve memory - // stats and invoke the Policy. - Resolution time.Duration - - // Policy sets the firing policy of this watchdog. - Policy Policy - - // NotifyFired, if non-nil, will be called when the policy has fired, - // prior to calling GC, even if GC is disabled. - NotifyFired func() - - // NotifyOnly, if true, will cause the watchdog to only notify via the - // callbacks, without triggering actual GC. - NotifyOnly bool - - // Logger is the logger to use. If nil, it will default to a logger that - // proxies to a standard logger using the "[watchdog]" prefix. - Logger Logger -} - -// Memory starts the singleton memory watchdog with the provided configuration. -func Memory(config MemConfig) (err error, stop func()) { - if !atomic.CompareAndSwapInt64(&watchdog.state, stateUnstarted, stateStarted) { +// HeapDriven starts a singleton heap-driven watchdog. +// +// The heap-driven watchdog adjusts GOGC dynamically after every GC, to honour +// the policy. When an immediate GC is requested, runtime.GC() is called, and +// the policy is re-evaluated at the end of GC. +// +// It is entirely possible for the policy to keep requesting immediate GC +// repeateadly. This usually signals an emergency situation, and won't prevent +// the program from making progress, since the Go's garbage collection is not +// stop-the-world (for the major part). +// +// A limit value of 0 will error. +func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) { + if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) { return ErrAlreadyStarted, nil } - if config.Logger == nil { - config.Logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)} + if limit == 0 { + return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil } - // if the user didn't provide a limit, get the total memory. - if config.Limit == 0 { - var mem gosigar.Mem - if err := mem.Get(); err != nil { - return fmt.Errorf("failed to get system memory limit via SIGAR: %w", err), nil + policy, err := policyCtor(limit) + if err != nil { + return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil + } + + _watchdog.scope = UtilizationHeap + _watchdog.closing = make(chan struct{}) + + var gcTriggered = make(chan struct{}, 16) + setupGCSentinel(gcTriggered) + + _watchdog.wg.Add(1) + go func() { + defer _watchdog.wg.Done() + + // get the initial effective GOGC; guess it's 100 (default), and restore + // it to whatever it actually was. This works because SetGCPercent + // returns the previous value. + originalGOGC := debug.SetGCPercent(debug.SetGCPercent(100)) + currGOGC := originalGOGC + + var memstats runtime.MemStats + for { + select { + case <-gcTriggered: + NotifyFired() + + case <-_watchdog.closing: + return + } + + // recompute the next trigger. + memstatsFn(&memstats) + + // heapMarked is the amount of heap that was marked as live by GC. + // it is inferred from our current GOGC and the new target picked. + heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100)) + if heapMarked == 0 { + // this shouldn't happen, but just in case; avoiding a div by 0. + continue + } + + // evaluate the policy. + next, immediate := policy.Evaluate(UtilizationHeap, memstats.HeapAlloc) + + if immediate { + // trigger a forced GC; because we're not making the finalizer + // skip sending to the trigger channel, we will get fired again. + // at this stage, the program is under significant pressure, and + // given that Go GC is not STW for the largest part, the worse + // thing that could happen from infinitely GC'ing is that the + // program will run in a degrated state for longer, possibly + // long enough for an operator to intervene. + Logger.Warnf("heap-driven watchdog requested immediate GC; " + + "system is probably under significant pressure; " + + "performance compromised") + forceGC(&memstats) + continue + } + + // calculate how much to set GOGC to honour the next trigger point. + currGOGC = int(((float64(next) / float64(heapMarked)) - float64(1)) * 100) + if currGOGC >= originalGOGC { + Logger.Infof("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC) + currGOGC = originalGOGC + } else { + if currGOGC < 1 { + currGOGC = 1 + } + Logger.Infof("heap watchdog: setting GOGC percent: %d", currGOGC) + } + + debug.SetGCPercent(currGOGC) + + memstatsFn(&memstats) + Logger.Infof("heap watchdog stats: heap_alloc: %d, heap_marked: %d, next_gc: %d, policy_next_gc: %d, gogc: %d", + memstats.HeapAlloc, heapMarked, memstats.NextGC, next, currGOGC) } - config.Limit = mem.Total - } + }() - watchdog.config = config - watchdog.closing = make(chan struct{}) - - watchdog.wg.Add(1) - go watch() - - return nil, stopMemory + return nil, stop } -func watch() { - var ( - lk sync.Mutex // guards gcTriggered channel, which is drained and flipped to nil on closure. - gcTriggered = make(chan struct{}, 16) +// SystemDriven starts a singleton system-driven watchdog. +// +// The system-driven watchdog keeps a threshold, above which GC will be forced. +// The watchdog polls the system utilization at the specified frequency. When +// the actual utilization exceeds the threshold, a GC is forced. +// +// This threshold is calculated by querying the policy every time that GC runs, +// either triggered by the runtime, or forced by us. +func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { + if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) { + return ErrAlreadyStarted, nil + } - memstats runtime.MemStats - sysmem gosigar.Mem - config = watchdog.config - ) + if limit == 0 { + limit, err = determineLimit(false) + if err != nil { + return err, nil + } + } + + policy, err := policyCtor(limit) + if err != nil { + return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil + } + + _watchdog.scope = UtilizationSystem + _watchdog.closing = make(chan struct{}) + + var gcTriggered = make(chan struct{}, 16) + setupGCSentinel(gcTriggered) + + _watchdog.wg.Add(1) + go func() { + defer _watchdog.wg.Done() + + var ( + memstats runtime.MemStats + sysmem gosigar.Mem + threshold uint64 + ) + + // initialize the threshold. + threshold, immediate := policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) + if immediate { + Logger.Warnf("system-driven watchdog requested immediate GC upon startup; " + + "policy is probably misconfigured; " + + "performance compromised") + forceGC(&memstats) + } + + for { + select { + case <-Clock.After(frequency): + // get the current usage. + if err := sysmemFn(&sysmem); err != nil { + Logger.Warnf("failed to obtain system memory stats; err: %s", err) + continue + } + actual := sysmem.ActualUsed + if actual < threshold { + // nothing to do. + continue + } + // trigger GC; this will emit a gcTriggered event which we'll + // consume next to readjust the threshold. + Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", actual, threshold) + forceGC(&memstats) + + case <-gcTriggered: + NotifyFired() + + // get the current usage. + if err := sysmemFn(&sysmem); err != nil { + Logger.Warnf("failed to obtain system memory stats; err: %s", err) + continue + } + + // adjust the threshold. + threshold, immediate = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) + if immediate { + Logger.Warnf("system-driven watchdog triggering immediate GC; %d used bytes", sysmem.ActualUsed) + forceGC(&memstats) + } + + case <-_watchdog.closing: + return + } + } + }() + + return nil, stop +} + +func determineLimit(restrictByProcess bool) (uint64, error) { + // TODO. + // if restrictByProcess { + // if pmem := ProcessMemoryLimit(); pmem > 0 { + // Logger.Infof("watchdog using process limit: %d bytes", pmem) + // return pmem, nil + // } + // Logger.Infof("watchdog was unable to determine process limit; falling back to total system memory") + // } + + // populate initial utilisation and system stats. + var sysmem gosigar.Mem + if err := sysmemFn(&sysmem); err != nil { + return 0, fmt.Errorf("failed to get system memory stats: %w", err) + } + return sysmem.Total, nil +} + +// forceGC forces a manual GC. +func forceGC(memstats *runtime.MemStats) { + Logger.Infof("watchdog is forcing GC") + + // it's safe to assume that the finalizer will attempt to run before + // runtime.GC() returns because runtime.GC() waits for the sweep phase to + // finish before returning. + // finalizers are run in the sweep phase. + start := time.Now() + runtime.GC() + took := time.Since(start) + + memstatsFn(memstats) + Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", took, memstats.HeapAlloc) +} + +func setupGCSentinel(gcTriggered chan struct{}) { + logger := Logger // this non-zero sized struct is used as a sentinel to detect when a GC // run has finished, by setting and resetting a finalizer on it. + // it essentially creates a GC notification "flywheel"; every GC will + // trigger this finalizer, which will reset itself so it gets notified + // of the next GC, breaking the cycle when the watchdog is stopped. type sentinel struct{ a *int } - var sentinelObj sentinel var finalizer func(o *sentinel) finalizer = func(o *sentinel) { - lk.Lock() - defer lk.Unlock() - select { - case gcTriggered <- struct{}{}: - default: - config.Logger.Warnf("failed to queue gc trigger; channel backlogged") - } - runtime.SetFinalizer(o, finalizer) - } - finalizer(&sentinelObj) - - defer watchdog.wg.Done() - for { - var eventIsGc bool - select { - case <-Clock.After(config.Resolution): - // exit select. - - case <-gcTriggered: - eventIsGc = true - // exit select. - - case <-watchdog.closing: - runtime.SetFinalizer(&sentinelObj, nil) // clear the sentinel finalizer. - - lk.Lock() - ch := gcTriggered - gcTriggered = nil - lk.Unlock() - - // close and drain - close(ch) - for range ch { - } + if atomic.LoadInt32(&_watchdog.state) != stateRunning { + // this GC triggered after the watchdog was stopped; ignore + // and do not reset the finalizer. return } - // ReadMemStats stops the world. But as of go1.9, it should only - // take ~25µs to complete. - // - // Before go1.15, calls to ReadMemStats during an ongoing GC would - // block due to the worldsema lock. As of go1.15, this was optimized - // and the runtime holds on to worldsema less during GC (only during - // sweep termination and mark termination). - // - // For users using go1.14 and earlier, if this call happens during - // GC, it will just block for longer until serviced, but it will not - // take longer in itself. No harm done. - // - // Actual benchmarks - // ----------------- - // - // In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16 - // i9 busy with another million things. During GC, it takes an - // average of less than 175µs per op. - // - // goos: darwin - // goarch: amd64 - // pkg: github.com/filecoin-project/lotus/api - // BenchmarkReadMemStats-16 44530 27523 ns/op - // BenchmarkReadMemStats-16 43743 26879 ns/op - // BenchmarkReadMemStats-16 45627 26791 ns/op - // BenchmarkReadMemStats-16 44538 26219 ns/op - // BenchmarkReadMemStats-16 44958 26757 ns/op - // BenchmarkReadMemStatsWithGCContention-16 10 183733 p50-ns 211859 p90-ns 211859 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 7 198765 p50-ns 314873 p90-ns 314873 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 10 195151 p50-ns 311408 p90-ns 311408 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 10 217279 p50-ns 295308 p90-ns 295308 p99-ns - // BenchmarkReadMemStatsWithGCContention-16 10 167054 p50-ns 327072 p90-ns 327072 p99-ns - // PASS - // - // See: https://github.com/golang/go/issues/19812 - // See: https://github.com/prometheus/client_golang/issues/403 + // reset so it triggers on the next GC. + runtime.SetFinalizer(o, finalizer) - if eventIsGc { - config.Logger.Infof("watchdog after GC") - } - - runtime.ReadMemStats(&memstats) - - if err := sysmem.Get(); err != nil { - config.Logger.Warnf("failed to obtain system memory stats; err: %s", err) - } - - trigger := config.Policy.Evaluate(PolicyInput{ - Scope: config.Scope, - Limit: config.Limit, - MemStats: &memstats, - SysStats: &sysmem, - GCTrigger: eventIsGc, - Logger: config.Logger, - }) - - if !trigger { - continue - } - - config.Logger.Infof("watchdog policy fired") - - if f := config.NotifyFired; f != nil { - f() - } - - if !config.NotifyOnly { - config.Logger.Infof("watchdog is triggering GC") - start := time.Now() - runtime.GC() - runtime.ReadMemStats(&memstats) - config.Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", time.Since(start), memstats.HeapAlloc) + select { + case gcTriggered <- struct{}{}: + default: + logger.Warnf("failed to queue gc trigger; channel backlogged") } } + + runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel. } -func stopMemory() { - if !atomic.CompareAndSwapInt64(&watchdog.state, stateStarted, stateUnstarted) { +func stop() { + if !atomic.CompareAndSwapInt32(&_watchdog.state, stateRunning, stateClosing) { return } - close(watchdog.closing) - watchdog.wg.Wait() + + close(_watchdog.closing) + _watchdog.wg.Wait() + + atomic.StoreInt32(&_watchdog.state, stateUnstarted) } diff --git a/watchdog_test.go b/watchdog_test.go index ce00e3e..89b8d75 100644 --- a/watchdog_test.go +++ b/watchdog_test.go @@ -1,92 +1,151 @@ package watchdog import ( + "fmt" + "log" + "os" "runtime" + "runtime/debug" "testing" "time" + "github.com/elastic/gosigar" "github.com/raulk/clock" "github.com/stretchr/testify/require" ) -type testPolicy struct { - ch chan *PolicyInput - trigger bool +// These integration tests are a hugely non-deterministic, but necessary to get +// good coverage and confidence. The Go runtime makes its own pacing decisions, +// and those may vary based on machine, OS, kernel memory management, other +// running programs, exogenous memory pressure, and Go runtime versions. +// +// The assertions we use here are lax, but should be sufficient to serve as a +// reasonable litmus test of whether the watchdog is doing what it's supposed +// to or not. + +var ( + limit uint64 = 64 << 20 // 64MiB. +) + +func init() { + Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} } -var _ Policy = (*testPolicy)(nil) +func TestControl(t *testing.T) { + debug.SetGCPercent(100) -func (t *testPolicy) Evaluate(input PolicyInput) (trigger bool) { - t.ch <- &input - return t.trigger -} + // retain 1MiB every iteration, up to 100MiB (beyond heap limit!). + var retained [][]byte + for i := 0; i < 100; i++ { + b := make([]byte, 1*1024*1024) + for i := range b { + b[i] = byte(i) + } + retained = append(retained, b) + } -func TestWatchdog(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - tp := &testPolicy{ch: make(chan *PolicyInput, 1)} - notifyCh := make(chan struct{}, 1) - - err, stop := Memory(MemConfig{ - Scope: ScopeHeap, - Limit: 100, - Resolution: 10 * time.Second, - Policy: tp, - NotifyFired: func() { notifyCh <- struct{}{} }, - }) - require.NoError(t, err) - defer stop() - - time.Sleep(500 * time.Millisecond) // wait til the watchdog goroutine starts. - - clk.Add(10 * time.Second) - pi := <-tp.ch - require.EqualValues(t, 100, pi.Limit) - require.NotNil(t, pi.MemStats) - require.NotNil(t, pi.SysStats) - require.Equal(t, ScopeHeap, pi.Scope) - - require.Len(t, notifyCh, 0) + for _, b := range retained { + for i := range b { + b[i] = byte(i) + } + } var ms runtime.MemStats runtime.ReadMemStats(&ms) - require.EqualValues(t, 0, ms.NumGC) - - // now fire. - tp.trigger = true - - clk.Add(10 * time.Second) - pi = <-tp.ch - require.EqualValues(t, 100, pi.Limit) - require.NotNil(t, pi.MemStats) - require.NotNil(t, pi.SysStats) - require.Equal(t, ScopeHeap, pi.Scope) - - time.Sleep(500 * time.Millisecond) // wait until the watchdog runs. - - require.Len(t, notifyCh, 1) - - runtime.ReadMemStats(&ms) - require.EqualValues(t, 1, ms.NumGC) + require.LessOrEqual(t, ms.NumGC, uint32(8)) // a maximum of 8 GCs should've happened. + require.Zero(t, ms.NumForcedGC) // no forced GCs. } -func TestDoubleClose(t *testing.T) { - defer func() { - if r := recover(); r != nil { - t.Fatal(r) - } - }() +func TestHeapDriven(t *testing.T) { + // we can't mock ReadMemStats, because we're relying on the go runtime to + // enforce the GC run, and the go runtime won't use our mock. Therefore, we + // need to do the actual thing. + debug.SetGCPercent(100) - tp := &testPolicy{ch: make(chan *PolicyInput, 1)} + clk := clock.NewMock() + Clock = clk - err, stop := Memory(MemConfig{ - Scope: ScopeHeap, - Limit: 100, - Resolution: 10 * time.Second, - Policy: tp, - }) + observations := make([]*runtime.MemStats, 0, 100) + NotifyFired = func() { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + observations = append(observations, &ms) + } + + // limit is 64MiB. + err, stopFn := HeapDriven(limit, NewAdaptivePolicy(0.5)) require.NoError(t, err) - stop() - stop() + defer stopFn() + + time.Sleep(500 * time.Millisecond) // give time for the watchdog to init. + + // retain 1MiB every iteration, up to 100MiB (beyond heap limit!). + var retained [][]byte + for i := 0; i < 100; i++ { + retained = append(retained, make([]byte, 1*1024*1024)) + } + + for _, o := range observations { + fmt.Println("heap alloc:", o.HeapAlloc, "next gc:", o.NextGC, "gc count:", o.NumGC, "forced gc:", o.NumForcedGC) + } + + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + require.GreaterOrEqual(t, ms.NumGC, uint32(12)) // over 12 GCs should've taken place. + require.GreaterOrEqual(t, ms.NumForcedGC, uint32(5)) // at least 5 forced GCs. +} + +func TestSystemDriven(t *testing.T) { + debug.SetGCPercent(100) + + clk := clock.NewMock() + Clock = clk + + // mock the system reporting. + var actualUsed uint64 + sysmemFn = func(g *gosigar.Mem) error { + g.ActualUsed = actualUsed + return nil + } + + // limit is 64MiB. + err, stopFn := SystemDriven(limit, 5*time.Second, NewAdaptivePolicy(0.5)) + require.NoError(t, err) + defer stopFn() + + time.Sleep(200 * time.Millisecond) // give time for the watchdog to init. + + notifyCh := make(chan struct{}, 1) + NotifyFired = func() { + notifyCh <- struct{}{} + } + + // first tick; used = 0. + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 0) // no GC has taken place. + + // second tick; used = just over 50%; will trigger GC. + actualUsed = (limit / 2) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 1) + <-notifyCh + + // third tick; just below 75%; no GC. + actualUsed = uint64(float64(limit)*0.75) - 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 0) + + // fourth tick; 75% exactly; will trigger GC. + actualUsed = uint64(float64(limit)*0.75) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 1) + <-notifyCh + + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + require.GreaterOrEqual(t, ms.NumForcedGC, uint32(2)) } diff --git a/watermarks.go b/watermarks.go index 54c986e..b193fd1 100644 --- a/watermarks.go +++ b/watermarks.go @@ -1,142 +1,41 @@ package watchdog -import ( - "math" - "time" -) - -// WatermarkPolicy is a watchdog firing policy that triggers when watermarks are -// surpassed in the increasing direction. -// -// For example, a policy configured with the watermarks 0.50, 0.75, 0.80, and -// 0.99 will trigger at most once, and only once, each time that a watermark -// is surpassed upwards. -// -// Even if utilisation pierces through several watermarks at once between two -// subsequent calls to Evaluate, the policy will fire only once. -// -// It is possible to suppress the watermark policy from firing too often by -// setting a Silence period. When non-zero, if a watermark is surpassed within -// the Silence window (using the last GC timestamp as basis), that event will -// not immediately trigger firing. Instead, the policy will wait until the -// silence period is over, and only then will it fire if utilisation is still -// beyond that watermark. -// -// At last, if an EmergencyWatermark is set, when utilisation is above that -// level, the Silence period will be ignored and the policy will fire -// persistenly, as long as utilisation stays above that watermark. -type WatermarkPolicy struct { - // Watermarks are the percentual amounts of limit. The policy will panic if - // Watermarks is zero length. - Watermarks []float64 - - // EmergencyWatermark is a watermark that, when surpassed, puts this - // watchdog in emergency mode. During emergency mode, the system is - // considered to be under significant memory pressure, and the Quiesce - // period is not honoured. - EmergencyWatermark float64 - - // Silence is the quiet period the watchdog will honour except when in - // emergency mode. - Silence time.Duration - - // internal state. - thresholds []uint64 - currIdx int // idx of the current watermark. - lastIdx int - firedLast bool - silenceNs int64 - initialized bool +// NewWatermarkPolicy creates a watchdog policy that schedules GC at concrete +// watermarks. When queried, it will determine the next trigger point based +// on the current utilisation. If the last watermark is surpassed, +// the policy will request immediate GC. +func NewWatermarkPolicy(watermarks ...float64) PolicyCtor { + return func(limit uint64) (Policy, error) { + p := new(watermarkPolicy) + p.limit = limit + p.thresholds = make([]uint64, 0, len(watermarks)) + for _, m := range watermarks { + p.thresholds = append(p.thresholds, uint64(float64(limit)*m)) + } + Logger.Infof("initialized watermark watchdog policy; watermarks: %v; thresholds: %v", p.watermarks, p.thresholds) + return p, nil + } } -var _ Policy = (*WatermarkPolicy)(nil) - -func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { - if !w.initialized { - w.thresholds = make([]uint64, 0, len(w.Watermarks)) - for _, m := range w.Watermarks { - w.thresholds = append(w.thresholds, uint64(float64(input.Limit)*m)) - } - w.silenceNs = w.Silence.Nanoseconds() - w.lastIdx = len(w.Watermarks) - 1 - w.initialized = true - input.Logger.Infof("initialized watermark watchdog policy; watermarks: %v; emergency watermark: %f, thresholds: %v; silence period: %s", - w.Watermarks, w.EmergencyWatermark, w.thresholds, w.Silence) - } - - // determine the value to compare utilisation against. - var actual uint64 - switch input.Scope { - case ScopeSystem: - actual = input.SysStats.ActualUsed - case ScopeHeap: - actual = input.MemStats.HeapAlloc - } - - input.Logger.Debugf("watermark policy: evaluating; curr_watermark: %f, utilization: %d/%d/%d (used/curr_threshold/limit)", - w.Watermarks[w.currIdx], actual, w.thresholds[w.currIdx], input.Limit) - - // determine whether we're past the emergency watermark; if we are, we fire - // unconditionally. Disabled if 0. - if w.EmergencyWatermark > 0 { - currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float. - if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency { - emergencyThreshold := uint64(float64(input.Limit) / w.EmergencyWatermark) - input.Logger.Infof("watermark policy: emergency trigger; perc: %f/%f (%% used/%% emergency), utilization: %d/%d/%d (used/emergency/limit), used: %d, limit: %d, ", - currPercentage, w.EmergencyWatermark, actual, emergencyThreshold, input.Limit) - return true - } - } - - // short-circuit if within the silencing period. - if silencing := w.silenceNs > 0; silencing { - now := Clock.Now().UnixNano() - if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs { - input.Logger.Debugf("watermark policy: silenced") - return false - } - } - - // check if the the utilisation is below our current threshold; try - // to downscale before returning false. - if actual < w.thresholds[w.currIdx] { - for w.currIdx > 0 { - if actual >= w.thresholds[w.currIdx-1] { - break - } - w.firedLast = false - w.currIdx-- - } - return false - } - - // we are above our current watermark, but if this is the last watermark and - // we've already fired, suppress the firing. - if w.currIdx == w.lastIdx && w.firedLast { - input.Logger.Debugf("watermark policy: last watermark already fired; skipping") - return false - } - - // if our value is above the last threshold, record the last threshold as - // our current watermark, and also the fact that we've already fired for - // it. - if actual >= w.thresholds[w.lastIdx] { - w.currIdx = w.lastIdx - w.firedLast = true - input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx]) - return true - } - - input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx]) - - // we are triggering; update the current watermark, upscaling it to the - // next threshold. - for w.currIdx < w.lastIdx { - w.currIdx++ - if actual < w.thresholds[w.currIdx] { - break - } - } - - return true +type watermarkPolicy struct { + // watermarks are the percentual amounts of limit. + watermarks []float64 + // thresholds are the absolute trigger points of this policy. + thresholds []uint64 + limit uint64 +} + +var _ Policy = (*watermarkPolicy)(nil) + +func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64, immediate bool) { + Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit)", used, w.limit) + var i int + for ; i < len(w.thresholds); i++ { + t := w.thresholds[i] + if used < t { + return t, false + } + } + // we reached the maximum threshold, so fire immediately. + return used, true } diff --git a/watermarks_test.go b/watermarks_test.go index 1d855e7..72d6108 100644 --- a/watermarks_test.go +++ b/watermarks_test.go @@ -1,288 +1,61 @@ package watchdog import ( - "log" - "os" - "runtime" "testing" - "time" - "github.com/elastic/gosigar" "github.com/raulk/clock" "github.com/stretchr/testify/require" ) var ( - limit uint64 = 64 << 20 // 64MiB. - firstWatermark = 0.50 - secondWatermark = 0.75 - thirdWatermark = 0.80 - emergencyWatermark = 0.90 - - logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} + watermarks = []float64{0.50, 0.75, 0.80} + thresholds = func() []uint64 { + var ret []uint64 + for _, w := range watermarks { + ret = append(ret, uint64(float64(limit)*w)) + } + return ret + }() ) -func TestProgressiveWatermarksSystem(t *testing.T) { +func TestProgressiveWatermarks(t *testing.T) { clk := clock.NewMock() Clock = clk - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - EmergencyWatermark: emergencyWatermark, - } + p, err := NewWatermarkPolicy(watermarks...)(limit) + require.NoError(t, err) - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1}, - })) + // at zero + next, immediate := p.Evaluate(UtilizationSystem, uint64(0)) + require.False(t, immediate) + require.EqualValues(t, thresholds[0], next) - // trigger the first watermark. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) + // before the watermark. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1) + require.False(t, immediate) + require.EqualValues(t, thresholds[0], next) - // this won't fire because we're still on the same watermark. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - } + // exactly at the watermark; gives us the next watermark, as the watchdodg would've + // taken care of triggering the first watermark. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])) + require.False(t, immediate) + require.EqualValues(t, thresholds[1], next) - // now let's move to the second watermark. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) + // after the watermark gives us the next watermark. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1) + require.False(t, immediate) + require.EqualValues(t, thresholds[1], next) - // this won't fire because we're still on the same watermark. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) - } + // last watermark; always triggers. + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2])) + require.True(t, immediate) + require.EqualValues(t, uint64(float64(limit)*watermarks[2]), next) - // now let's move to the third and last watermark. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)}, - })) + next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1)) + require.True(t, immediate) + require.EqualValues(t, uint64(float64(limit)*watermarks[2])+1, next) - // this won't fire because we're still on the same watermark. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)}, - })) - } -} - -func TestProgressiveWatermarksHeap(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - } - - // now back up step by step, check that all of them fire. - for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeHeap, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)}, - SysStats: &gosigar.Mem{ActualUsed: 0}, - })) - } -} - -func TestDownscalingWatermarks_Reentrancy(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - } - - // crank all the way to the top. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - - // now back down, checking that none of the boundaries fire. - for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm - 1)}, - })) - } - - // now back up step by step, check that all of them fire. - for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm + 1)}, - })) - } - - // check the top does not fire because it already fired. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - } -} - -func TestEmergencyWatermark(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark}, - EmergencyWatermark: emergencyWatermark, - Silence: 1 * time.Minute, - } - - // every tick triggers, even within the silence period. - for i := 0; i < 100; i++ { - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * emergencyWatermark)}, - })) - } -} - -func TestJumpWatermark(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - } - - // check that jumping to the top only fires once. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: limit}, - })) - } - -} - -func TestSilencePeriod(t *testing.T) { - clk := clock.NewMock() - Clock = clk - - var ( - limit uint64 = 64 << 20 // 64MiB. - firstWatermark = 0.50 - secondWatermark = 0.75 - thirdWatermark = 0.80 - ) - - p := WatermarkPolicy{ - Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark}, - Silence: 1 * time.Minute, - } - - // going above the first threshold, but within silencing period, so nothing happens. - for i := 0; i < 100; i++ { - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - } - - // now outside the silencing period, we do fire. - clk.Add(time.Minute) - - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: 0}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - - // but not the second time. - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: 0}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)}, - })) - - // now let's go up inside the silencing period, nothing happens. - require.False(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) - - // same thing, outside the silencing period should trigger. - require.True(t, p.Evaluate(PolicyInput{ - Logger: logger, - Scope: ScopeSystem, - Limit: limit, - MemStats: &runtime.MemStats{LastGC: uint64(0)}, - SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)}, - })) + next, immediate = p.Evaluate(UtilizationSystem, limit) + require.True(t, immediate) + require.EqualValues(t, limit, next) } From b35cdf0c7daa270000be77045975eb4452978836 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 9 Dec 2020 14:54:09 +0000 Subject: [PATCH 2/7] replace atomic state guarding with lock. --- watchdog.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/watchdog.go b/watchdog.go index 739fe99..de48619 100644 --- a/watchdog.go +++ b/watchdog.go @@ -6,7 +6,6 @@ import ( "runtime" "runtime/debug" "sync" - "sync/atomic" "time" "github.com/elastic/gosigar" @@ -79,13 +78,12 @@ const ( stateUnstarted int32 = iota // stateRunning represents an operational state. stateRunning - // stateClosing represents a temporary closing state. - stateClosing ) // _watchdog is a global singleton watchdog. var _watchdog struct { - state int32 // guarded by atomic, one of state* constants. + lk sync.Mutex + state int32 scope UtilizationType @@ -131,7 +129,10 @@ type Policy interface { // // A limit value of 0 will error. func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) { - if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) { + _watchdog.lk.Lock() + defer _watchdog.lk.Unlock() + + if _watchdog.state != stateUnstarted { return ErrAlreadyStarted, nil } @@ -144,6 +145,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil } + _watchdog.state = stateRunning _watchdog.scope = UtilizationHeap _watchdog.closing = make(chan struct{}) @@ -231,7 +233,10 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) // This threshold is calculated by querying the policy every time that GC runs, // either triggered by the runtime, or forced by us. func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { - if !atomic.CompareAndSwapInt32(&_watchdog.state, stateUnstarted, stateRunning) { + _watchdog.lk.Lock() + defer _watchdog.lk.Unlock() + + if _watchdog.state != stateUnstarted { return ErrAlreadyStarted, nil } @@ -247,6 +252,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil } + _watchdog.state = stateRunning _watchdog.scope = UtilizationSystem _watchdog.closing = make(chan struct{}) @@ -360,7 +366,10 @@ func setupGCSentinel(gcTriggered chan struct{}) { type sentinel struct{ a *int } var finalizer func(o *sentinel) finalizer = func(o *sentinel) { - if atomic.LoadInt32(&_watchdog.state) != stateRunning { + _watchdog.lk.Lock() + defer _watchdog.lk.Unlock() + + if _watchdog.state != stateRunning { // this GC triggered after the watchdog was stopped; ignore // and do not reset the finalizer. return @@ -380,12 +389,14 @@ func setupGCSentinel(gcTriggered chan struct{}) { } func stop() { - if !atomic.CompareAndSwapInt32(&_watchdog.state, stateRunning, stateClosing) { + _watchdog.lk.Lock() + defer _watchdog.lk.Unlock() + + if _watchdog.state != stateRunning { return } close(_watchdog.closing) _watchdog.wg.Wait() - - atomic.StoreInt32(&_watchdog.state, stateUnstarted) + _watchdog.state = stateUnstarted } From fcf668bfe2e5542e898d2822ba0f514051c6d9a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 9 Dec 2020 14:54:56 +0000 Subject: [PATCH 3/7] nit fixing. --- watchdog.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/watchdog.go b/watchdog.go index de48619..9b0c22b 100644 --- a/watchdog.go +++ b/watchdog.go @@ -149,7 +149,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) _watchdog.scope = UtilizationHeap _watchdog.closing = make(chan struct{}) - var gcTriggered = make(chan struct{}, 16) + gcTriggered := make(chan struct{}, 16) setupGCSentinel(gcTriggered) _watchdog.wg.Add(1) @@ -256,7 +256,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) _watchdog.scope = UtilizationSystem _watchdog.closing = make(chan struct{}) - var gcTriggered = make(chan struct{}, 16) + gcTriggered := make(chan struct{}, 16) setupGCSentinel(gcTriggered) _watchdog.wg.Add(1) From a4d30cafdce4592634b29b7c68827e6fb65b4ff4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 9 Dec 2020 14:56:03 +0000 Subject: [PATCH 4/7] add log warning. --- watchdog.go | 1 + 1 file changed, 1 insertion(+) diff --git a/watchdog.go b/watchdog.go index 9b0c22b..b8fa111 100644 --- a/watchdog.go +++ b/watchdog.go @@ -180,6 +180,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100)) if heapMarked == 0 { // this shouldn't happen, but just in case; avoiding a div by 0. + Logger.Warnf("heap-driven watchdog: inferred zero heap marked; skipping evaluation") continue } From 12a1d3f0531dda5d40c005af6b0e136f0e904764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 9 Dec 2020 14:58:17 +0000 Subject: [PATCH 5/7] demote log to debug. --- watchdog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watchdog.go b/watchdog.go index b8fa111..f2d5168 100644 --- a/watchdog.go +++ b/watchdog.go @@ -205,7 +205,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) // calculate how much to set GOGC to honour the next trigger point. currGOGC = int(((float64(next) / float64(heapMarked)) - float64(1)) * 100) if currGOGC >= originalGOGC { - Logger.Infof("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC) + Logger.Debugf("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC) currGOGC = originalGOGC } else { if currGOGC < 1 { From 5f00469e3a758dd830cbaf8c2f7359a00cc40c76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 9 Dec 2020 15:35:23 +0000 Subject: [PATCH 6/7] remove 'immediate' flag in policies. --- adaptive.go | 9 +++--- adaptive_test.go | 11 +++---- watchdog.go | 71 ++++++++++++++++------------------------------ watchdog_test.go | 3 +- watermarks.go | 11 +++---- watermarks_test.go | 29 +++++++------------ 6 files changed, 50 insertions(+), 84 deletions(-) diff --git a/adaptive.go b/adaptive.go index b7bdcd6..73c932b 100644 --- a/adaptive.go +++ b/adaptive.go @@ -3,8 +3,7 @@ package watchdog // NewAdaptivePolicy creates a policy that forces GC when the usage surpasses a // user-configured percentage (factor) of the available memory. // -// This policy recalculates the next target as usage+(limit-usage)*factor, and -// forces immediate GC when used >= limit. +// This policy recalculates the next target as usage+(limit-usage)*factor. func NewAdaptivePolicy(factor float64) PolicyCtor { return func(limit uint64) (Policy, error) { return &adaptivePolicy{ @@ -21,12 +20,12 @@ type adaptivePolicy struct { var _ Policy = (*adaptivePolicy)(nil) -func (p *adaptivePolicy) Evaluate(_ UtilizationType, used uint64) (next uint64, immediate bool) { +func (p *adaptivePolicy) Evaluate(_ UtilizationType, used uint64) (next uint64) { if used >= p.limit { - return used, true + return used } available := float64(p.limit) - float64(used) next = used + uint64(available*p.factor) - return next, false + return next } diff --git a/adaptive_test.go b/adaptive_test.go index b30d3b5..c696bcc 100644 --- a/adaptive_test.go +++ b/adaptive_test.go @@ -15,17 +15,14 @@ func TestAdaptivePolicy(t *testing.T) { require.NoError(t, err) // at zero; next = 50%. - next, immediate := p.Evaluate(UtilizationSystem, 0) - require.False(t, immediate) + next := p.Evaluate(UtilizationSystem, 0) require.EqualValues(t, limit/2, next) // at half; next = 75%. - next, immediate = p.Evaluate(UtilizationSystem, limit/2) - require.False(t, immediate) + next = p.Evaluate(UtilizationSystem, limit/2) require.EqualValues(t, 3*(limit/4), next) - // at limit; immediate = true. - next, immediate = p.Evaluate(UtilizationSystem, limit) - require.True(t, immediate) + // at limit. + next = p.Evaluate(UtilizationSystem, limit) require.EqualValues(t, limit, next) } diff --git a/watchdog.go b/watchdog.go index f2d5168..03ca2e6 100644 --- a/watchdog.go +++ b/watchdog.go @@ -3,6 +3,7 @@ package watchdog import ( "fmt" "log" + "math" "runtime" "runtime/debug" "sync" @@ -12,6 +13,11 @@ import ( "github.com/raulk/clock" ) +// PolicyTempDisabled is a marker value for policies to signal that the policy +// is temporarily disabled. Use it when all hope is lost to turn around from +// significant memory pressure (such as when above an "extreme" watermark). +const PolicyTempDisabled uint64 = math.MaxUint64 + // The watchdog is designed to be used as a singleton; global vars are OK for // that reason. var ( @@ -110,24 +116,15 @@ type PolicyCtor func(limit uint64) (Policy, error) type Policy interface { // Evaluate determines when the next GC should take place. It receives the // current usage, and it returns the next usage at which to trigger GC. - // - // The policy can request immediate GC, in which case next should match the - // used memory. - Evaluate(scope UtilizationType, used uint64) (next uint64, immediate bool) + Evaluate(scope UtilizationType, used uint64) (next uint64) } // HeapDriven starts a singleton heap-driven watchdog. // // The heap-driven watchdog adjusts GOGC dynamically after every GC, to honour -// the policy. When an immediate GC is requested, runtime.GC() is called, and -// the policy is re-evaluated at the end of GC. +// the policy requirements. // -// It is entirely possible for the policy to keep requesting immediate GC -// repeateadly. This usually signals an emergency situation, and won't prevent -// the program from making progress, since the Go's garbage collection is not -// stop-the-world (for the major part). -// -// A limit value of 0 will error. +// A zero-valued limit will error. func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) { _watchdog.lk.Lock() defer _watchdog.lk.Unlock() @@ -185,24 +182,11 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) } // evaluate the policy. - next, immediate := policy.Evaluate(UtilizationHeap, memstats.HeapAlloc) - - if immediate { - // trigger a forced GC; because we're not making the finalizer - // skip sending to the trigger channel, we will get fired again. - // at this stage, the program is under significant pressure, and - // given that Go GC is not STW for the largest part, the worse - // thing that could happen from infinitely GC'ing is that the - // program will run in a degrated state for longer, possibly - // long enough for an operator to intervene. - Logger.Warnf("heap-driven watchdog requested immediate GC; " + - "system is probably under significant pressure; " + - "performance compromised") - forceGC(&memstats) - continue - } + next := policy.Evaluate(UtilizationHeap, memstats.HeapAlloc) // calculate how much to set GOGC to honour the next trigger point. + // next=PolicyTempDisabled value would make currGOGC extremely high, + // greater than originalGOGC, and therefore we'd restore originalGOGC. currGOGC = int(((float64(next) / float64(heapMarked)) - float64(1)) * 100) if currGOGC >= originalGOGC { Logger.Debugf("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC) @@ -270,15 +254,19 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) threshold uint64 ) - // initialize the threshold. - threshold, immediate := policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) - if immediate { - Logger.Warnf("system-driven watchdog requested immediate GC upon startup; " + - "policy is probably misconfigured; " + - "performance compromised") - forceGC(&memstats) + renewThreshold := func() { + // get the current usage. + if err := sysmemFn(&sysmem); err != nil { + Logger.Warnf("failed to obtain system memory stats; err: %s", err) + return + } + // calculate the threshold. + threshold = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) } + // initialize the threshold. + renewThreshold() + for { select { case <-Clock.After(frequency): @@ -300,18 +288,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) case <-gcTriggered: NotifyFired() - // get the current usage. - if err := sysmemFn(&sysmem); err != nil { - Logger.Warnf("failed to obtain system memory stats; err: %s", err) - continue - } - - // adjust the threshold. - threshold, immediate = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) - if immediate { - Logger.Warnf("system-driven watchdog triggering immediate GC; %d used bytes", sysmem.ActualUsed) - forceGC(&memstats) - } + renewThreshold() case <-_watchdog.closing: return diff --git a/watchdog_test.go b/watchdog_test.go index 89b8d75..c456032 100644 --- a/watchdog_test.go +++ b/watchdog_test.go @@ -91,8 +91,7 @@ func TestHeapDriven(t *testing.T) { var ms runtime.MemStats runtime.ReadMemStats(&ms) - require.GreaterOrEqual(t, ms.NumGC, uint32(12)) // over 12 GCs should've taken place. - require.GreaterOrEqual(t, ms.NumForcedGC, uint32(5)) // at least 5 forced GCs. + require.GreaterOrEqual(t, ms.NumGC, uint32(10)) // over 12 GCs should've taken place. } func TestSystemDriven(t *testing.T) { diff --git a/watermarks.go b/watermarks.go index b193fd1..cced767 100644 --- a/watermarks.go +++ b/watermarks.go @@ -3,7 +3,8 @@ package watchdog // NewWatermarkPolicy creates a watchdog policy that schedules GC at concrete // watermarks. When queried, it will determine the next trigger point based // on the current utilisation. If the last watermark is surpassed, -// the policy will request immediate GC. +// the policy will be disarmed. It is recommended to set an extreme watermark +// as the last element (e.g. 0.99) to prevent the policy from disarming too soon. func NewWatermarkPolicy(watermarks ...float64) PolicyCtor { return func(limit uint64) (Policy, error) { p := new(watermarkPolicy) @@ -27,15 +28,15 @@ type watermarkPolicy struct { var _ Policy = (*watermarkPolicy)(nil) -func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64, immediate bool) { +func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64) { Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit)", used, w.limit) var i int for ; i < len(w.thresholds); i++ { t := w.thresholds[i] if used < t { - return t, false + return t } } - // we reached the maximum threshold, so fire immediately. - return used, true + // we reached the maximum threshold, so we disable this policy. + return PolicyTempDisabled } diff --git a/watermarks_test.go b/watermarks_test.go index 72d6108..d3ee3ee 100644 --- a/watermarks_test.go +++ b/watermarks_test.go @@ -26,36 +26,29 @@ func TestProgressiveWatermarks(t *testing.T) { require.NoError(t, err) // at zero - next, immediate := p.Evaluate(UtilizationSystem, uint64(0)) - require.False(t, immediate) + next := p.Evaluate(UtilizationSystem, uint64(0)) require.EqualValues(t, thresholds[0], next) // before the watermark. - next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1) - require.False(t, immediate) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1) require.EqualValues(t, thresholds[0], next) // exactly at the watermark; gives us the next watermark, as the watchdodg would've // taken care of triggering the first watermark. - next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])) - require.False(t, immediate) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])) require.EqualValues(t, thresholds[1], next) // after the watermark gives us the next watermark. - next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1) - require.False(t, immediate) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1) require.EqualValues(t, thresholds[1], next) - // last watermark; always triggers. - next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2])) - require.True(t, immediate) - require.EqualValues(t, uint64(float64(limit)*watermarks[2]), next) + // last watermark; disable the policy. + next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2])) + require.EqualValues(t, PolicyTempDisabled, next) - next, immediate = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1)) - require.True(t, immediate) - require.EqualValues(t, uint64(float64(limit)*watermarks[2])+1, next) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1)) + require.EqualValues(t, PolicyTempDisabled, next) - next, immediate = p.Evaluate(UtilizationSystem, limit) - require.True(t, immediate) - require.EqualValues(t, limit, next) + next = p.Evaluate(UtilizationSystem, limit) + require.EqualValues(t, PolicyTempDisabled, next) } From 1df6596e03f2a473639a7bb36628d15ebd97a2c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 9 Dec 2020 16:13:27 +0000 Subject: [PATCH 7/7] recycle timer. --- watchdog.go | 15 ++++++++++++++- watchdog_test.go | 4 ++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/watchdog.go b/watchdog.go index 03ca2e6..466161f 100644 --- a/watchdog.go +++ b/watchdog.go @@ -267,9 +267,19 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) // initialize the threshold. renewThreshold() + // initialize an empty timer. + timer := Clock.Timer(0) + stopTimer := func() { + if !timer.Stop() { + <-timer.C + } + } + for { + timer.Reset(frequency) + select { - case <-Clock.After(frequency): + case <-timer.C: // get the current usage. if err := sysmemFn(&sysmem); err != nil { Logger.Warnf("failed to obtain system memory stats; err: %s", err) @@ -290,7 +300,10 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) renewThreshold() + stopTimer() + case <-_watchdog.closing: + stopTimer() return } } diff --git a/watchdog_test.go b/watchdog_test.go index c456032..67bb6e3 100644 --- a/watchdog_test.go +++ b/watchdog_test.go @@ -52,7 +52,7 @@ func TestControl(t *testing.T) { var ms runtime.MemStats runtime.ReadMemStats(&ms) - require.LessOrEqual(t, ms.NumGC, uint32(8)) // a maximum of 8 GCs should've happened. + require.LessOrEqual(t, ms.NumGC, uint32(5)) // a maximum of 8 GCs should've happened. require.Zero(t, ms.NumForcedGC) // no forced GCs. } @@ -91,7 +91,7 @@ func TestHeapDriven(t *testing.T) { var ms runtime.MemStats runtime.ReadMemStats(&ms) - require.GreaterOrEqual(t, ms.NumGC, uint32(10)) // over 12 GCs should've taken place. + require.GreaterOrEqual(t, ms.NumGC, uint32(9)) // over 9 GCs should've taken place. } func TestSystemDriven(t *testing.T) {