improve logging.

This commit is contained in:
Raúl Kripalani 2020-12-02 16:33:00 +00:00
parent 2181a740fb
commit ffbfd5e37a
4 changed files with 93 additions and 13 deletions

38
log.go Normal file
View File

@ -0,0 +1,38 @@
package watchdog
import "log"
// Logger is an interface to be implemented by custom loggers.
type Logger interface {
Debugf(template string, args ...interface{})
Infof(template string, args ...interface{})
Warnf(template string, args ...interface{})
Errorf(template string, args ...interface{})
}
var _ Logger = (*stdlog)(nil)
// stdlog is a Logger that proxies to a standard log.Logger.
type stdlog struct {
log *log.Logger
debug bool
}
func (s *stdlog) Debugf(template string, args ...interface{}) {
if !s.debug {
return
}
s.log.Printf(template, args...)
}
func (s *stdlog) Infof(template string, args ...interface{}) {
s.log.Printf(template, args...)
}
func (s *stdlog) Warnf(template string, args ...interface{}) {
s.log.Printf(template, args...)
}
func (s *stdlog) Errorf(template string, args ...interface{}) {
s.log.Printf(template, args...)
}

View File

@ -58,7 +58,7 @@ type PolicyInput struct {
MemStats *runtime.MemStats MemStats *runtime.MemStats
SysStats *gosigar.Mem SysStats *gosigar.Mem
GCTrigger bool // is this a GC trigger? GCTrigger bool // is this a GC trigger?
Logger *log.Logger Logger Logger
} }
// Policy encapsulates the logic that the watchdog will run on every tick. // Policy encapsulates the logic that the watchdog will run on every tick.
@ -93,12 +93,9 @@ type MemConfig struct {
// callbacks, without triggering actual GC. // callbacks, without triggering actual GC.
NotifyOnly bool NotifyOnly bool
// Logger is the logger to use. If nil, it will default to a new log package // Logger is the logger to use. If nil, it will default to a logger that
// logger that uses the same io.Writer as the // proxies to a standard logger using the "[watchdog]" prefix.
// Logger Logger
// To use a zap logger, wrap it in a standard logger using use
// zap.NewStdLog().
Logger *log.Logger
} }
// Memory starts the singleton memory watchdog with the provided configuration. // Memory starts the singleton memory watchdog with the provided configuration.
@ -108,7 +105,7 @@ func Memory(config MemConfig) (err error, stop func()) {
} }
if config.Logger == nil { if config.Logger == nil {
config.Logger = log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix) config.Logger = &stdlog{log: log.New(log.Writer(), "[watchdog] ", log.LstdFlags|log.Lmsgprefix)}
} }
// if the user didn't provide a limit, get the total memory. // if the user didn't provide a limit, get the total memory.
@ -150,7 +147,7 @@ func watch() {
select { select {
case gcTriggered <- struct{}{}: case gcTriggered <- struct{}{}:
default: default:
config.Logger.Printf("failed to queue gc trigger; channel backlogged") config.Logger.Warnf("failed to queue gc trigger; channel backlogged")
} }
runtime.SetFinalizer(o, finalizer) runtime.SetFinalizer(o, finalizer)
} }
@ -218,10 +215,15 @@ func watch() {
// //
// See: https://github.com/golang/go/issues/19812 // See: https://github.com/golang/go/issues/19812
// See: https://github.com/prometheus/client_golang/issues/403 // See: https://github.com/prometheus/client_golang/issues/403
if eventIsGc {
config.Logger.Infof("watchdog after GC")
}
runtime.ReadMemStats(&memstats) runtime.ReadMemStats(&memstats)
if err := sysmem.Get(); err != nil { if err := sysmem.Get(); err != nil {
config.Logger.Printf("failed to obtain system memory stats ") config.Logger.Warnf("failed to obtain system memory stats; err: %s", err)
} }
trigger := config.Policy.Evaluate(PolicyInput{ trigger := config.Policy.Evaluate(PolicyInput{
@ -237,16 +239,19 @@ func watch() {
continue continue
} }
config.Logger.Infof("watchdog policy fired")
if f := config.NotifyFired; f != nil { if f := config.NotifyFired; f != nil {
f() f()
} }
if !config.NotifyOnly { if !config.NotifyOnly {
config.Logger.Printf("watchdog policy fired: triggering GC") config.Logger.Infof("watchdog is triggering GC")
start := time.Now()
runtime.GC() runtime.GC()
config.Logger.Printf("GC finished") runtime.ReadMemStats(&memstats)
config.Logger.Infof("watchdog-triggered GC finished; took: %s; current heap allocated: %d bytes", time.Since(start), memstats.HeapAlloc)
} }
} }
} }

