implement automatic heapdumps when usage is above threshold.

A heapdump will be captured when the usage trespasses the threshold.
Staying above the threshold won't trigger another heapdump.
If the usage goes down, then back up, that is considered another
"episode" to be captured in a heapdump.

This feature is driven by three parameters:

* HeapdumpDir: the directory where the watchdog will write the heapdump.
  It will be created if it doesn't exist upon initialization. An error when
  creating the dir will not prevent heapdog initialization; it will just
  disable the heapdump capture feature.

  If zero-valued, the feature is disabled. Heapdumps will be written to path:
  <HeapdumpDir>/<RFC3339Nano formatted timestamp>.heap.

* HeapdumpMaxCaptures: sets the maximum amount of heapdumps a process will
  generate. This limits the amount of episodes that will be captured, in case
  the utilization climbs repeatedly over the threshold. By default, it is 10.

* HeapdumpThreshold: sets the utilization threshold that will trigger a
  heap dump to be taken automatically. A zero value disables this feature.
  By default, it is disabled.
This commit is contained in:
Raúl Kripalani 2021-01-19 19:38:46 +00:00
parent 13cc66ee4c
commit 31d951f370
6 changed files with 206 additions and 25 deletions

View File

@ -11,18 +11,18 @@ func TestAdaptivePolicy(t *testing.T) {
clk := clock.NewMock()
Clock = clk
p, err := NewAdaptivePolicy(0.5)(limit)
p, err := NewAdaptivePolicy(0.5)(limit64MiB)
require.NoError(t, err)
// at zero; next = 50%.
next := p.Evaluate(UtilizationSystem, 0)
require.EqualValues(t, limit/2, next)
require.EqualValues(t, limit64MiB/2, next)
// at half; next = 75%.
next = p.Evaluate(UtilizationSystem, limit/2)
require.EqualValues(t, 3*(limit/4), next)
next = p.Evaluate(UtilizationSystem, limit64MiB/2)
require.EqualValues(t, 3*(limit64MiB/4), next)
// at limit.
next = p.Evaluate(UtilizationSystem, limit)
require.EqualValues(t, limit, next)
// at limit64MiB.
next = p.Evaluate(UtilizationSystem, limit64MiB)
require.EqualValues(t, limit64MiB, next)
}

View File

