mirror of https://github.com/status-im/go-watchdog.git
synced 2025-01-31 11:45:10 +00:00

Merge pull request #4 from raulk/refactor

This commit is contained in commit a99678239d.

README.md (20 lines changed)
@@ -5,9 +5,17 @@

[![godocs](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog)
[![build status](https://circleci.com/gh/raulk/go-watchdog.svg?style=svg)](<LINK>)

go-watchdog runs a singleton memory watchdog. It takes system and heap memory
readings periodically, and feeds them to a user-defined policy to determine
whether GC needs to run immediately.
go-watchdog runs a singleton memory watchdog in the process, which watches
memory utilization and forces Go GC in accordance with a user-defined policy.

There are two kinds of watchdog so far:

* **heap-driven:** applies a limit to the heap, and obtains current usage through
  `runtime.ReadMemStats()`.
* **system-driven:** applies a limit to the total system memory used, and obtains
  current usage through [`elastic/go-sigar`](https://github.com/elastic/gosigar).

A third process-driven watchdog that uses cgroups is underway.

This library ships with two policies out of the box:

@@ -19,12 +27,6 @@ This library ships with two policies out of the box:

You can easily build a custom policy tailored to the allocation patterns of your
program.

It is recommended that you set both (a) a memory limit and (b) a scope of
application of that limit (system or heap) when you start the watchdog.
Otherwise, go-watchdog will use the system scope, and will default to the
total system memory as the limit. [elastic/go-sigar](https://github.com/elastic/gosigar)
is used to make the discovery.

## Why is this even needed?

The garbage collector that ships with the go runtime is pretty good in some
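For orientation, a minimal usage sketch of the API introduced by this refactor. The function signatures are taken from the watchdog.go and adaptive.go diffs below; the 1 GiB limit and the 0.5 factor are illustrative choices, not library defaults.

package main

import (
    watchdog "github.com/raulk/go-watchdog"
)

func main() {
    // Heap-driven watchdog: cap the heap at 1GiB (illustrative), forcing GC
    // according to an adaptive policy that fires once half of the remaining
    // headroom after the last GC has been consumed.
    const limit = 1 << 30
    err, stop := watchdog.HeapDriven(limit, watchdog.NewAdaptivePolicy(0.5))
    if err != nil {
        panic(err)
    }
    defer stop()

    // ... run the application ...
}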
adaptive.go (73 lines changed)

@@ -1,54 +1,31 @@

package watchdog

// AdaptivePolicy is a policy that forces GC when the usage surpasses a
// user-configured percentage (Factor) of the available memory that remained
// after the last GC run.
// NewAdaptivePolicy creates a policy that forces GC when the usage surpasses a
// user-configured percentage (factor) of the available memory.
//
// TODO tests
type AdaptivePolicy struct {
    // Factor determines how much this policy will let the heap expand
    // before it triggers.
    //
    // On every GC run, this policy recalculates the next target as
    // (limit-currentHeap)*Factor (i.e. available*Factor).
    //
    // If the GC target calculated by the runtime is lower than the one
    // calculated by this policy, this policy will set the new target, but the
    // effect will be nil, since the go runtime will run GC sooner than us
    // anyway.
    Factor float64

    active      bool
    target      uint64
    initialized bool
// This policy recalculates the next target as usage+(limit-usage)*factor.
func NewAdaptivePolicy(factor float64) PolicyCtor {
    return func(limit uint64) (Policy, error) {
        return &adaptivePolicy{
            factor: factor,
            limit:  limit,
        }, nil
    }
}

var _ Policy = (*AdaptivePolicy)(nil)

func (a *AdaptivePolicy) Evaluate(input PolicyInput) (trigger bool) {
    if !a.initialized {
        // when initializing, set the target to the limit; it will be reset
        // when the first GC happens.
        a.target = input.Limit
        a.initialized = true
    }

    // determine the value to compare utilisation against.
    var actual uint64
    switch input.Scope {
    case ScopeSystem:
        actual = input.SysStats.ActualUsed
    case ScopeHeap:
        actual = input.MemStats.HeapAlloc
    }

    if input.GCTrigger {
        available := float64(input.Limit) - float64(actual)
        calc := uint64(available * a.Factor)
        a.target = calc
    }
    if actual >= a.target {
        return true
    }
    return false
type adaptivePolicy struct {
    factor float64
    limit  uint64
}

var _ Policy = (*adaptivePolicy)(nil)

func (p *adaptivePolicy) Evaluate(_ UtilizationType, used uint64) (next uint64) {
    if used >= p.limit {
        return used
    }

    available := float64(p.limit) - float64(used)
    next = used + uint64(available*p.factor)
    return next
}
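A brief worked example of the new policy's arithmetic, assuming the 64 MiB limit used by the tests that follow (a sketch; values are printed in bytes):

package main

import (
    "fmt"

    watchdog "github.com/raulk/go-watchdog"
)

func main() {
    const limit = 64 << 20 // 64MiB, matching the tests below.
    p, err := watchdog.NewAdaptivePolicy(0.5)(limit)
    if err != nil {
        panic(err)
    }
    fmt.Println(p.Evaluate(watchdog.UtilizationSystem, 0))       // 33554432 (32MiB): 0 + (64MiB-0)*0.5
    fmt.Println(p.Evaluate(watchdog.UtilizationSystem, limit/2)) // 50331648 (48MiB): 32MiB + 32MiB*0.5
    fmt.Println(p.Evaluate(watchdog.UtilizationSystem, limit))   // 67108864 (64MiB): used >= limit, returned as-is
}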
adaptive_test.go (new file, 28 lines)

@@ -0,0 +1,28 @@

package watchdog

import (
    "testing"

    "github.com/raulk/clock"
    "github.com/stretchr/testify/require"
)

func TestAdaptivePolicy(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    p, err := NewAdaptivePolicy(0.5)(limit)
    require.NoError(t, err)

    // at zero; next = 50%.
    next := p.Evaluate(UtilizationSystem, 0)
    require.EqualValues(t, limit/2, next)

    // at half; next = 75%.
    next = p.Evaluate(UtilizationSystem, limit/2)
    require.EqualValues(t, 3*(limit/4), next)

    // at limit.
    next = p.Evaluate(UtilizationSystem, limit)
    require.EqualValues(t, limit, next)
}
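The README notes that custom policies tailored to an application's allocation pattern are easy to build. A hedged sketch of one, written against the Policy and PolicyCtor types defined in watchdog.go below; the fixedHeadroomPolicy name and behaviour are illustrative and not part of this PR:

// Package custompolicy is a hypothetical example, not part of the library.
package custompolicy

import (
    watchdog "github.com/raulk/go-watchdog"
)

// fixedHeadroomPolicy asks for the next GC once usage has grown by a fixed
// number of bytes, capping the trigger point at the limit.
type fixedHeadroomPolicy struct {
    headroom uint64
    limit    uint64
}

func (p *fixedHeadroomPolicy) Evaluate(_ watchdog.UtilizationType, used uint64) (next uint64) {
    next = used + p.headroom
    if next > p.limit {
        next = p.limit
    }
    return next
}

// NewFixedHeadroomPolicy adapts the type above into a watchdog.PolicyCtor.
func NewFixedHeadroomPolicy(headroom uint64) watchdog.PolicyCtor {
    return func(limit uint64) (watchdog.Policy, error) {
        return &fixedHeadroomPolicy{headroom: headroom, limit: limit}, nil
    }
}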
go.mod (3 lines changed)

@@ -3,11 +3,10 @@ module github.com/raulk/go-watchdog

go 1.15

require (
    github.com/davecgh/go-spew v1.1.1 // indirect
    github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
    github.com/elastic/gosigar v0.12.0
    github.com/kr/pretty v0.1.0 // indirect
    github.com/raulk/clock v1.1.0
    github.com/stretchr/testify v1.4.0
    golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
    gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
go.sum (31 lines changed)

@@ -1,27 +1,54 @@

github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs=
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 h1:7grrpcfCtbZLsjtB0DgMuzs1umsJmpzaHMZ6cO6iAWw=
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE=
github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/elastic/gosigar v0.12.0 h1:AsdhYCJlTudhfOYQyFNgx+fIVTfrDO0V1ST0vHgiapU=
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/godbus/dbus/v5 v5.0.3 h1:ZqHaoEF7TBzh4jzPmqVhE/5A1z9of6orkAe5uHoAeME=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0=
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y=
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0 h1:8H8QZJ30plJyIVj60H3lr8TZGIq2Fh3Cyrs/ZNg1foU=
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 h1:1/DFK4b7JH8DmkqhUk48onnSfrPzImPoVxuomtbT2nk=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
log.go (8 lines changed)

@@ -2,17 +2,17 @@ package watchdog

import "log"

// Logger is an interface to be implemented by custom loggers.
type Logger interface {
// logger is an interface to be implemented by custom loggers.
type logger interface {
    Debugf(template string, args ...interface{})
    Infof(template string, args ...interface{})
    Warnf(template string, args ...interface{})
    Errorf(template string, args ...interface{})
}

var _ Logger = (*stdlog)(nil)
var _ logger = (*stdlog)(nil)

// stdlog is a Logger that proxies to a standard log.Logger.
// stdlog is a logger that proxies to a standard log.logger.
type stdlog struct {
    log   *log.Logger
    debug bool
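Although the logger interface is now unexported, the package still exposes a settable Logger variable (declared in watchdog.go below), so any value with the four printf-style methods should be usable. A sketch of a hypothetical adapter over the standard library logger; the levelLogger type is illustrative and not part of the library:

package main

import (
    "log"
    "os"

    watchdog "github.com/raulk/go-watchdog"
)

// levelLogger tags each message with its level and forwards it to a
// standard *log.Logger, satisfying the watchdog logger interface.
type levelLogger struct{ l *log.Logger }

func (a levelLogger) Debugf(template string, args ...interface{}) { a.l.Printf("DEBUG "+template, args...) }
func (a levelLogger) Infof(template string, args ...interface{})  { a.l.Printf("INFO "+template, args...) }
func (a levelLogger) Warnf(template string, args ...interface{})  { a.l.Printf("WARN "+template, args...) }
func (a levelLogger) Errorf(template string, args ...interface{}) { a.l.Printf("ERROR "+template, args...) }

func main() {
    // The exported Logger variable accepts any implementation of the interface.
    watchdog.Logger = levelLogger{l: log.New(os.Stderr, "[watchdog] ", log.LstdFlags)}
}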
sys_linux.go (new file, 24 lines)

@@ -0,0 +1,24 @@

package watchdog

import (
    "os"

    "github.com/containerd/cgroups"
)

func ProcessMemoryLimit() uint64 {
    var (
        pid          = os.Getpid()
        memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory)
    )
    cgroup, err := cgroups.Load(memSubsystem, cgroups.PidPath(pid))
    if err != nil {
        return 0
    }
    metrics, err := cgroup.Stat()
    if err != nil {
        return 0
    }
    if metrics.Memory == nil {
        return 0
    }
    return metrics.Memory.HierarchicalMemoryLimit
}
sys_other.go (new file, 7 lines)

@@ -0,0 +1,7 @@

// +build !linux

package watchdog

func ProcessMemoryLimit() uint64 {
    return 0
}
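ProcessMemoryLimit reports the cgroup memory limit on Linux and 0 elsewhere (or when no limit can be read), so callers are expected to supply their own fallback; determineLimit in watchdog.go below keeps that wiring behind a TODO. A small sketch of the pattern; the 8 GiB fallback is an assumption of the example, not a library default:

package main

import (
    "time"

    watchdog "github.com/raulk/go-watchdog"
)

func main() {
    // Prefer the cgroup-imposed limit when one is visible; otherwise fall
    // back to a value chosen by the application.
    limit := watchdog.ProcessMemoryLimit()
    if limit == 0 {
        limit = 8 << 30 // assumed 8GiB fallback; not a library default.
    }

    err, stop := watchdog.SystemDriven(limit, 5*time.Second, watchdog.NewAdaptivePolicy(0.5))
    if err != nil {
        panic(err)
    }
    defer stop()
}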
watchdog.go (523 lines changed)

@@ -3,21 +3,76 @@ package watchdog

import (
    "fmt"
    "log"
    "math"
    "runtime"
    "runtime/debug"
    "sync"
    "sync/atomic"
    "time"

    "github.com/elastic/gosigar"
    "github.com/raulk/clock"
)

// DecimalPrecision is the rounding precision that float calculations will use.
// By default, 4 decimal places.
var DecimalPrecision = 1e4
// PolicyTempDisabled is a marker value for policies to signal that the policy
// is temporarily disabled. Use it when all hope is lost to turn around from
// significant memory pressure (such as when above an "extreme" watermark).
const PolicyTempDisabled uint64 = math.MaxUint64

// Clock can be used to inject a mock clock for testing.
var Clock = clock.New()
// The watchdog is designed to be used as a singleton; global vars are OK for
// that reason.
var (
    // Logger is the logger to use. If nil, it will default to a logger that
    // proxies to a standard logger using the "[watchdog]" prefix.
    Logger logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)}

    // Clock can be used to inject a mock clock for testing.
    Clock = clock.New()

    // NotifyFired, if non-nil, will be called when the policy has fired,
    // prior to calling GC, even if GC is disabled.
    NotifyFired func() = func() {}
)

var (
    // ReadMemStats stops the world. But as of go1.9, it should only
    // take ~25µs to complete.
    //
    // Before go1.15, calls to ReadMemStats during an ongoing GC would
    // block due to the worldsema lock. As of go1.15, this was optimized
    // and the runtime holds on to worldsema less during GC (only during
    // sweep termination and mark termination).
    //
    // For users using go1.14 and earlier, if this call happens during
    // GC, it will just block for longer until serviced, but it will not
    // take longer in itself. No harm done.
    //
    // Actual benchmarks
    // -----------------
    //
    // In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16
    // i9 busy with another million things. During GC, it takes an
    // average of less than 175µs per op.
    //
    // goos: darwin
    // goarch: amd64
    // pkg: github.com/filecoin-project/lotus/api
    // BenchmarkReadMemStats-16    44530    27523 ns/op
    // BenchmarkReadMemStats-16    43743    26879 ns/op
    // BenchmarkReadMemStats-16    45627    26791 ns/op
    // BenchmarkReadMemStats-16    44538    26219 ns/op
    // BenchmarkReadMemStats-16    44958    26757 ns/op
    // BenchmarkReadMemStatsWithGCContention-16    10    183733 p50-ns    211859 p90-ns    211859 p99-ns
    // BenchmarkReadMemStatsWithGCContention-16     7    198765 p50-ns    314873 p90-ns    314873 p99-ns
    // BenchmarkReadMemStatsWithGCContention-16    10    195151 p50-ns    311408 p90-ns    311408 p99-ns
    // BenchmarkReadMemStatsWithGCContention-16    10    217279 p50-ns    295308 p90-ns    295308 p99-ns
    // BenchmarkReadMemStatsWithGCContention-16    10    167054 p50-ns    327072 p90-ns    327072 p99-ns
    // PASS
    //
    // See: https://github.com/golang/go/issues/19812
    // See: https://github.com/prometheus/client_golang/issues/403
    memstatsFn = runtime.ReadMemStats
    sysmemFn   = (*gosigar.Mem).Get
)

var (
    // ErrAlreadyStarted is returned when the user tries to start the watchdog more than once.
@@ -26,239 +81,313 @@ var (

const (
    // stateUnstarted represents an unstarted state.
    stateUnstarted int64 = iota
    // stateStarted represents a started state.
    stateStarted
    stateUnstarted int32 = iota
    // stateRunning represents an operational state.
    stateRunning
)

// watchdog is a global singleton watchdog.
var watchdog struct {
    state  int64
    config MemConfig
// _watchdog is a global singleton watchdog.
var _watchdog struct {
    lk    sync.Mutex
    state int32

    scope UtilizationType

    closing chan struct{}
    wg      sync.WaitGroup
}

// ScopeType defines the scope of the utilisation that we'll apply the limit to.
type ScopeType int
// UtilizationType is the utilization metric in use.
type UtilizationType int

const (
    // ScopeSystem specifies that the policy compares against actual used
    // UtilizationSystem specifies that the policy compares against actual used
    // system memory.
    ScopeSystem ScopeType = iota
    // ScopeHeap specifies that the policy compares against heap used.
    ScopeHeap
    UtilizationSystem UtilizationType = iota
    // UtilizationHeap specifies that the policy compares against heap used.
    UtilizationHeap
)

// PolicyInput is the object that's passed to a policy when evaluating it.
type PolicyInput struct {
    Scope     ScopeType
    Limit     uint64
    MemStats  *runtime.MemStats
    SysStats  *gosigar.Mem
    GCTrigger bool // is this a GC trigger?
    Logger    Logger
}
// PolicyCtor is a policy constructor.
type PolicyCtor func(limit uint64) (Policy, error)

// Policy encapsulates the logic that the watchdog will run on every tick.
// Policy is polled by the watchdog to determine the next utilisation at which
// a GC should be forced.
type Policy interface {
    // Evaluate determines whether the policy should fire. It receives the
    // limit (either guessed or manually set), go runtime memory stats, and
    // system memory stats, amongst other things. It returns whether the policy
    // has fired or not.
    Evaluate(input PolicyInput) (trigger bool)
    // Evaluate determines when the next GC should take place. It receives the
    // current usage, and it returns the next usage at which to trigger GC.
    Evaluate(scope UtilizationType, used uint64) (next uint64)
}

type MemConfig struct {
    // Scope is the scope at which the limit will be applied.
    Scope ScopeType
// HeapDriven starts a singleton heap-driven watchdog.
//
// The heap-driven watchdog adjusts GOGC dynamically after every GC, to honour
// the policy requirements.
//
// A zero-valued limit will error.
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) {
    _watchdog.lk.Lock()
    defer _watchdog.lk.Unlock()

    // Limit is the memory available to this process. If zero, we will fall
    // back to querying the system total memory via SIGAR.
    Limit uint64

    // Resolution is the interval at which the watchdog will retrieve memory
    // stats and invoke the Policy.
    Resolution time.Duration

    // Policy sets the firing policy of this watchdog.
    Policy Policy

    // NotifyFired, if non-nil, will be called when the policy has fired,
    // prior to calling GC, even if GC is disabled.
    NotifyFired func()

    // NotifyOnly, if true, will cause the watchdog to only notify via the
    // callbacks, without triggering actual GC.
    NotifyOnly bool

    // Logger is the logger to use. If nil, it will default to a logger that
    // proxies to a standard logger using the "[watchdog]" prefix.
    Logger Logger
}

// Memory starts the singleton memory watchdog with the provided configuration.
func Memory(config MemConfig) (err error, stop func()) {
    if !atomic.CompareAndSwapInt64(&watchdog.state, stateUnstarted, stateStarted) {
    if _watchdog.state != stateUnstarted {
        return ErrAlreadyStarted, nil
    }

    if config.Logger == nil {
        config.Logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)}
    if limit == 0 {
        return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil
    }

    // if the user didn't provide a limit, get the total memory.
    if config.Limit == 0 {
        var mem gosigar.Mem
        if err := mem.Get(); err != nil {
            return fmt.Errorf("failed to get system memory limit via SIGAR: %w", err), nil
    policy, err := policyCtor(limit)
    if err != nil {
        return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
    }

    _watchdog.state = stateRunning
    _watchdog.scope = UtilizationHeap
    _watchdog.closing = make(chan struct{})

    gcTriggered := make(chan struct{}, 16)
    setupGCSentinel(gcTriggered)

    _watchdog.wg.Add(1)
    go func() {
        defer _watchdog.wg.Done()

        // get the initial effective GOGC; guess it's 100 (default), and restore
        // it to whatever it actually was. This works because SetGCPercent
        // returns the previous value.
        originalGOGC := debug.SetGCPercent(debug.SetGCPercent(100))
        currGOGC := originalGOGC

        var memstats runtime.MemStats
        for {
            select {
            case <-gcTriggered:
                NotifyFired()

            case <-_watchdog.closing:
                return
            }

            // recompute the next trigger.
            memstatsFn(&memstats)

            // heapMarked is the amount of heap that was marked as live by GC.
            // it is inferred from our current GOGC and the new target picked.
            heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100))
            if heapMarked == 0 {
                // this shouldn't happen, but just in case; avoiding a div by 0.
                Logger.Warnf("heap-driven watchdog: inferred zero heap marked; skipping evaluation")
                continue
            }

            // evaluate the policy.
            next := policy.Evaluate(UtilizationHeap, memstats.HeapAlloc)

            // calculate how much to set GOGC to honour the next trigger point.
            // next=PolicyTempDisabled value would make currGOGC extremely high,
            // greater than originalGOGC, and therefore we'd restore originalGOGC.
            currGOGC = int(((float64(next) / float64(heapMarked)) - float64(1)) * 100)
            if currGOGC >= originalGOGC {
                Logger.Debugf("heap watchdog: requested GOGC percent higher than default; capping at default; requested: %d; default: %d", currGOGC, originalGOGC)
                currGOGC = originalGOGC
            } else {
                if currGOGC < 1 {
                    currGOGC = 1
                }
                Logger.Infof("heap watchdog: setting GOGC percent: %d", currGOGC)
            }

            debug.SetGCPercent(currGOGC)

            memstatsFn(&memstats)
            Logger.Infof("heap watchdog stats: heap_alloc: %d, heap_marked: %d, next_gc: %d, policy_next_gc: %d, gogc: %d",
                memstats.HeapAlloc, heapMarked, memstats.NextGC, next, currGOGC)
        }
        config.Limit = mem.Total
    }
    }()

    watchdog.config = config
    watchdog.closing = make(chan struct{})

    watchdog.wg.Add(1)
    go watch()

    return nil, stopMemory
    return nil, stop
}
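A short worked example of the GOGC arithmetic in HeapDriven, with illustrative numbers: if the policy asks for the next GC at 48 MiB while the runtime reports NextGC = 40 MiB and the current GOGC is 100, then heapMarked = 40 MiB / (1 + 100/100) = 20 MiB, and the candidate GOGC is ((48/20) - 1) * 100 = 140; since 140 >= the original GOGC of 100, the watchdog caps it back at 100. Had the policy asked for 24 MiB instead, the candidate would be ((24/20) - 1) * 100 = 20, so GOGC would be lowered to 20 and the runtime would collect sooner.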

func watch() {
    var (
        lk sync.Mutex // guards gcTriggered channel, which is drained and flipped to nil on closure.
        gcTriggered = make(chan struct{}, 16)
// SystemDriven starts a singleton system-driven watchdog.
//
// The system-driven watchdog keeps a threshold, above which GC will be forced.
// The watchdog polls the system utilization at the specified frequency. When
// the actual utilization exceeds the threshold, a GC is forced.
//
// This threshold is calculated by querying the policy every time that GC runs,
// either triggered by the runtime, or forced by us.
func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
    _watchdog.lk.Lock()
    defer _watchdog.lk.Unlock()

        memstats runtime.MemStats
        sysmem   gosigar.Mem
        config   = watchdog.config
    )
    if _watchdog.state != stateUnstarted {
        return ErrAlreadyStarted, nil
    }

    if limit == 0 {
        limit, err = determineLimit(false)
        if err != nil {
            return err, nil
        }
    }

    policy, err := policyCtor(limit)
    if err != nil {
        return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
    }

    _watchdog.state = stateRunning
    _watchdog.scope = UtilizationSystem
    _watchdog.closing = make(chan struct{})

    gcTriggered := make(chan struct{}, 16)
    setupGCSentinel(gcTriggered)

    _watchdog.wg.Add(1)
    go func() {
        defer _watchdog.wg.Done()

        var (
            memstats  runtime.MemStats
            sysmem    gosigar.Mem
            threshold uint64
        )

        renewThreshold := func() {
            // get the current usage.
            if err := sysmemFn(&sysmem); err != nil {
                Logger.Warnf("failed to obtain system memory stats; err: %s", err)
                return
            }
            // calculate the threshold.
            threshold = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed)
        }

        // initialize the threshold.
        renewThreshold()

        // initialize an empty timer.
        timer := Clock.Timer(0)
        stopTimer := func() {
            if !timer.Stop() {
                <-timer.C
            }
        }

        for {
            timer.Reset(frequency)

            select {
            case <-timer.C:
                // get the current usage.
                if err := sysmemFn(&sysmem); err != nil {
                    Logger.Warnf("failed to obtain system memory stats; err: %s", err)
                    continue
                }
                actual := sysmem.ActualUsed
                if actual < threshold {
                    // nothing to do.
                    continue
                }
                // trigger GC; this will emit a gcTriggered event which we'll
                // consume next to readjust the threshold.
                Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", actual, threshold)
                forceGC(&memstats)

            case <-gcTriggered:
                NotifyFired()

                renewThreshold()

                stopTimer()

            case <-_watchdog.closing:
                stopTimer()
                return
            }
        }
    }()

    return nil, stop
}

func determineLimit(restrictByProcess bool) (uint64, error) {
    // TODO.
    // if restrictByProcess {
    //     if pmem := ProcessMemoryLimit(); pmem > 0 {
    //         Logger.Infof("watchdog using process limit: %d bytes", pmem)
    //         return pmem, nil
    //     }
    //     Logger.Infof("watchdog was unable to determine process limit; falling back to total system memory")
    // }

    // populate initial utilisation and system stats.
    var sysmem gosigar.Mem
    if err := sysmemFn(&sysmem); err != nil {
        return 0, fmt.Errorf("failed to get system memory stats: %w", err)
    }
    return sysmem.Total, nil
}

// forceGC forces a manual GC.
func forceGC(memstats *runtime.MemStats) {
    Logger.Infof("watchdog is forcing GC")

    // it's safe to assume that the finalizer will attempt to run before
    // runtime.GC() returns because runtime.GC() waits for the sweep phase to
    // finish before returning.
    // finalizers are run in the sweep phase.
    start := time.Now()
    runtime.GC()
    took := time.Since(start)

    memstatsFn(memstats)
    Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", took, memstats.HeapAlloc)
}

func setupGCSentinel(gcTriggered chan struct{}) {
    logger := Logger

    // this non-zero sized struct is used as a sentinel to detect when a GC
    // run has finished, by setting and resetting a finalizer on it.
    // it essentially creates a GC notification "flywheel"; every GC will
    // trigger this finalizer, which will reset itself so it gets notified
    // of the next GC, breaking the cycle when the watchdog is stopped.
    type sentinel struct{ a *int }
    var sentinelObj sentinel
    var finalizer func(o *sentinel)
    finalizer = func(o *sentinel) {
        lk.Lock()
        defer lk.Unlock()
        select {
        case gcTriggered <- struct{}{}:
        default:
            config.Logger.Warnf("failed to queue gc trigger; channel backlogged")
        }
        runtime.SetFinalizer(o, finalizer)
    }
    finalizer(&sentinelObj)
    _watchdog.lk.Lock()
    defer _watchdog.lk.Unlock()

    defer watchdog.wg.Done()
    for {
        var eventIsGc bool
        select {
        case <-Clock.After(config.Resolution):
            // exit select.

        case <-gcTriggered:
            eventIsGc = true
            // exit select.

        case <-watchdog.closing:
            runtime.SetFinalizer(&sentinelObj, nil) // clear the sentinel finalizer.

            lk.Lock()
            ch := gcTriggered
            gcTriggered = nil
            lk.Unlock()

            // close and drain
            close(ch)
            for range ch {
            }
        if _watchdog.state != stateRunning {
            // this GC triggered after the watchdog was stopped; ignore
            // and do not reset the finalizer.
            return
        }

        // ReadMemStats stops the world. But as of go1.9, it should only
        // take ~25µs to complete.
        //
        // Before go1.15, calls to ReadMemStats during an ongoing GC would
        // block due to the worldsema lock. As of go1.15, this was optimized
        // and the runtime holds on to worldsema less during GC (only during
        // sweep termination and mark termination).
        //
        // For users using go1.14 and earlier, if this call happens during
        // GC, it will just block for longer until serviced, but it will not
        // take longer in itself. No harm done.
        //
        // Actual benchmarks
        // -----------------
        //
        // In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16
        // i9 busy with another million things. During GC, it takes an
        // average of less than 175µs per op.
        //
        // goos: darwin
        // goarch: amd64
        // pkg: github.com/filecoin-project/lotus/api
        // BenchmarkReadMemStats-16    44530    27523 ns/op
        // BenchmarkReadMemStats-16    43743    26879 ns/op
        // BenchmarkReadMemStats-16    45627    26791 ns/op
        // BenchmarkReadMemStats-16    44538    26219 ns/op
        // BenchmarkReadMemStats-16    44958    26757 ns/op
        // BenchmarkReadMemStatsWithGCContention-16    10    183733 p50-ns    211859 p90-ns    211859 p99-ns
        // BenchmarkReadMemStatsWithGCContention-16     7    198765 p50-ns    314873 p90-ns    314873 p99-ns
        // BenchmarkReadMemStatsWithGCContention-16    10    195151 p50-ns    311408 p90-ns    311408 p99-ns
        // BenchmarkReadMemStatsWithGCContention-16    10    217279 p50-ns    295308 p90-ns    295308 p99-ns
        // BenchmarkReadMemStatsWithGCContention-16    10    167054 p50-ns    327072 p90-ns    327072 p99-ns
        // PASS
        //
        // See: https://github.com/golang/go/issues/19812
        // See: https://github.com/prometheus/client_golang/issues/403
        // reset so it triggers on the next GC.
        runtime.SetFinalizer(o, finalizer)

        if eventIsGc {
            config.Logger.Infof("watchdog after GC")
        }

        runtime.ReadMemStats(&memstats)

        if err := sysmem.Get(); err != nil {
            config.Logger.Warnf("failed to obtain system memory stats; err: %s", err)
        }

        trigger := config.Policy.Evaluate(PolicyInput{
            Scope:     config.Scope,
            Limit:     config.Limit,
            MemStats:  &memstats,
            SysStats:  &sysmem,
            GCTrigger: eventIsGc,
            Logger:    config.Logger,
        })

        if !trigger {
            continue
        }

        config.Logger.Infof("watchdog policy fired")

        if f := config.NotifyFired; f != nil {
            f()
        }

        if !config.NotifyOnly {
            config.Logger.Infof("watchdog is triggering GC")
            start := time.Now()
            runtime.GC()
            runtime.ReadMemStats(&memstats)
            config.Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", time.Since(start), memstats.HeapAlloc)
        select {
        case gcTriggered <- struct{}{}:
        default:
            logger.Warnf("failed to queue gc trigger; channel backlogged")
        }
        }

    runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel.
}

func stopMemory() {
    if !atomic.CompareAndSwapInt64(&watchdog.state, stateStarted, stateUnstarted) {
func stop() {
    _watchdog.lk.Lock()
    defer _watchdog.lk.Unlock()

    if _watchdog.state != stateRunning {
        return
    }
    close(watchdog.closing)
    watchdog.wg.Wait()

    close(_watchdog.closing)
    _watchdog.wg.Wait()
    _watchdog.state = stateUnstarted
}
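The sentinel-and-finalizer trick in setupGCSentinel is how both watchdogs learn that a GC cycle has completed. A stripped-down sketch of the same "flywheel" idea in isolation (illustrative, not library code):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    gcs := make(chan struct{}, 1)

    // A finalizer that re-registers itself on its own object: it fires after
    // every GC cycle, turning GC completion into a channel notification.
    var finalizer func(*int)
    finalizer = func(p *int) {
        select {
        case gcs <- struct{}{}:
        default: // drop the notification if the buffer is full.
        }
        runtime.SetFinalizer(p, finalizer)
    }
    runtime.SetFinalizer(new(int), finalizer)

    runtime.GC() // force a cycle so the sentinel fires at least once.
    <-gcs
    fmt.Println("observed a GC cycle")
}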
watchdog_test.go (194 lines changed)

@@ -1,92 +1,150 @@

package watchdog

import (
    "fmt"
    "log"
    "os"
    "runtime"
    "runtime/debug"
    "testing"
    "time"

    "github.com/elastic/gosigar"
    "github.com/raulk/clock"
    "github.com/stretchr/testify/require"
)

type testPolicy struct {
    ch      chan *PolicyInput
    trigger bool
// These integration tests are hugely non-deterministic, but necessary to get
// good coverage and confidence. The Go runtime makes its own pacing decisions,
// and those may vary based on machine, OS, kernel memory management, other
// running programs, exogenous memory pressure, and Go runtime versions.
//
// The assertions we use here are lax, but should be sufficient to serve as a
// reasonable litmus test of whether the watchdog is doing what it's supposed
// to or not.

var (
    limit uint64 = 64 << 20 // 64MiB.
)

func init() {
    Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true}
}

var _ Policy = (*testPolicy)(nil)
func TestControl(t *testing.T) {
    debug.SetGCPercent(100)

func (t *testPolicy) Evaluate(input PolicyInput) (trigger bool) {
    t.ch <- &input
    return t.trigger
}
    // retain 1MiB every iteration, up to 100MiB (beyond heap limit!).
    var retained [][]byte
    for i := 0; i < 100; i++ {
        b := make([]byte, 1*1024*1024)
        for i := range b {
            b[i] = byte(i)
        }
        retained = append(retained, b)
    }

func TestWatchdog(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    tp := &testPolicy{ch: make(chan *PolicyInput, 1)}
    notifyCh := make(chan struct{}, 1)

    err, stop := Memory(MemConfig{
        Scope:       ScopeHeap,
        Limit:       100,
        Resolution:  10 * time.Second,
        Policy:      tp,
        NotifyFired: func() { notifyCh <- struct{}{} },
    })
    require.NoError(t, err)
    defer stop()

    time.Sleep(500 * time.Millisecond) // wait til the watchdog goroutine starts.

    clk.Add(10 * time.Second)
    pi := <-tp.ch
    require.EqualValues(t, 100, pi.Limit)
    require.NotNil(t, pi.MemStats)
    require.NotNil(t, pi.SysStats)
    require.Equal(t, ScopeHeap, pi.Scope)

    require.Len(t, notifyCh, 0)
    for _, b := range retained {
        for i := range b {
            b[i] = byte(i)
        }
    }

    var ms runtime.MemStats
    runtime.ReadMemStats(&ms)
    require.EqualValues(t, 0, ms.NumGC)

    // now fire.
    tp.trigger = true

    clk.Add(10 * time.Second)
    pi = <-tp.ch
    require.EqualValues(t, 100, pi.Limit)
    require.NotNil(t, pi.MemStats)
    require.NotNil(t, pi.SysStats)
    require.Equal(t, ScopeHeap, pi.Scope)

    time.Sleep(500 * time.Millisecond) // wait until the watchdog runs.

    require.Len(t, notifyCh, 1)

    runtime.ReadMemStats(&ms)
    require.EqualValues(t, 1, ms.NumGC)
    require.LessOrEqual(t, ms.NumGC, uint32(5)) // a maximum of 8 GCs should've happened.
    require.Zero(t, ms.NumForcedGC)             // no forced GCs.
}

func TestDoubleClose(t *testing.T) {
    defer func() {
        if r := recover(); r != nil {
            t.Fatal(r)
        }
    }()
func TestHeapDriven(t *testing.T) {
    // we can't mock ReadMemStats, because we're relying on the go runtime to
    // enforce the GC run, and the go runtime won't use our mock. Therefore, we
    // need to do the actual thing.
    debug.SetGCPercent(100)

    tp := &testPolicy{ch: make(chan *PolicyInput, 1)}
    clk := clock.NewMock()
    Clock = clk

    err, stop := Memory(MemConfig{
        Scope:      ScopeHeap,
        Limit:      100,
        Resolution: 10 * time.Second,
        Policy:     tp,
    })
    observations := make([]*runtime.MemStats, 0, 100)
    NotifyFired = func() {
        var ms runtime.MemStats
        runtime.ReadMemStats(&ms)
        observations = append(observations, &ms)
    }

    // limit is 64MiB.
    err, stopFn := HeapDriven(limit, NewAdaptivePolicy(0.5))
    require.NoError(t, err)
    stop()
    stop()
    defer stopFn()

    time.Sleep(500 * time.Millisecond) // give time for the watchdog to init.

    // retain 1MiB every iteration, up to 100MiB (beyond heap limit!).
    var retained [][]byte
    for i := 0; i < 100; i++ {
        retained = append(retained, make([]byte, 1*1024*1024))
    }

    for _, o := range observations {
        fmt.Println("heap alloc:", o.HeapAlloc, "next gc:", o.NextGC, "gc count:", o.NumGC, "forced gc:", o.NumForcedGC)
    }

    var ms runtime.MemStats
    runtime.ReadMemStats(&ms)
    require.GreaterOrEqual(t, ms.NumGC, uint32(9)) // over 9 GCs should've taken place.
}

func TestSystemDriven(t *testing.T) {
    debug.SetGCPercent(100)

    clk := clock.NewMock()
    Clock = clk

    // mock the system reporting.
    var actualUsed uint64
    sysmemFn = func(g *gosigar.Mem) error {
        g.ActualUsed = actualUsed
        return nil
    }

    // limit is 64MiB.
    err, stopFn := SystemDriven(limit, 5*time.Second, NewAdaptivePolicy(0.5))
    require.NoError(t, err)
    defer stopFn()

    time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.

    notifyCh := make(chan struct{}, 1)
    NotifyFired = func() {
        notifyCh <- struct{}{}
    }

    // first tick; used = 0.
    clk.Add(5 * time.Second)
    time.Sleep(200 * time.Millisecond)
    require.Len(t, notifyCh, 0) // no GC has taken place.

    // second tick; used = just over 50%; will trigger GC.
    actualUsed = (limit / 2) + 1
    clk.Add(5 * time.Second)
    time.Sleep(200 * time.Millisecond)
    require.Len(t, notifyCh, 1)
    <-notifyCh

    // third tick; just below 75%; no GC.
    actualUsed = uint64(float64(limit)*0.75) - 1
    clk.Add(5 * time.Second)
    time.Sleep(200 * time.Millisecond)
    require.Len(t, notifyCh, 0)

    // fourth tick; 75% exactly; will trigger GC.
    actualUsed = uint64(float64(limit)*0.75) + 1
    clk.Add(5 * time.Second)
    time.Sleep(200 * time.Millisecond)
    require.Len(t, notifyCh, 1)
    <-notifyCh

    var ms runtime.MemStats
    runtime.ReadMemStats(&ms)
    require.GreaterOrEqual(t, ms.NumForcedGC, uint32(2))
}
watermarks.go (174 lines changed)

@@ -1,142 +1,42 @@

package watchdog

import (
    "math"
    "time"
)

// WatermarkPolicy is a watchdog firing policy that triggers when watermarks are
// surpassed in the increasing direction.
//
// For example, a policy configured with the watermarks 0.50, 0.75, 0.80, and
// 0.99 will trigger at most once, and only once, each time that a watermark
// is surpassed upwards.
//
// Even if utilisation pierces through several watermarks at once between two
// subsequent calls to Evaluate, the policy will fire only once.
//
// It is possible to suppress the watermark policy from firing too often by
// setting a Silence period. When non-zero, if a watermark is surpassed within
// the Silence window (using the last GC timestamp as basis), that event will
// not immediately trigger firing. Instead, the policy will wait until the
// silence period is over, and only then will it fire if utilisation is still
// beyond that watermark.
//
// Lastly, if an EmergencyWatermark is set, when utilisation is above that
// level, the Silence period will be ignored and the policy will fire
// persistently, as long as utilisation stays above that watermark.
type WatermarkPolicy struct {
    // Watermarks are the percentual amounts of limit. The policy will panic if
    // Watermarks is zero length.
    Watermarks []float64

    // EmergencyWatermark is a watermark that, when surpassed, puts this
    // watchdog in emergency mode. During emergency mode, the system is
    // considered to be under significant memory pressure, and the Quiesce
    // period is not honoured.
    EmergencyWatermark float64

    // Silence is the quiet period the watchdog will honour except when in
    // emergency mode.
    Silence time.Duration

    // internal state.
    thresholds  []uint64
    currIdx     int // idx of the current watermark.
    lastIdx     int
    firedLast   bool
    silenceNs   int64
    initialized bool
// NewWatermarkPolicy creates a watchdog policy that schedules GC at concrete
// watermarks. When queried, it will determine the next trigger point based
// on the current utilisation. If the last watermark is surpassed,
// the policy will be disarmed. It is recommended to set an extreme watermark
// as the last element (e.g. 0.99) to prevent the policy from disarming too soon.
func NewWatermarkPolicy(watermarks ...float64) PolicyCtor {
    return func(limit uint64) (Policy, error) {
        p := new(watermarkPolicy)
        p.limit = limit
        p.watermarks = watermarks
        p.thresholds = make([]uint64, 0, len(watermarks))
        for _, m := range watermarks {
            p.thresholds = append(p.thresholds, uint64(float64(limit)*m))
        }
        Logger.Infof("initialized watermark watchdog policy; watermarks: %v; thresholds: %v", p.watermarks, p.thresholds)
        return p, nil
    }
}

var _ Policy = (*WatermarkPolicy)(nil)

func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
    if !w.initialized {
        w.thresholds = make([]uint64, 0, len(w.Watermarks))
        for _, m := range w.Watermarks {
            w.thresholds = append(w.thresholds, uint64(float64(input.Limit)*m))
        }
        w.silenceNs = w.Silence.Nanoseconds()
        w.lastIdx = len(w.Watermarks) - 1
        w.initialized = true
        input.Logger.Infof("initialized watermark watchdog policy; watermarks: %v; emergency watermark: %f, thresholds: %v; silence period: %s",
            w.Watermarks, w.EmergencyWatermark, w.thresholds, w.Silence)
    }

    // determine the value to compare utilisation against.
    var actual uint64
    switch input.Scope {
    case ScopeSystem:
        actual = input.SysStats.ActualUsed
    case ScopeHeap:
        actual = input.MemStats.HeapAlloc
    }

    input.Logger.Debugf("watermark policy: evaluating; curr_watermark: %f, utilization: %d/%d/%d (used/curr_threshold/limit)",
        w.Watermarks[w.currIdx], actual, w.thresholds[w.currIdx], input.Limit)

    // determine whether we're past the emergency watermark; if we are, we fire
    // unconditionally. Disabled if 0.
    if w.EmergencyWatermark > 0 {
        currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float.
        if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency {
            emergencyThreshold := uint64(float64(input.Limit) / w.EmergencyWatermark)
            input.Logger.Infof("watermark policy: emergency trigger; perc: %f/%f (%% used/%% emergency), utilization: %d/%d/%d (used/emergency/limit), used: %d, limit: %d, ",
                currPercentage, w.EmergencyWatermark, actual, emergencyThreshold, input.Limit)
            return true
        }
    }

    // short-circuit if within the silencing period.
    if silencing := w.silenceNs > 0; silencing {
        now := Clock.Now().UnixNano()
        if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs {
            input.Logger.Debugf("watermark policy: silenced")
            return false
        }
    }

    // check if the utilisation is below our current threshold; try
    // to downscale before returning false.
    if actual < w.thresholds[w.currIdx] {
        for w.currIdx > 0 {
            if actual >= w.thresholds[w.currIdx-1] {
                break
            }
            w.firedLast = false
            w.currIdx--
        }
        return false
    }

    // we are above our current watermark, but if this is the last watermark and
    // we've already fired, suppress the firing.
    if w.currIdx == w.lastIdx && w.firedLast {
        input.Logger.Debugf("watermark policy: last watermark already fired; skipping")
        return false
    }

    // if our value is above the last threshold, record the last threshold as
    // our current watermark, and also the fact that we've already fired for
    // it.
    if actual >= w.thresholds[w.lastIdx] {
        w.currIdx = w.lastIdx
        w.firedLast = true
        input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx])
        return true
    }

    input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx])

    // we are triggering; update the current watermark, upscaling it to the
    // next threshold.
    for w.currIdx < w.lastIdx {
        w.currIdx++
        if actual < w.thresholds[w.currIdx] {
            break
        }
    }

    return true
type watermarkPolicy struct {
    // watermarks are the percentual amounts of limit.
    watermarks []float64
    // thresholds are the absolute trigger points of this policy.
    thresholds []uint64
    limit      uint64
}

var _ Policy = (*watermarkPolicy)(nil)

func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64) {
    Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit)", used, w.limit)
    var i int
    for ; i < len(w.thresholds); i++ {
        t := w.thresholds[i]
        if used < t {
            return t
        }
    }
    // we reached the maximum threshold, so we disable this policy.
    return PolicyTempDisabled
}
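A brief worked example of the new watermark policy, assuming the 64 MiB limit and the 0.50/0.75/0.80 watermarks used in the tests that follow (a sketch; the thresholds work out to 32 MiB, 48 MiB, and roughly 51.2 MiB):

package main

import (
    "fmt"

    watchdog "github.com/raulk/go-watchdog"
)

func main() {
    const limit = 64 << 20 // 64MiB, matching the tests below.
    p, err := watchdog.NewWatermarkPolicy(0.50, 0.75, 0.80)(limit)
    if err != nil {
        panic(err)
    }
    fmt.Println(p.Evaluate(watchdog.UtilizationSystem, 0))       // 33554432 (32MiB, the 0.50 threshold)
    fmt.Println(p.Evaluate(watchdog.UtilizationSystem, 33<<20))  // 50331648 (48MiB, the 0.75 threshold)
    fmt.Println(p.Evaluate(watchdog.UtilizationSystem, 52<<20))  // PolicyTempDisabled (math.MaxUint64): past the last threshold
}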
@@ -1,288 +1,54 @@

package watchdog

import (
    "log"
    "os"
    "runtime"
    "testing"
    "time"

    "github.com/elastic/gosigar"
    "github.com/raulk/clock"
    "github.com/stretchr/testify/require"
)

var (
    limit uint64 = 64 << 20 // 64MiB.
    firstWatermark     = 0.50
    secondWatermark    = 0.75
    thirdWatermark     = 0.80
    emergencyWatermark = 0.90

    logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true}
    watermarks = []float64{0.50, 0.75, 0.80}
    thresholds = func() []uint64 {
        var ret []uint64
        for _, w := range watermarks {
            ret = append(ret, uint64(float64(limit)*w))
        }
        return ret
    }()
)

func TestProgressiveWatermarksSystem(t *testing.T) {
func TestProgressiveWatermarks(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    p := WatermarkPolicy{
        Watermarks:         []float64{firstWatermark, secondWatermark, thirdWatermark},
        EmergencyWatermark: emergencyWatermark,
    }
    p, err := NewWatermarkPolicy(watermarks...)(limit)
    require.NoError(t, err)

    require.False(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1},
    }))
    // at zero
    next := p.Evaluate(UtilizationSystem, uint64(0))
    require.EqualValues(t, thresholds[0], next)

    // trigger the first watermark.
    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
    }))
    // before the watermark.
    next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1)
    require.EqualValues(t, thresholds[0], next)

    // this won't fire because we're still on the same watermark.
    for i := 0; i < 100; i++ {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
        }))
    }
    // exactly at the watermark; gives us the next watermark, as the watchdog would've
    // taken care of triggering the first watermark.
    next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0]))
    require.EqualValues(t, thresholds[1], next)

    // now let's move to the second watermark.
    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
    }))
    // after the watermark gives us the next watermark.
    next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1)
    require.EqualValues(t, thresholds[1], next)

    // this won't fire because we're still on the same watermark.
    for i := 0; i < 100; i++ {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0)},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
        }))
    }
    // last watermark; disable the policy.
    next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]))
    require.EqualValues(t, PolicyTempDisabled, next)

    // now let's move to the third and last watermark.
    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(0)},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)},
    }))
    next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1))
    require.EqualValues(t, PolicyTempDisabled, next)

    // this won't fire because we're still on the same watermark.
    for i := 0; i < 100; i++ {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0)},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)},
        }))
    }
}

func TestProgressiveWatermarksHeap(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    p := WatermarkPolicy{
        Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
    }

    // now back up step by step, check that all of them fire.
    for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} {
        require.True(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeHeap,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)},
            SysStats: &gosigar.Mem{ActualUsed: 0},
        }))
    }
}

func TestDownscalingWatermarks_Reentrancy(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    p := WatermarkPolicy{
        Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
    }

    // crank all the way to the top.
    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(0)},
        SysStats: &gosigar.Mem{ActualUsed: limit},
    }))

    // now back down, checking that none of the boundaries fire.
    for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0)},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm - 1)},
        }))
    }

    // now back up step by step, check that all of them fire.
    for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} {
        require.True(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0)},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm + 1)},
        }))
    }

    // check the top does not fire because it already fired.
    for i := 0; i < 100; i++ {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0)},
            SysStats: &gosigar.Mem{ActualUsed: limit},
        }))
    }
}

func TestEmergencyWatermark(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    p := WatermarkPolicy{
        Watermarks:         []float64{firstWatermark},
        EmergencyWatermark: emergencyWatermark,
        Silence:            1 * time.Minute,
    }

    // every tick triggers, even within the silence period.
    for i := 0; i < 100; i++ {
        require.True(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * emergencyWatermark)},
        }))
    }
}

func TestJumpWatermark(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    p := WatermarkPolicy{
        Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
    }

    // check that jumping to the top only fires once.
    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(0)},
        SysStats: &gosigar.Mem{ActualUsed: limit},
    }))

    for i := 0; i < 100; i++ {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(0)},
            SysStats: &gosigar.Mem{ActualUsed: limit},
        }))
    }

}

func TestSilencePeriod(t *testing.T) {
    clk := clock.NewMock()
    Clock = clk

    var (
        limit uint64 = 64 << 20 // 64MiB.
        firstWatermark  = 0.50
        secondWatermark = 0.75
        thirdWatermark  = 0.80
    )

    p := WatermarkPolicy{
        Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
        Silence:    1 * time.Minute,
    }

    // going above the first threshold, but within silencing period, so nothing happens.
    for i := 0; i < 100; i++ {
        require.False(t, p.Evaluate(PolicyInput{
            Logger:   logger,
            Scope:    ScopeSystem,
            Limit:    limit,
            MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
            SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
        }))
    }

    // now outside the silencing period, we do fire.
    clk.Add(time.Minute)

    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: 0},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
    }))

    // but not the second time.
    require.False(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: 0},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
    }))

    // now let's go up inside the silencing period, nothing happens.
    require.False(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
    }))

    // same thing, outside the silencing period should trigger.
    require.True(t, p.Evaluate(PolicyInput{
        Logger:   logger,
        Scope:    ScopeSystem,
        Limit:    limit,
        MemStats: &runtime.MemStats{LastGC: uint64(0)},
        SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
    }))
    next = p.Evaluate(UtilizationSystem, limit)
    require.EqualValues(t, PolicyTempDisabled, next)
}