View File

@ -60,6 +60,8 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
w.silenceNs = w.Silence.Nanoseconds() w.silenceNs = w.Silence.Nanoseconds()
w.lastIdx = len(w.Watermarks) - 1 w.lastIdx = len(w.Watermarks) - 1
w.initialized = true w.initialized = true
input.Logger.Infof("initialized watermark watchdog policy; watermarks: %v; emergency watermark: %f, thresholds: %v; silence period: %s",
w.Watermarks, w.EmergencyWatermark, w.thresholds, w.Silence)
} }
// determine the value to compare utilisation against. // determine the value to compare utilisation against.
@ -71,11 +73,17 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
actual = input.MemStats.HeapAlloc actual = input.MemStats.HeapAlloc
} }
input.Logger.Debugf("watermark policy: evaluating; curr_watermark: %f, utilization: %d/%d/%d (used/curr_threshold/limit)",
w.Watermarks[w.currIdx], actual, w.thresholds[w.currIdx], input.Limit)
// determine whether we're past the emergency watermark; if we are, we fire // determine whether we're past the emergency watermark; if we are, we fire
// unconditionally. Disabled if 0. // unconditionally. Disabled if 0.
if w.EmergencyWatermark > 0 { if w.EmergencyWatermark > 0 {
currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float. currPercentage := math.Round((float64(actual)/float64(input.Limit))*DecimalPrecision) / DecimalPrecision // round float.
if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency { if pastEmergency := currPercentage >= w.EmergencyWatermark; pastEmergency {
emergencyThreshold := uint64(float64(input.Limit) / w.EmergencyWatermark)
input.Logger.Infof("watermark policy: emergency trigger; perc: %f/%f (%% used/%% emergency), utilization: %d/%d/%d (used/emergency/limit), used: %d, limit: %d, ",
currPercentage, w.EmergencyWatermark, actual, emergencyThreshold, input.Limit)
return true return true
} }
} }
@ -84,6 +92,7 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
if silencing := w.silenceNs > 0; silencing { if silencing := w.silenceNs > 0; silencing {
now := Clock.Now().UnixNano() now := Clock.Now().UnixNano()
if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs { if elapsed := now - int64(input.MemStats.LastGC); elapsed < w.silenceNs {
input.Logger.Debugf("watermark policy: silenced")
return false return false
} }
} }
@ -104,6 +113,7 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
// we are above our current watermark, but if this is the last watermark and // we are above our current watermark, but if this is the last watermark and
// we've already fired, suppress the firing. // we've already fired, suppress the firing.
if w.currIdx == w.lastIdx && w.firedLast { if w.currIdx == w.lastIdx && w.firedLast {
input.Logger.Debugf("watermark policy: last watermark already fired; skipping")
return false return false
} }
@ -113,9 +123,12 @@ func (w *WatermarkPolicy) Evaluate(input PolicyInput) (trigger bool) {
if actual >= w.thresholds[w.lastIdx] { if actual >= w.thresholds[w.lastIdx] {
w.currIdx = w.lastIdx w.currIdx = w.lastIdx
w.firedLast = true w.firedLast = true
input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx])
return true return true
} }
input.Logger.Infof("watermark policy triggering: %f watermark surpassed", w.Watermarks[w.currIdx])
// we are triggering; update the current watermark, upscaling it to the // we are triggering; update the current watermark, upscaling it to the
// next threshold. // next threshold.
for w.currIdx < w.lastIdx { for w.currIdx < w.lastIdx {

View File

@ -1,6 +1,8 @@
package watchdog package watchdog
import ( import (
"log"
"os"
"runtime" "runtime"
"testing" "testing"
"time" "time"
@ -16,6 +18,8 @@ var (
secondWatermark = 0.75 secondWatermark = 0.75
thirdWatermark = 0.80 thirdWatermark = 0.80
emergencyWatermark = 0.90 emergencyWatermark = 0.90
logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true}
) )
func TestProgressiveWatermarksSystem(t *testing.T) { func TestProgressiveWatermarksSystem(t *testing.T) {
@ -28,6 +32,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
} }
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1}, SysStats: &gosigar.Mem{ActualUsed: uint64(float64(limit)*firstWatermark) - 1},
@ -35,6 +40,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
// trigger the first watermark. // trigger the first watermark.
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
@ -44,6 +50,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
// this won't fire because we're still on the same watermark. // this won't fire because we're still on the same watermark.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
@ -53,6 +60,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
// now let's move to the second watermark. // now let's move to the second watermark.
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())}, MemStats: &runtime.MemStats{LastGC: uint64(time.Now().UnixNano())},
@ -62,6 +70,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
// this won't fire because we're still on the same watermark. // this won't fire because we're still on the same watermark.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -71,6 +80,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
// now let's move to the third and last watermark. // now let's move to the third and last watermark.
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -80,6 +90,7 @@ func TestProgressiveWatermarksSystem(t *testing.T) {
// this won't fire because we're still on the same watermark. // this won't fire because we're still on the same watermark.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -99,6 +110,7 @@ func TestProgressiveWatermarksHeap(t *testing.T) {
// now back up step by step, check that all of them fire. // now back up step by step, check that all of them fire.
for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} {
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeHeap, Scope: ScopeHeap,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)}, MemStats: &runtime.MemStats{LastGC: uint64(0), HeapAlloc: uint64(float64(limit) * wm)},
@ -117,6 +129,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) {
// crank all the way to the top. // crank all the way to the top.
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -126,6 +139,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) {
// now back down, checking that none of the boundaries fire. // now back down, checking that none of the boundaries fire.
for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} { for _, wm := range []float64{thirdWatermark, secondWatermark, firstWatermark} {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -136,6 +150,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) {
// now back up step by step, check that all of them fire. // now back up step by step, check that all of them fire.
for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} { for _, wm := range []float64{firstWatermark, secondWatermark, thirdWatermark} {
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -146,6 +161,7 @@ func TestDownscalingWatermarks_Reentrancy(t *testing.T) {
// check the top does not fire because it already fired. // check the top does not fire because it already fired.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -167,6 +183,7 @@ func TestEmergencyWatermark(t *testing.T) {
// every tick triggers, even within the silence period. // every tick triggers, even within the silence period.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
@ -185,6 +202,7 @@ func TestJumpWatermark(t *testing.T) {
// check that jumping to the top only fires once. // check that jumping to the top only fires once.
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -193,6 +211,7 @@ func TestJumpWatermark(t *testing.T) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},
@ -221,6 +240,7 @@ func TestSilencePeriod(t *testing.T) {
// going above the first threshold, but within silencing period, so nothing happens. // going above the first threshold, but within silencing period, so nothing happens.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
@ -232,6 +252,7 @@ func TestSilencePeriod(t *testing.T) {
clk.Add(time.Minute) clk.Add(time.Minute)
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: 0}, MemStats: &runtime.MemStats{LastGC: 0},
@ -240,6 +261,7 @@ func TestSilencePeriod(t *testing.T) {
// but not the second time. // but not the second time.
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: 0}, MemStats: &runtime.MemStats{LastGC: 0},
@ -248,6 +270,7 @@ func TestSilencePeriod(t *testing.T) {
// now let's go up inside the silencing period, nothing happens. // now let's go up inside the silencing period, nothing happens.
require.False(t, p.Evaluate(PolicyInput{ require.False(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())}, MemStats: &runtime.MemStats{LastGC: uint64(clk.Now().UnixNano())},
@ -256,6 +279,7 @@ func TestSilencePeriod(t *testing.T) {
// same thing, outside the silencing period should trigger. // same thing, outside the silencing period should trigger.
require.True(t, p.Evaluate(PolicyInput{ require.True(t, p.Evaluate(PolicyInput{
Logger: logger,
Scope: ScopeSystem, Scope: ScopeSystem,
Limit: limit, Limit: limit,
MemStats: &runtime.MemStats{LastGC: uint64(0)}, MemStats: &runtime.MemStats{LastGC: uint64(0)},