@ -5,6 +5,8 @@ import (
"fmt"
"log"
"math"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"sync"
@ -35,6 +37,25 @@ var (
// NotifyGC, if non-nil, will be called when a GC has happened.
NotifyGC func() = func() {}
// HeapdumpThreshold sets the utilization threshold that will trigger a
// heap dump to be taken automatically. A zero value disables this feature.
// By default, it is disabled.
HeapdumpThreshold float64
// HeapdumpMaxCaptures sets the maximum amount of heapdumps a process will generate.
// This limits the amount of episodes that will be captured, in case the
// utilization climbs repeatedly over the threshold. By default, it is 10.
HeapdumpMaxCaptures = uint(10)
// HeapdumpDir is the directory where the watchdog will write the heapdump.
// It will be created if it doesn't exist upon initialization. An error when
// creating the dir will not prevent heapdog initialization; it will just
// disable the heapdump capture feature. If zero-valued, the feature is
// disabled.
//
// Heapdumps will be written to path <HeapdumpDir>/<RFC3339Nano formatted timestamp>.heap.
HeapdumpDir string
)
var (
@ -97,6 +118,9 @@ var _watchdog struct {
scope UtilizationType
hdleft uint // tracks the amount of heapdumps left.
hdcurr bool // tracks whether a heapdump has already been taken for this episode.
closing chan struct{}
wg sync.WaitGroup
}
@ -171,6 +195,8 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
// recompute the next trigger.
memstatsFn(&memstats)
maybeCaptureHeapdump(memstats.HeapAlloc, limit)
// heapMarked is the amount of heap that was marked as live by GC.
// it is inferred from our current GOGC and the new target picked.
heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100))
@ -236,7 +262,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor)
_watchdog.wg.Add(1)
var sysmem gosigar.Mem
go pollingWatchdog(policy, frequency, func() (uint64, error) {
go pollingWatchdog(policy, frequency, limit, func() (uint64, error) {
if err := sysmemFn(&sysmem); err != nil {
return 0, err
}
@ -251,7 +277,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor)
// usage is greater or equal to the threshold at the time, it forces GC.
// usageFn is guaranteed to be called serially, so no locking should be
// necessary.
func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uint64, error)) {
func pollingWatchdog(policy Policy, frequency time.Duration, limit uint64, usageFn func() (uint64, error)) {
defer _watchdog.wg.Done()
gcTriggered := make(chan struct{}, 16)
@ -295,6 +321,10 @@ func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uin
Logger.Warnf("failed to obtain memory utilizationstats; err: %s", err)
continue
}
// evaluate if a heapdump needs to be captured.
maybeCaptureHeapdump(usage, limit)
if usage < threshold {
// nothing to do.
continue
@ -378,6 +408,9 @@ func start(scope UtilizationType) error {
_watchdog.state = stateRunning
_watchdog.scope = scope
_watchdog.closing = make(chan struct{})
initHeapdumpCapture()
return nil
}
@ -393,3 +426,57 @@ func stop() {
_watchdog.wg.Wait()
_watchdog.state = stateUnstarted
}
func initHeapdumpCapture() {
if HeapdumpDir == "" || HeapdumpThreshold <= 0 {
Logger.Debugf("heapdump capture disabled")
return
}
if HeapdumpThreshold >= 1 {
Logger.Warnf("failed to initialize heapdump capture: threshold must be 0 < t < 1")
return
}
if fi, err := os.Stat(HeapdumpDir); os.IsNotExist(err) {
if err := os.MkdirAll(HeapdumpDir, 0777); err != nil {
Logger.Warnf("failed to initialize heapdump capture: failed to create dir: %s; err: %s", HeapdumpDir, err)
return
}
} else if err != nil {
Logger.Warnf("failed to initialize heapdump capture: failed to stat path: %s; err: %s", HeapdumpDir, err)
return
} else if !fi.IsDir() {
Logger.Warnf("failed to initialize heapdump capture: path exists but is not a directory: %s", HeapdumpDir)
return
}
// all good, set the amount of heapdump captures left.
_watchdog.hdleft = HeapdumpMaxCaptures
Logger.Infof("initialized heap dump capture; threshold: %f; max captures: %d; dir: %s", HeapdumpThreshold, HeapdumpMaxCaptures, HeapdumpDir)
}
func maybeCaptureHeapdump(usage, limit uint64) {
if _watchdog.hdleft <= 0 {
// nothing to do; no captures remaining (or captures disabled), or
// already captured a heapdump for this episode.
return
}
if float64(usage)/float64(limit) < HeapdumpThreshold {
// we are below the threshold, reset the hdcurr flag.
_watchdog.hdcurr = false
return
}
// we are above the threshold.
if _watchdog.hdcurr {
return // we've already captured this episode, skip.
}
path := filepath.Join(HeapdumpDir, time.Now().Format(time.RFC3339Nano)+".heap")
file, err := os.Create(path)
if err != nil {
Logger.Warnf("failed to write heapdump; failed to create file in path %s; err: %s", path, err)
return
}
defer file.Close()
debug.WriteHeapDump(file.Fd())
Logger.Infof("heap dump captured; path: %s", path)
_watchdog.hdcurr = true
_watchdog.hdleft--
}

View File

@ -59,7 +59,7 @@ func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, st
}
_watchdog.wg.Add(1)
go pollingWatchdog(policy, frequency, func() (uint64, error) {
go pollingWatchdog(policy, frequency, limit, func() (uint64, error) {
stat, err := cgroup.Stat()
if err != nil {
return 0, err

View File

@ -2,8 +2,10 @@ package watchdog
import (
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"strconv"
@ -47,7 +49,7 @@ func skipIfNotIsolated(t *testing.T) {
}
var (
limit uint64 = 64 << 20 // 64MiB.
limit64MiB uint64 = 64 << 20 // 64MiB.
)
func TestControl_Isolated(t *testing.T) {
@ -101,7 +103,7 @@ func TestHeapDriven_Isolated(t *testing.T) {
}
// limit is 64MiB.
err, stopFn := HeapDriven(limit, NewAdaptivePolicy(0.5))
err, stopFn := HeapDriven(limit64MiB, NewAdaptivePolicy(0.5))
require.NoError(t, err)
defer stopFn()
@ -138,7 +140,7 @@ func TestSystemDriven_Isolated(t *testing.T) {
}
// limit is 64MiB.
err, stopFn := SystemDriven(limit, 5*time.Second, NewAdaptivePolicy(0.5))
err, stopFn := SystemDriven(limit64MiB, 5*time.Second, NewAdaptivePolicy(0.5))
require.NoError(t, err)
defer stopFn()
@ -155,20 +157,20 @@ func TestSystemDriven_Isolated(t *testing.T) {
require.Len(t, notifyCh, 0) // no GC has taken place.
// second tick; used = just over 50%; will trigger GC.
actualUsed = (limit / 2) + 1
actualUsed = (limit64MiB / 2) + 1
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
require.Len(t, notifyCh, 1)
<-notifyCh
// third tick; just below 75%; no GC.
actualUsed = uint64(float64(limit)*0.75) - 1
actualUsed = uint64(float64(limit64MiB)*0.75) - 1
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
require.Len(t, notifyCh, 0)
// fourth tick; 75% exactly; will trigger GC.
actualUsed = uint64(float64(limit)*0.75) + 1
actualUsed = uint64(float64(limit64MiB)*0.75) + 1
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
require.Len(t, notifyCh, 1)
@ -178,3 +180,95 @@ func TestSystemDriven_Isolated(t *testing.T) {
runtime.ReadMemStats(&ms)
require.GreaterOrEqual(t, ms.NumForcedGC, uint32(2))
}
// TestHeapdumpCapture tests that heap dumps are captured appropriately.
func TestHeapdumpCapture(t *testing.T) {
debug.SetGCPercent(100)
dir, err := ioutil.TempDir("", "")
require.NoError(t, err)
t.Cleanup(func() {
_ = os.RemoveAll(dir)
})
assertFileCount := func(expected int) {
glob, err := filepath.Glob(filepath.Join(dir, "*"))
require.NoError(t, err)
require.Len(t, glob, expected)
}
HeapdumpDir = dir
HeapdumpThreshold = 0.5
HeapdumpMaxCaptures = 5
// mock clock.
clk := clock.NewMock()
Clock = clk
// mock the system reporting.
var actualUsed uint64
sysmemFn = func(g *gosigar.Mem) error {
g.ActualUsed = actualUsed
return nil
}
// init a system driven watchdog.
err, stopFn := SystemDriven(limit64MiB, 5*time.Second, NewAdaptivePolicy(0.5))
require.NoError(t, err)
defer stopFn()
time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.
// first tick; used = 0.
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
assertFileCount(0)
// second tick; used = just over 50%; will trigger a heapdump.
actualUsed = (limit64MiB / 2) + 1
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
assertFileCount(1)
// third tick; continues above 50%; same episode, no heapdump.
actualUsed = (limit64MiB / 2) + 10
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
assertFileCount(1)
// fourth tick; below 50%; this resets the episodic flag.
actualUsed = limit64MiB / 3
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
assertFileCount(1)
// fifth tick; above 50%; this triggers a new heapdump.
actualUsed = (limit64MiB / 2) + 1
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
assertFileCount(2)
for i := 0; i < 20; i++ {
// below 50%; this resets the episodic flag.
actualUsed = limit64MiB / 3
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
// above 50%; this triggers a new heapdump.
actualUsed = (limit64MiB / 2) + 1
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
}
assertFileCount(5) // we only generated 5 heap dumps even though we had more episodes.
// verify that heap dump file sizes aren't zero.
glob, err := filepath.Glob(filepath.Join(dir, "*"))
require.NoError(t, err)
for _, f := range glob {
fi, err := os.Stat(f)
require.NoError(t, err)
require.NotZero(t, fi.Size())
}
}

View File

@ -29,7 +29,7 @@ type watermarkPolicy struct {
var _ Policy = (*watermarkPolicy)(nil)
func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64) {
Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit)", used, w.limit)
Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit64MiB)", used, w.limit)
var i int
for ; i < len(w.thresholds); i++ {
t := w.thresholds[i]

View File

@ -12,7 +12,7 @@ var (
thresholds = func() []uint64 {
var ret []uint64
for _, w := range watermarks {
ret = append(ret, uint64(float64(limit)*w))
ret = append(ret, uint64(float64(limit64MiB)*w))
}
return ret
}()
@ -22,7 +22,7 @@ func TestProgressiveWatermarks(t *testing.T) {
clk := clock.NewMock()
Clock = clk
p, err := NewWatermarkPolicy(watermarks...)(limit)
p, err := NewWatermarkPolicy(watermarks...)(limit64MiB)
require.NoError(t, err)
// at zero
@ -30,25 +30,25 @@ func TestProgressiveWatermarks(t *testing.T) {
require.EqualValues(t, thresholds[0], next)
// before the watermark.
next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1)
next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[0])-1)
require.EqualValues(t, thresholds[0], next)
// exactly at the watermark; gives us the next watermark, as the watchdodg would've
// taken care of triggering the first watermark.
next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0]))
next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[0]))
require.EqualValues(t, thresholds[1], next)
// after the watermark gives us the next watermark.
next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1)
next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[0])+1)
require.EqualValues(t, thresholds[1], next)
// last watermark; disable the policy.
next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]))
next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[2]))
require.EqualValues(t, PolicyTempDisabled, next)
next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1))
next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[2]+1))
require.EqualValues(t, PolicyTempDisabled, next)
next = p.Evaluate(UtilizationSystem, limit)
next = p.Evaluate(UtilizationSystem, limit64MiB)
require.EqualValues(t, PolicyTempDisabled, next)
}