mirror of
https://github.com/status-im/go-watchdog.git
synced 2025-02-14 18:26:31 +00:00
initial commit.
This commit is contained in:
commit
e2e9d96ec7
5
LICENSE-APACHE
Normal file
5
LICENSE-APACHE
Normal file
@ -0,0 +1,5 @@
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
19
LICENSE-MIT
Normal file
19
LICENSE-MIT
Normal file
@ -0,0 +1,19 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
51
README.md
Normal file
51
README.md
Normal file
@ -0,0 +1,51 @@
|
||||
# Go memory watchdog
|
||||
|
||||
> 🐺 A library to curb OOMs by running Go GC according to a user-defined policy.
|
||||
|
||||
go-memwd runs a singleton memory watchdog. It takes system and heap memory
|
||||
readings periodically, and feeds them to a user-defined policy to determine
|
||||
whether GC needs to run immediately.
|
||||
|
||||
This library ships with two policies out of the box:
|
||||
|
||||
* watermarks policy: runs GC at configured watermarks of system or heap memory
|
||||
utilisation.
|
||||
* adaptive: runs GC when the current usage surpasses a dynamically-set
|
||||
threshold.
|
||||
|
||||
You can easily build a custom policy tailored to the allocation patterns of your
|
||||
program.
|
||||
|
||||
It is recommended that you set both (a) a memory limit and (b) a scope of
|
||||
application of that limit (system or heap) when you start the watchdog.
|
||||
Otherwise, go-watchdog will use the system scope, and will default to the
|
||||
total system memory as the limit. [elastic/go-sigar](https://github.com/elastic/gosigar)
|
||||
is used to make the discovery.
|
||||
|
||||
## Why is this even needed?
|
||||
|
||||
The garbage collector that ships with the go runtime is pretty good in some
|
||||
regards (low-latency, negligible no stop-the-world), but it's insatisfactory in
|
||||
a number of situations that yield ill-fated outcomes:
|
||||
|
||||
1. it is incapable of dealing with bursty/spiky allocations efficiently;
|
||||
depending on the workload, the program may OOM as a consequence of not
|
||||
scheduling GC in a timely manner.
|
||||
2. part of the above is due to the fact that go doesn't concern itself with any
|
||||
limits. To date, it is not possible to set a maximum heap size.
|
||||
2. its default policy of scheduling GC when the heap doubles, coupled with its
|
||||
ignorance of system or process limits, can easily cause it to OOM.
|
||||
|
||||
For more information, check out these GitHub issues:
|
||||
|
||||
* https://github.com/golang/go/issues/42805
|
||||
* https://github.com/golang/go/issues/42430
|
||||
* https://github.com/golang/go/issues/14735
|
||||
* https://github.com/golang/go/issues/16843
|
||||
* https://github.com/golang/go/issues/10064
|
||||
* https://github.com/golang/go/issues/9849
|
||||
|
||||
## License
|
||||
|
||||
Dual-licensed: [MIT](./LICENSE-MIT), [Apache Software License v2](./LICENSE-APACHE), by way of the
|
||||
[Permissive License Stack](https://protocol.ai/blog/an
|
53
adaptive.go
Normal file
53
adaptive.go
Normal file
@ -0,0 +1,53 @@
|
||||
package watchdog
|
||||
|
||||
// AdaptivePolicy is a policy that forces GC when the usage surpasses the
|
||||
// available memory after the last GC run.
|
||||
//
|
||||
// TODO tests
|
||||
type AdaptivePolicy struct {
|
||||
// Factor determines how much this policy will let the heap expand
|
||||
// before it triggers.
|
||||
//
|
||||
// On every GC run, this policy recalculates the next target as
|
||||
// (limit-currentHeap)*Factor (i.e. available*Factor).
|
||||
//
|
||||
// If the GC target calculated by the runtime is lower than the one
|
||||
// calculated by this policy, this policy will set the new target, but the
|
||||
// effect will be nil, since the the go runtime will run GC sooner than us
|
||||
// anyway.
|
||||
Factor float64
|
||||
|
||||
active bool
|
||||
target uint64
|
||||
initialized bool
|
||||
}
|
||||
|
||||
var _ Policy = (*AdaptivePolicy)(nil)
|
||||
|
||||
func (a *AdaptivePolicy) Evaluate(input PolicyInput) (trigger bool) {
|
||||
if !a.initialized {
|
||||
// when initializing, set the target to the limit; it will be reset
|
||||
// when the first GC happens.
|
||||
a.target = input.Limit
|
||||
a.initialized = true
|
||||
}
|
||||
|
||||
// determine the value to compare utilisation against.
|
||||
var actual uint64
|
||||
switch input.Scope {
|
||||
case ScopeSystem:
|
||||
actual = input.SysStats.ActualUsed
|
||||
case ScopeHeap:
|
||||
actual = input.MemStats.HeapAlloc
|
||||
}
|
||||
|
||||
if input.GCTrigger {
|
||||
available := float64(input.Limit) - float64(actual)
|
||||
calc := uint64(available * a.Factor)
|
||||
a.target = calc
|
||||
}
|
||||
if actual >= a.target {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
13
go.mod
Normal file
13
go.mod
Normal file
@ -0,0 +1,13 @@
|
||||
module github.com/raulk/go-memwd
|
||||
|
||||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/elastic/gosigar v0.12.0
|
||||
github.com/kr/pretty v0.1.0 // indirect
|
||||
github.com/raulk/clock v1.1.0
|
||||
github.com/stretchr/testify v1.4.0
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
|
||||
)
|
30
go.sum
Normal file
30
go.sum
Normal file
@ -0,0 +1,30 @@
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/elastic/gosigar v0.12.0 h1:AsdhYCJlTudhfOYQyFNgx+fIVTfrDO0V1ST0vHgiapU=
|
||||
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y=
|
||||
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0 h1:8H8QZJ30plJyIVj60H3lr8TZGIq2Fh3Cyrs/ZNg1foU=
|
||||
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
259
watchdog.go
Normal file
259
watchdog.go
Normal file
@ -0,0 +1,259 @@
|
||||
package watchdog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/elastic/gosigar"
|
||||
"github.com/raulk/clock"
|
||||
)
|
||||
|
||||
// DecimalPrecision is the rounding precision that float calculations will use.
|
||||
// By default, 4 decimal places.
|
||||
var DecimalPrecision = 1e4
|
||||
|
||||
// Clock can be used to inject a mock clock for testing.
|
||||
var Clock = clock.New()
|
||||
|
||||
var (
|
||||
// ErrAlreadyStarted is returned when the user tries to start the watchdog more than once.
|
||||
ErrAlreadyStarted = fmt.Errorf("singleton memory watchdog was already started")
|
||||
)
|
||||
|
||||
const (
|
||||
// stateUnstarted represents an unstarted state.
|
||||
stateUnstarted int64 = iota
|
||||
// stateStarted represents a started state.
|
||||
stateStarted
|
||||
)
|
||||
|
||||
// watchdog is a global singleton watchdog.
|
||||
var watchdog struct {
|
||||
state int64
|
||||
config MemConfig
|
||||
|
||||
closing chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// ScopeType defines the scope of the utilisation that we'll apply the limit to.
|
||||
type ScopeType int
|
||||
|
||||
const (
|
||||
// ScopeSystem specifies that the policy compares against actual used
|
||||
// system memory.
|
||||
ScopeSystem ScopeType = iota
|
||||
// ScopeHeap specifies that the policy compares against heap used.
|
||||
ScopeHeap
|
||||
)
|
||||
|
||||
// PolicyInput is the object that's passed to a policy when evaluating it.
|
||||
type PolicyInput struct {
|
||||
Scope ScopeType
|
||||
Limit uint64
|
||||
MemStats *runtime.MemStats
|
||||
SysStats *gosigar.Mem
|
||||
GCTrigger bool // is this a GC trigger?
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// Policy encapsulates the logic that the watchdog will run on every tick.
|
||||
type Policy interface {
|
||||
// Evaluate determines whether the policy should fire. It receives the
|
||||
// limit (either guessed or manually set), go runtime memory stats, and
|
||||
// system memory stats, amongst other things. It returns whether the policy
|
||||
// has fired or not.
|
||||
Evaluate(input PolicyInput) (trigger bool)
|
||||
}
|
||||
|
||||
type MemConfig struct {
|
||||
// Scope is the scope at which the limit will be applied.
|
||||
Scope ScopeType
|
||||
|
||||
// Limit is the memory available to this process. If zero, we will fall
|
||||
// back to querying the system total memory via SIGAR.
|
||||
Limit uint64
|
||||
|
||||
// Resolution is the interval at which the watchdog will retrieve memory
|
||||
// stats and invoke the Policy.
|
||||
Resolution time.Duration
|
||||
|
||||
// Policy sets the firing policy of this watchdog.
|
||||
Policy Policy
|
||||
|
||||
// NotifyFired, if non-nil, will be called when the policy has fired,
|
||||
// prior to calling GC, even if GC is disabled.
|
||||
NotifyFired func()
|
||||
|
||||
// NotifyOnly, if true, will cause the watchdog to only notify via the
|
||||
// callbacks, without triggering actual GC.
|
||||
NotifyOnly bool
|
||||
|
||||
// Logger is the logger to use. If nil, it will default to a new log package
|
||||
// logger that uses the same io.Writer as the
|
||||
//
|
||||
// To use a zap logger, wrap it in a standard logger using use
|
||||
// zap.NewStdLog().
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// Memory starts the singleton memory watchdog with the provided configuration.
|
||||
func Memory(config MemConfig) (err error, stop func()) {
|
||||
if !atomic.CompareAndSwapInt64(&watchdog.state, stateUnstarted, stateStarted) {
|
||||
return ErrAlreadyStarted, nil
|
||||
}
|
||||
|
||||
if config.Logger == nil {
|
||||
config.Logger = log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)
|
||||
}
|
||||
|
||||
// if the user didn't provide a limit, get the total memory.
|
||||
if config.Limit == 0 {
|
||||
var mem gosigar.Mem
|
||||
if err := mem.Get(); err != nil {
|
||||
return fmt.Errorf("failed to get system memory limit via SIGAR: %w", err), nil
|
||||
}
|
||||
config.Limit = mem.Total
|
||||
}
|
||||
|
||||
watchdog.config = config
|
||||
watchdog.closing = make(chan struct{})
|
||||
|
||||
watchdog.wg.Add(1)
|
||||
go watch()
|
||||
|
||||
return nil, stopMemory
|
||||
}
|
||||
|
||||
func watch() {
|
||||
var (
|
||||
lk sync.Mutex // guards gcTriggered channel, which is drained and flipped to nil on closure.
|
||||
gcTriggered = make(chan struct{}, 16)
|
||||
|
||||
memstats runtime.MemStats
|
||||
sysmem gosigar.Mem
|
||||
config = watchdog.config
|
||||
)
|
||||
|
||||
// this non-zero sized struct is used as a sentinel to detect when a GC
|
||||
// run has finished, by setting and resetting a finalizer on it.
|
||||
type sentinel struct{ a *int }
|
||||
var sentinelObj sentinel
|
||||
var finalizer func(o *sentinel)
|
||||
finalizer = func(o *sentinel) {
|
||||
lk.Lock()
|
||||
defer lk.Unlock()
|
||||
select {
|
||||
case gcTriggered <- struct{}{}:
|
||||
default:
|
||||
config.Logger.Printf("failed to queue gc trigger; channel backlogged")
|
||||
}
|
||||
runtime.SetFinalizer(o, finalizer)
|
||||
}
|
||||
finalizer(&sentinelObj)
|
||||
|
||||
defer watchdog.wg.Done()
|
||||
for {
|
||||
var eventIsGc bool
|
||||
select {
|
||||
case <-Clock.After(config.Resolution):
|
||||
// exit select.
|
||||
|
||||
case <-gcTriggered:
|
||||
eventIsGc = true
|
||||
// exit select.
|
||||
|
||||
case <-watchdog.closing:
|
||||
runtime.SetFinalizer(&sentinelObj, nil) // clear the sentinel finalizer.
|
||||
|
||||
lk.Lock()
|
||||
ch := gcTriggered
|
||||
gcTriggered = nil
|
||||
lk.Unlock()
|
||||
|
||||
// close and drain
|
||||
close(ch)
|
||||
for range ch {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReadMemStats stops the world. But as of go1.9, it should only
|
||||
// take ~25µs to complete.
|
||||
//
|
||||
// Before go1.15, calls to ReadMemStats during an ongoing GC would
|
||||
// block due to the worldsema lock. As of go1.15, this was optimized
|
||||
// and the runtime holds on to worldsema less during GC (only during
|
||||
// sweep termination and mark termination).
|
||||
//
|
||||
// For users using go1.14 and earlier, if this call happens during
|
||||
// GC, it will just block for longer until serviced, but it will not
|
||||
// take longer in itself. No harm done.
|
||||
//
|
||||
// Actual benchmarks
|
||||
// -----------------
|
||||
//
|
||||
// In Go 1.15.5, ReadMem with no ongoing GC takes ~27µs in a MBP 16
|
||||
// i9 busy with another million things. During GC, it takes an
|
||||
// average of less than 175µs per op.
|
||||
//
|
||||
// goos: darwin
|
||||
// goarch: amd64
|
||||
// pkg: github.com/filecoin-project/lotus/api
|
||||
// BenchmarkReadMemStats-16 44530 27523 ns/op
|
||||
// BenchmarkReadMemStats-16 43743 26879 ns/op
|
||||
// BenchmarkReadMemStats-16 45627 26791 ns/op
|
||||
// BenchmarkReadMemStats-16 44538 26219 ns/op
|
||||
// BenchmarkReadMemStats-16 44958 26757 ns/op
|
||||
// BenchmarkReadMemStatsWithGCContention-16 10 183733 p50-ns 211859 p90-ns 211859 p99-ns
|
||||
// BenchmarkReadMemStatsWithGCContention-16 7 198765 p50-ns 314873 p90-ns 314873 p99-ns
|
||||
// BenchmarkReadMemStatsWithGCContention-16 10 195151 p50-ns 311408 p90-ns 311408 p99-ns
|
||||
// BenchmarkReadMemStatsWithGCContention-16 10 217279 p50-ns 295308 p90-ns 295308 p99-ns
|
||||
// BenchmarkReadMemStatsWithGCContention-16 10 167054 p50-ns 327072 p90-ns 327072 p99-ns
|
||||
// PASS
|
||||
//
|
||||
// See: https://github.com/golang/go/issues/19812
|
||||
// See: https://github.com/prometheus/client_golang/issues/403
|
||||
runtime.ReadMemStats(&memstats)
|
||||
|
||||
if err := sysmem.Get(); err != nil {
|
||||
config.Logger.Printf("failed to obtain system memory stats ")
|
||||
}
|
||||
|
||||
trigger := config.Policy.Evaluate(PolicyInput{
|
||||
Scope: config.Scope,
|
||||
Limit: config.Limit,
|
||||
MemStats: &memstats,
|
||||
SysStats: &sysmem,
|
||||
GCTrigger: eventIsGc,
|
||||
Logger: config.Logger,
|
||||
})
|
||||
|
||||
if !trigger {
|
||||
continue
|
||||
}
|
||||
|
||||
if f := config.NotifyFired; f != nil {
|
||||
f()
|
||||
}
|
||||
|
||||
if !config.NotifyOnly {
|
||||
config.Logger.Printf("watchdog policy fired: triggering GC")
|
||||
runtime.GC()
|
||||
config.Logger.Printf("GC finished")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func stopMemory() {
|
||||
if !atomic.CompareAndSwapInt64(&watchdog.state, stateStarted, stateUnstarted) {
|
||||
return
|
||||
}
|
||||
close(watchdog.closing)
|
||||
watchdog.wg.Wait()
|
||||
}
|
92
watchdog_test.go
Normal file
92
watchdog_test.go
Normal file
@ -0,0 +1,92 @@
|
||||
package watchdog
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/raulk/clock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testPolicy struct {
|
||||
ch chan *PolicyInput
|
||||
trigger bool
|
||||
}
|
||||
|
||||
var _ Policy = (*testPolicy)(nil)
|
||||
|
||||
func (t *testPolicy) Evaluate(input PolicyInput) (trigger bool) {
|
||||
t.ch <- &input
|
||||
return t.trigger
|
||||
}
|
||||
|
||||
func TestWatchdog(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
tp := &testPolicy{ch: make(chan *PolicyInput, 1)}
|
||||
notifyCh := make(chan struct{}, 1)
|
||||
|
||||
err, stop := Memory(MemConfig{
|
||||
Scope: ScopeHeap,
|
||||
Limit: 100,
|
||||
Resolution: 10 * time.Second,
|
||||
Policy: tp,
|
||||
NotifyFired: func() { notifyCh <- struct{}{} },
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer stop()
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // wait til the watchdog goroutine starts.
|
||||
|
||||
clk.Add(10 * time.Second)
|
||||
pi := <-tp.ch
|
||||
require.EqualValues(t, 100, pi.Limit)
|
||||
require.NotNil(t, pi.MemStats)
|
||||
require.NotNil(t, pi.SysStats)
|
||||
require.Equal(t, ScopeHeap, pi.Scope)
|
||||
|
||||
require.Len(t, notifyCh, 0)
|
||||
|
||||
var ms runtime.MemStats
|
||||
runtime.ReadMemStats(&ms)
|
||||
require.EqualValues(t, 0, ms.NumGC)
|
||||
|
||||
// now fire.
|
||||
tp.trigger = true
|
||||
|
||||
clk.Add(10 * time.Second)
|
||||
pi = <-tp.ch
|
||||
require.EqualValues(t, 100, pi.Limit)
|
||||
require.NotNil(t, pi.MemStats)
|
||||
require.NotNil(t, pi.SysStats)
|
||||
require.Equal(t, ScopeHeap, pi.Scope)
|
||||
|
||||
time.Sleep(500 * time.Millisecond) // wait until the watchdog runs.
|
||||
|
||||
require.Len(t, notifyCh, 1)
|
||||
|
||||
runtime.ReadMemStats(&ms)
|
||||
require.EqualValues(t, 1, ms.NumGC)
|
||||
}
|
||||
|
||||
func TestDoubleClose(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatal(r)
|
||||
}
|
||||
}()
|
||||
|
||||
tp := &testPolicy{ch: make(chan *PolicyInput, 1)}
|
||||
|
||||
err, stop := Memory(MemConfig{
|
||||
Scope: ScopeHeap,
|
||||
Limit: 100,
|
||||
Resolution: 10 * time.Second,
|
||||
Policy: tp,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
stop()
|
||||
stop()
|
||||
}
|
129
watermarks.go
Normal file
129
watermarks.go
Normal file
@ -0,0 +1,129 @@
|
||||
package watchdog
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WatermarkPolicy is a watchdog firing policy that triggers when watermarks are
|
||||
// surpassed in the increasing direction.
|
||||
//
|
||||
// For example, a policy configured with the watermarks 0.50, 0.75, 0.80, and
|
||||
// 0.99 will trigger at most once, and only once, each time that a watermark
|
||||
// is surpassed upwards.
|
||||
//
|
||||
// Even if utilisation pierces through several watermarks at once between two
|
||||
// subsequent calls to Evaluate, the policy will fire only once.
|
||||
//
|
||||
// It is possible to suppress the watermark policy from firing too often by
|
||||
// setting a Silence period. When non-zero, if a watermark is surpassed within
|
||||
// the Silence window (using the last GC timestamp as basis), that event will
|
||||
// not immediately trigger firing. Instead, the policy will wait until the
|
||||
// silence period is over, and only then will it fire if utilisation is still
|
||||
// beyond that watermark.
|
||||
//
|
||||
// At last, if an EmergencyWatermark is set, when utilisation is above that
|
||||
// level, the Silence period will be ignored and the policy will fire
|
||||
// persistenly, as long as utilisation stays above that watermark.
|
||||
type WatermarkPolicy struct {
|
||||
// Watermarks are the percentual amounts of limit. The policy will panic if
|
||||
// Watermarks is zero length.
|
||||
Watermarks []float64
|
||||
|
||||
// EmergencyWatermark is a watermark that, when surpassed, puts this
|
||||
// watchdog in emergency mode. During emergency mode, the system is
|
||||
// considered to be under significant memory pressure, and the Quiesce
|
||||
// period is not honoured.
|
||||
EmergencyWatermark float64
|
||||
|
||||
// Silence is the quiet period the watchdog will honour except when in
|
||||
// emergency mode.
|
||||
Silence time.Duration
|
||||
|
||||
// internal state.
|
||||
thresholds []uint64
|
||||
currIdx int // idx of the current watermark.
|
||||
lastIdx int
|
||||
firedLast bool
|
||||
silenceNs int64
|
||||
initialized bool
|
||||
}
|
||||
|
||||
var _ Policy = (*WatermarkPolicy)(nil)
|
||||
|
||||
func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
|
||||
if !w.initialized {
|
||||
w.thresholds = make([]uint64, 0, len(w.Watermarks))
|
||||
for _, m := range w.Watermarks {
|
||||
w.thresholds = append(w.thresholds, uint64(float64(input.Limit)*m))
|
||||
}
|
||||
w.silenceNs = w.Silence.Nanoseconds()
|
||||
w.lastIdx = len(w.Watermarks) - 1
|
||||
w.initialized = true
|
||||
}
|
||||
|
||||
// determine the value to compare utilisation against.
|
||||
var actual uint64
|
||||
switch input.Scope {
|
||||
case ScopeSystem:
|
||||
actual = input.SysStats.ActualUsed
|
||||
case ScopeHeap:
|
||||
actual = input.MemStats.HeapAlloc
|
||||
}
|
||||
|
||||
// determine whether we're past the emergency watermark; if we are, we fire
|
||||
// unconditionally. Disabled if 0.
|
||||
if w.EmergencyWatermark > 0 {
|
||||
currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float.
|
||||
if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// short-circuit if within the silencing period.
|
||||
if silencing := w.silenceNs > 0; silencing {
|
||||
now := Clock.Now().UnixNano()
|
||||
if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// check if the the utilisation is below our current threshold; try
|
||||
// to downscale before returning false.
|
||||
if actual < w.thresholds[w.currIdx] {
|
||||
for w.currIdx > 0 {
|
||||
if actual >= w.thresholds[w.currIdx-1] {
|
||||
break
|
||||
}
|
||||
w.firedLast = false
|
||||
w.currIdx--
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// we are above our current watermark, but if this is the last watermark and
|
||||
// we've already fired, suppress the firing.
|
||||
if w.currIdx == w.lastIdx && w.firedLast {
|
||||
return false
|
||||
}
|
||||
|
||||
// if our value is above the last threshold, record the last threshold as
|
||||
// our current watermark, and also the fact that we've already fired for
|
||||
// it.
|
||||
if actual >= w.thresholds[w.lastIdx] {
|
||||
w.currIdx = w.lastIdx
|
||||
w.firedLast = true
|
||||
return true
|
||||
}
|
||||
|
||||
// we are triggering; update the current watermark, upscaling it to the
|
||||
// next threshold.
|
||||
for w.currIdx < w.lastIdx {
|
||||
w.currIdx++
|
||||
if actual < w.thresholds[w.currIdx] {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
264
watermarks_test.go
Normal file
264
watermarks_test.go
Normal file
@ -0,0 +1,264 @@
|
||||
package watchdog
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/elastic/gosigar"
|
||||
"github.com/raulk/clock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
limit uint64 = 64 << 20 // 64MiB.
|
||||
firstWatermark = 0.50
|
||||
secondWatermark = 0.75
|
||||
thirdWatermark = 0.80
|
||||
emergencyWatermark = 0.90
|
||||
)
|
||||
|
||||
func TestProgressiveWatermarksSystem(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
p := WatermarkPolicy{
|
||||
Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
|
||||
EmergencyWatermark: emergencyWatermark,
|
||||
}
|
||||
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1},
|
||||
}))
|
||||
|
||||
// trigger the first watermark.
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
|
||||
}))
|
||||
|
||||
// this won't fire because we're still on the same watermark.
|
||||
for i := 0; i < 100; i++ {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
|
||||
}))
|
||||
}
|
||||
|
||||
// now let's move to the second watermark.
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
|
||||
}))
|
||||
|
||||
// this won't fire because we're still on the same watermark.
|
||||
for i := 0; i < 100; i++ {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
|
||||
}))
|
||||
}
|
||||
|
||||
// now let's move to the third and last watermark.
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)},
|
||||
}))
|
||||
|
||||
// this won't fire because we're still on the same watermark.
|
||||
for i := 0; i < 100; i++ {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * thirdWatermark)},
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressiveWatermarksHeap(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
p := WatermarkPolicy{
|
||||
Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
|
||||
}
|
||||
|
||||
// now back up step by step, check that all of them fire.
|
||||
for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} {
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeHeap,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: 0},
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDownscalingWatermarks_Reentrancy(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
p := WatermarkPolicy{
|
||||
Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
|
||||
}
|
||||
|
||||
// crank all the way to the top.
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: limit},
|
||||
}))
|
||||
|
||||
// now back down, checking that none of the boundaries fire.
|
||||
for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm - 1)},
|
||||
}))
|
||||
}
|
||||
|
||||
// now back up step by step, check that all of them fire.
|
||||
for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} {
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*wm + 1)},
|
||||
}))
|
||||
}
|
||||
|
||||
// check the top does not fire because it already fired.
|
||||
for i := 0; i < 100; i++ {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: limit},
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmergencyWatermark(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
p := WatermarkPolicy{
|
||||
Watermarks: []float64{firstWatermark},
|
||||
EmergencyWatermark: emergencyWatermark,
|
||||
Silence: 1 * time.Minute,
|
||||
}
|
||||
|
||||
// every tick triggers, even within the silence period.
|
||||
for i := 0; i < 100; i++ {
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * emergencyWatermark)},
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
func TestJumpWatermark(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
p := WatermarkPolicy{
|
||||
Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
|
||||
}
|
||||
|
||||
// check that jumping to the top only fires once.
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: limit},
|
||||
}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: limit},
|
||||
}))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSilencePeriod(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
Clock = clk
|
||||
|
||||
var (
|
||||
limit uint64 = 64 << 20 // 64MiB.
|
||||
firstWatermark = 0.50
|
||||
secondWatermark = 0.75
|
||||
thirdWatermark = 0.80
|
||||
)
|
||||
|
||||
p := WatermarkPolicy{
|
||||
Watermarks: []float64{firstWatermark, secondWatermark, thirdWatermark},
|
||||
Silence: 1 * time.Minute,
|
||||
}
|
||||
|
||||
// going above the first threshold, but within silencing period, so nothing happens.
|
||||
for i := 0; i < 100; i++ {
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
|
||||
}))
|
||||
}
|
||||
|
||||
// now outside the silencing period, we do fire.
|
||||
clk.Add(time.Minute)
|
||||
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: 0},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
|
||||
}))
|
||||
|
||||
// but not the second time.
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: 0},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * firstWatermark)},
|
||||
}))
|
||||
|
||||
// now let's go up inside the silencing period, nothing happens.
|
||||
require.False(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
|
||||
}))
|
||||
|
||||
// same thing, outside the silencing period should trigger.
|
||||
require.True(t, p.Evaluate(PolicyInput{
|
||||
Scope: ScopeSystem,
|
||||
Limit: limit,
|
||||
MemStats: &runtime.MemStats{LastGC: uint64(0)},
|
||||
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit) * secondWatermark)},
|
||||
}))
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user