diff --git a/log.go b/log.go new file mode 100644 index 0000000..b6977cc --- /dev/null +++ b/log.go @@ -0,0 +1,38 @@ +package watchdog + +import "log" + +// Logger is the leveled logging interface the watchdog writes to; implement it to plug in a custom logger. Every method takes a Printf-style template followed by its arguments. +type Logger interface { + Debugf(template string, args ...interface{}) + Infof(template string, args ...interface{}) + Warnf(template string, args ...interface{}) + Errorf(template string, args ...interface{}) +} + +var _ Logger = (*stdlog)(nil) + +// stdlog is a Logger that proxies to a standard log.Logger. Debugf is a no-op unless debug is set; Infof, Warnf and Errorf all print through Printf without adding a level marker. +type stdlog struct { + log *log.Logger + debug bool +} + +func (s *stdlog) Debugf(template string, args ...interface{}) { + if !s.debug { + return + } + s.log.Printf(template, args...) +} + +func (s *stdlog) Infof(template string, args ...interface{}) { + s.log.Printf(template, args...) +} + +func (s *stdlog) Warnf(template string, args ...interface{}) { + s.log.Printf(template, args...) +} + +func (s *stdlog) Errorf(template string, args ...interface{}) { + s.log.Printf(template, args...) +} diff --git a/watchdog.go b/watchdog.go index d8f0e11..7fc9266 100644 --- a/watchdog.go +++ b/watchdog.go @@ -58,7 +58,7 @@ type PolicyInput struct { MemStats *runtime.MemStats SysStats *gosigar.Mem GCTrigger bool // is this a GC trigger? - Logger *log.Logger + Logger Logger } // Policy encapsulates the logic that the watchdog will run on every tick. @@ -93,12 +93,9 @@ type MemConfig struct { // callbacks, without triggering actual GC. NotifyOnly bool - // Logger is the logger to use. If nil, it will default to a new log package - // logger that uses the same io.Writer as the - // - // To use a zap logger, wrap it in a standard logger using use - // zap.NewStdLog(). - Logger *log.Logger + // Logger is the logger to use. If nil, Memory defaults to a logger that + // proxies to a standard log.Logger created with the "[watchdog] " prefix. + Logger Logger } // Memory starts the singleton memory watchdog with the provided configuration.
@@ -108,7 +105,7 @@ func Memory(config MemConfig) (err error, stop func()) { } if config.Logger == nil { - config.Logger = log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix) + config.Logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)} } // if the user didn't provide a limit, get the total memory. @@ -150,7 +147,7 @@ func watch() { select { case gcTriggered <- struct{}{}: default: - config.Logger.Printf("failed to queue gc trigger; channel backlogged") + config.Logger.Warnf("failed to queue gc trigger; channel backlogged") } runtime.SetFinalizer(o, finalizer) } @@ -218,10 +215,15 @@ func watch() { // // See: https://github.com/golang/go/issues/19812 // See: https://github.com/prometheus/client_golang/issues/403 + + if eventIsGc { + config.Logger.Infof("watchdog after GC") + } + runtime.ReadMemStats(&memstats) if err := sysmem.Get(); err != nil { - config.Logger.Printf("failed to obtain system memory stats ") + config.Logger.Warnf("failed to obtain system memory stats; err: %s", err) } trigger := config.Policy.Evaluate(PolicyInput{ @@ -237,16 +239,19 @@ func watch() { continue } + config.Logger.Infof("watchdog policy fired") + if f := config.NotifyFired; f != nil { f() } if !config.NotifyOnly { - config.Logger.Printf("watchdog policy fired: triggering GC") + config.Logger.Infof("watchdog is triggering GC") + start := time.Now() runtime.GC() - config.Logger.Printf("GC finished") + runtime.ReadMemStats(&memstats) + config.Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", time.Since(start), memstats.HeapAlloc) } - } } diff --git a/watermarks.go b/watermarks.go index e40fd9b..54c986e 100644 --- a/watermarks.go +++ b/watermarks.go @@ -60,6 +60,8 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { w.silenceNs = w.Silence.Nanoseconds() w.lastIdx = len(w.Watermarks) - 1 w.initialized = true + input.Logger.Infof("initialized watermark watchdog policy; 
watermarks: %v; emergency watermark: %f, thresholds: %v; silence period: %s", + w.Watermarks, w.EmergencyWatermark, w.thresholds, w.Silence) } // determine the value to compare utilisation against. @@ -71,11 +73,17 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { actual = input.MemStats.HeapAlloc } + input.Logger.Debugf("watermark policy: evaluating; curr_watermark: %f, utilization: %d/%d/%d (used/curr_threshold/limit)", + w.Watermarks[w.currIdx], actual, w.thresholds[w.currIdx], input.Limit) + // determine whether we're past the emergency watermark; if we are, we fire // unconditionally. Disabled if 0. if w.EmergencyWatermark > 0 { currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float. if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency { + emergencyThreshold := uint64(float64(input.Limit) * w.EmergencyWatermark) // EmergencyWatermark is a fraction of Limit, so the byte threshold is their product. + input.Logger.Infof("watermark policy: emergency trigger; perc: %f/%f (%% used/%% emergency), utilization: %d/%d/%d (used/emergency/limit)", + currPercentage, w.EmergencyWatermark, actual, emergencyThreshold, input.Limit) return true } } @@ -84,6 +92,7 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { if silencing := w.silenceNs > 0; silencing { now := Clock.Now().UnixNano() if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs { + input.Logger.Debugf("watermark policy: silenced") return false } } @@ -104,6 +113,7 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { // we are above our current watermark, but if this is the last watermark and // we've already fired, suppress the firing.
if w.currIdx == w.lastIdx && w.firedLast { + input.Logger.Debugf("watermark policy: last watermark already fired; skipping") return false } @@ -113,9 +123,12 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) { if actual >= w.thresholds[w.lastIdx] { w.currIdx = w.lastIdx w.firedLast = true + input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx]) return true } + input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx]) + // we are triggering; update the current watermark, upscaling it to the // next threshold. for w.currIdx < w.lastIdx { diff --git a/watermarks_test.go b/watermarks_test.go index ae2ef06..1d855e7 100644 --- a/watermarks_test.go +++ b/watermarks_test.go @@ -1,6 +1,8 @@ package watchdog import ( + "log" + "os" "runtime" "testing" "time" @@ -16,6 +18,8 @@ var ( secondWatermark = 0.75 thirdWatermark = 0.80 emergencyWatermark = 0.90 + + logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} ) func TestProgressiveWatermarksSystem(t *testing.T) { @@ -28,6 +32,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { } require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1}, @@ -35,6 +40,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { // trigger the first watermark. require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, @@ -44,6 +50,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { // this won't fire because we're still on the same watermark.
for i := 0; i < 100; i++ { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, @@ -53,6 +60,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { // now let's move to the second watermark. require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, @@ -62,6 +70,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { // this won't fire because we're still on the same watermark. for i := 0; i < 100; i++ { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -71,6 +80,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { // now let's move to the third and last watermark. require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -80,6 +90,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) { // this won't fire because we're still on the same watermark. for i := 0; i < 100; i++ { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -99,6 +110,7 @@ func TestProgressiveWatermarksHeap(t *testing.T) { // now back up step by step, check that all of them fire. for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeHeap, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)}, @@ -117,6 +129,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) { // crank all the way to the top.
require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -126,6 +139,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) { // now back down, checking that none of the boundaries fire. for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -136,6 +150,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) { // now back up step by step, check that all of them fire. for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -146,6 +161,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) { // check the top does not fire because it already fired. for i := 0; i < 100; i++ { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -167,6 +183,7 @@ func TestEmergencyWatermark(t *testing.T) { // every tick triggers, even within the silence period. for i := 0; i < 100; i++ { require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, @@ -185,6 +202,7 @@ func TestJumpWatermark(t *testing.T) { // check that jumping to the top only fires once.
require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -193,6 +211,7 @@ func TestJumpWatermark(t *testing.T) { for i := 0; i < 100; i++ { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)}, @@ -221,6 +240,7 @@ func TestSilencePeriod(t *testing.T) { // going above the first threshold, but within silencing period, so nothing happens. for i := 0; i < 100; i++ { require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, @@ -232,6 +252,7 @@ func TestSilencePeriod(t *testing.T) { clk.Add(time.Minute) require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: 0}, @@ -240,6 +261,7 @@ func TestSilencePeriod(t *testing.T) { // but not the second time. require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: 0}, @@ -248,6 +270,7 @@ func TestSilencePeriod(t *testing.T) { // now let's go up inside the silencing period, nothing happens. require.False(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, @@ -256,6 +279,7 @@ func TestSilencePeriod(t *testing.T) { // same thing, outside the silencing period should trigger. require.True(t, p.Evaluate(PolicyInput{ + Logger: logger, Scope: ScopeSystem, Limit: limit, MemStats: &runtime.MemStats{LastGC: uint64(0)},