From 31d951f3704bbbc8e8c09a867f7766eba91fe432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 19 Jan 2021 19:38:46 +0000 Subject: [PATCH] implement automatic heapdumps when usage is above threshold. A heapdump will be captured when the usage trespasses the threshold. Staying above the threshold won't trigger another heapdump. If the usage goes down, then back up, that is considered another "episode" to be captured in a heapdump. This feature is driven by three parameters: * HeapdumpDir: the directory where the watchdog will write the heapdump. It will be created if it doesn't exist upon initialization. An error when creating the dir will not prevent heapdog initialization; it will just disable the heapdump capture feature. If zero-valued, the feature is disabled. Heapdumps will be written to path: /.heap. * HeapdumpMaxCaptures: sets the maximum amount of heapdumps a process will generate. This limits the amount of episodes that will be captured, in case the utilization climbs repeatedly over the threshold. By default, it is 10. * HeapdumpThreshold: sets the utilization threshold that will trigger a heap dump to be taken automatically. A zero value disables this feature. By default, it is disabled. --- adaptive_test.go | 14 +++--- watchdog.go | 91 +++++++++++++++++++++++++++++++++++++- watchdog_linux.go | 2 +- watchdog_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++--- watermarks.go | 2 +- watermarks_test.go | 16 +++---- 6 files changed, 206 insertions(+), 25 deletions(-) diff --git a/adaptive_test.go b/adaptive_test.go index c696bcc..bdacc83 100644 --- a/adaptive_test.go +++ b/adaptive_test.go @@ -11,18 +11,18 @@ func TestAdaptivePolicy(t *testing.T) { clk := clock.NewMock() Clock = clk - p, err := NewAdaptivePolicy(0.5)(limit) + p, err := NewAdaptivePolicy(0.5)(limit64MiB) require.NoError(t, err) // at zero; next = 50%. next := p.Evaluate(UtilizationSystem, 0) - require.EqualValues(t, limit/2, next) + require.EqualValues(t, limit64MiB/2, next) // at half; next = 75%. - next = p.Evaluate(UtilizationSystem, limit/2) - require.EqualValues(t, 3*(limit/4), next) + next = p.Evaluate(UtilizationSystem, limit64MiB/2) + require.EqualValues(t, 3*(limit64MiB/4), next) - // at limit. - next = p.Evaluate(UtilizationSystem, limit) - require.EqualValues(t, limit, next) + // at limit64MiB. + next = p.Evaluate(UtilizationSystem, limit64MiB) + require.EqualValues(t, limit64MiB, next) } diff --git a/watchdog.go b/watchdog.go index ab4e504..e5f3989 100644 --- a/watchdog.go +++ b/watchdog.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "math" + "os" + "path/filepath" "runtime" "runtime/debug" "sync" @@ -35,6 +37,25 @@ var ( // NotifyGC, if non-nil, will be called when a GC has happened. NotifyGC func() = func() {} + + // HeapdumpThreshold sets the utilization threshold that will trigger a + // heap dump to be taken automatically. A zero value disables this feature. + // By default, it is disabled. + HeapdumpThreshold float64 + + // HeapdumpMaxCaptures sets the maximum amount of heapdumps a process will generate. + // This limits the amount of episodes that will be captured, in case the + // utilization climbs repeatedly over the threshold. By default, it is 10. + HeapdumpMaxCaptures = uint(10) + + // HeapdumpDir is the directory where the watchdog will write the heapdump. + // It will be created if it doesn't exist upon initialization. An error when + // creating the dir will not prevent heapdog initialization; it will just + // disable the heapdump capture feature. If zero-valued, the feature is + // disabled. + // + // Heapdumps will be written to path /.heap. + HeapdumpDir string ) var ( @@ -97,6 +118,9 @@ var _watchdog struct { scope UtilizationType + hdleft uint // tracks the amount of heapdumps left. + hdcurr bool // tracks whether a heapdump has already been taken for this episode. + closing chan struct{} wg sync.WaitGroup } @@ -171,6 +195,8 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) // recompute the next trigger. memstatsFn(&memstats) + maybeCaptureHeapdump(memstats.HeapAlloc, limit) + // heapMarked is the amount of heap that was marked as live by GC. // it is inferred from our current GOGC and the new target picked. heapMarked := uint64(float64(memstats.NextGC) / (1 + float64(currGOGC)/100)) @@ -236,7 +262,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) _watchdog.wg.Add(1) var sysmem gosigar.Mem - go pollingWatchdog(policy, frequency, func() (uint64, error) { + go pollingWatchdog(policy, frequency, limit, func() (uint64, error) { if err := sysmemFn(&sysmem); err != nil { return 0, err } @@ -251,7 +277,7 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) // usage is greater or equal to the threshold at the time, it forces GC. // usageFn is guaranteed to be called serially, so no locking should be // necessary. -func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uint64, error)) { +func pollingWatchdog(policy Policy, frequency time.Duration, limit uint64, usageFn func() (uint64, error)) { defer _watchdog.wg.Done() gcTriggered := make(chan struct{}, 16) @@ -295,6 +321,10 @@ func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uin Logger.Warnf("failed to obtain memory utilizationstats; err: %s", err) continue } + + // evaluate if a heapdump needs to be captured. + maybeCaptureHeapdump(usage, limit) + if usage < threshold { // nothing to do. continue @@ -378,6 +408,9 @@ func start(scope UtilizationType) error { _watchdog.state = stateRunning _watchdog.scope = scope _watchdog.closing = make(chan struct{}) + + initHeapdumpCapture() + return nil } @@ -393,3 +426,57 @@ func stop() { _watchdog.wg.Wait() _watchdog.state = stateUnstarted } + +func initHeapdumpCapture() { + if HeapdumpDir == "" || HeapdumpThreshold <= 0 { + Logger.Debugf("heapdump capture disabled") + return + } + if HeapdumpThreshold >= 1 { + Logger.Warnf("failed to initialize heapdump capture: threshold must be 0 < t < 1") + return + } + if fi, err := os.Stat(HeapdumpDir); os.IsNotExist(err) { + if err := os.MkdirAll(HeapdumpDir, 0777); err != nil { + Logger.Warnf("failed to initialize heapdump capture: failed to create dir: %s; err: %s", HeapdumpDir, err) + return + } + } else if err != nil { + Logger.Warnf("failed to initialize heapdump capture: failed to stat path: %s; err: %s", HeapdumpDir, err) + return + } else if !fi.IsDir() { + Logger.Warnf("failed to initialize heapdump capture: path exists but is not a directory: %s", HeapdumpDir) + return + } + // all good, set the amount of heapdump captures left. + _watchdog.hdleft = HeapdumpMaxCaptures + Logger.Infof("initialized heap dump capture; threshold: %f; max captures: %d; dir: %s", HeapdumpThreshold, HeapdumpMaxCaptures, HeapdumpDir) +} + +func maybeCaptureHeapdump(usage, limit uint64) { + if _watchdog.hdleft <= 0 { + // nothing to do; no captures remaining (or captures disabled), or + // already captured a heapdump for this episode. + return + } + if float64(usage)/float64(limit) < HeapdumpThreshold { + // we are below the threshold, reset the hdcurr flag. + _watchdog.hdcurr = false + return + } + // we are above the threshold. + if _watchdog.hdcurr { + return // we've already captured this episode, skip. + } + path := filepath.Join(HeapdumpDir, time.Now().Format(time.RFC3339Nano)+".heap") + file, err := os.Create(path) + if err != nil { + Logger.Warnf("failed to write heapdump; failed to create file in path %s; err: %s", path, err) + return + } + defer file.Close() + debug.WriteHeapDump(file.Fd()) + Logger.Infof("heap dump captured; path: %s", path) + _watchdog.hdcurr = true + _watchdog.hdleft-- +} diff --git a/watchdog_linux.go b/watchdog_linux.go index a50078a..a86c3ed 100644 --- a/watchdog_linux.go +++ b/watchdog_linux.go @@ -59,7 +59,7 @@ func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, st } _watchdog.wg.Add(1) - go pollingWatchdog(policy, frequency, func() (uint64, error) { + go pollingWatchdog(policy, frequency, limit, func() (uint64, error) { stat, err := cgroup.Stat() if err != nil { return 0, err diff --git a/watchdog_test.go b/watchdog_test.go index 9c579e0..c9f1a50 100644 --- a/watchdog_test.go +++ b/watchdog_test.go @@ -2,8 +2,10 @@ package watchdog import ( "fmt" + "io/ioutil" "log" "os" + "path/filepath" "runtime" "runtime/debug" "strconv" @@ -47,7 +49,7 @@ func skipIfNotIsolated(t *testing.T) { } var ( - limit uint64 = 64 << 20 // 64MiB. + limit64MiB uint64 = 64 << 20 // 64MiB. ) func TestControl_Isolated(t *testing.T) { @@ -101,7 +103,7 @@ func TestHeapDriven_Isolated(t *testing.T) { } // limit is 64MiB. - err, stopFn := HeapDriven(limit, NewAdaptivePolicy(0.5)) + err, stopFn := HeapDriven(limit64MiB, NewAdaptivePolicy(0.5)) require.NoError(t, err) defer stopFn() @@ -138,7 +140,7 @@ func TestSystemDriven_Isolated(t *testing.T) { } // limit is 64MiB. - err, stopFn := SystemDriven(limit, 5*time.Second, NewAdaptivePolicy(0.5)) + err, stopFn := SystemDriven(limit64MiB, 5*time.Second, NewAdaptivePolicy(0.5)) require.NoError(t, err) defer stopFn() @@ -155,20 +157,20 @@ func TestSystemDriven_Isolated(t *testing.T) { require.Len(t, notifyCh, 0) // no GC has taken place. // second tick; used = just over 50%; will trigger GC. - actualUsed = (limit / 2) + 1 + actualUsed = (limit64MiB / 2) + 1 clk.Add(5 * time.Second) time.Sleep(200 * time.Millisecond) require.Len(t, notifyCh, 1) <-notifyCh // third tick; just below 75%; no GC. - actualUsed = uint64(float64(limit)*0.75) - 1 + actualUsed = uint64(float64(limit64MiB)*0.75) - 1 clk.Add(5 * time.Second) time.Sleep(200 * time.Millisecond) require.Len(t, notifyCh, 0) // fourth tick; 75% exactly; will trigger GC. - actualUsed = uint64(float64(limit)*0.75) + 1 + actualUsed = uint64(float64(limit64MiB)*0.75) + 1 clk.Add(5 * time.Second) time.Sleep(200 * time.Millisecond) require.Len(t, notifyCh, 1) @@ -178,3 +180,95 @@ func TestSystemDriven_Isolated(t *testing.T) { runtime.ReadMemStats(&ms) require.GreaterOrEqual(t, ms.NumForcedGC, uint32(2)) } + +// TestHeapdumpCapture tests that heap dumps are captured appropriately. +func TestHeapdumpCapture(t *testing.T) { + debug.SetGCPercent(100) + + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + + t.Cleanup(func() { + _ = os.RemoveAll(dir) + }) + + assertFileCount := func(expected int) { + glob, err := filepath.Glob(filepath.Join(dir, "*")) + require.NoError(t, err) + require.Len(t, glob, expected) + } + + HeapdumpDir = dir + HeapdumpThreshold = 0.5 + HeapdumpMaxCaptures = 5 + + // mock clock. + clk := clock.NewMock() + Clock = clk + + // mock the system reporting. + var actualUsed uint64 + sysmemFn = func(g *gosigar.Mem) error { + g.ActualUsed = actualUsed + return nil + } + + // init a system driven watchdog. + err, stopFn := SystemDriven(limit64MiB, 5*time.Second, NewAdaptivePolicy(0.5)) + require.NoError(t, err) + defer stopFn() + time.Sleep(200 * time.Millisecond) // give time for the watchdog to init. + + // first tick; used = 0. + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + assertFileCount(0) + + // second tick; used = just over 50%; will trigger a heapdump. + actualUsed = (limit64MiB / 2) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + assertFileCount(1) + + // third tick; continues above 50%; same episode, no heapdump. + actualUsed = (limit64MiB / 2) + 10 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + assertFileCount(1) + + // fourth tick; below 50%; this resets the episodic flag. + actualUsed = limit64MiB / 3 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + assertFileCount(1) + + // fifth tick; above 50%; this triggers a new heapdump. + actualUsed = (limit64MiB / 2) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + assertFileCount(2) + + for i := 0; i < 20; i++ { + // below 50%; this resets the episodic flag. + actualUsed = limit64MiB / 3 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + + // above 50%; this triggers a new heapdump. + actualUsed = (limit64MiB / 2) + 1 + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + } + + assertFileCount(5) // we only generated 5 heap dumps even though we had more episodes. + + // verify that heap dump file sizes aren't zero. + glob, err := filepath.Glob(filepath.Join(dir, "*")) + require.NoError(t, err) + for _, f := range glob { + fi, err := os.Stat(f) + require.NoError(t, err) + require.NotZero(t, fi.Size()) + } + +} diff --git a/watermarks.go b/watermarks.go index cced767..ff21bb8 100644 --- a/watermarks.go +++ b/watermarks.go @@ -29,7 +29,7 @@ type watermarkPolicy struct { var _ Policy = (*watermarkPolicy)(nil) func (w *watermarkPolicy) Evaluate(_ UtilizationType, used uint64) (next uint64) { - Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit)", used, w.limit) + Logger.Debugf("watermark policy: evaluating; utilization: %d/%d (used/limit64MiB)", used, w.limit) var i int for ; i < len(w.thresholds); i++ { t := w.thresholds[i] diff --git a/watermarks_test.go b/watermarks_test.go index d3ee3ee..9c3e511 100644 --- a/watermarks_test.go +++ b/watermarks_test.go @@ -12,7 +12,7 @@ var ( thresholds = func() []uint64 { var ret []uint64 for _, w := range watermarks { - ret = append(ret, uint64(float64(limit)*w)) + ret = append(ret, uint64(float64(limit64MiB)*w)) } return ret }() @@ -22,7 +22,7 @@ func TestProgressiveWatermarks(t *testing.T) { clk := clock.NewMock() Clock = clk - p, err := NewWatermarkPolicy(watermarks...)(limit) + p, err := NewWatermarkPolicy(watermarks...)(limit64MiB) require.NoError(t, err) // at zero @@ -30,25 +30,25 @@ func TestProgressiveWatermarks(t *testing.T) { require.EqualValues(t, thresholds[0], next) // before the watermark. - next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])-1) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[0])-1) require.EqualValues(t, thresholds[0], next) // exactly at the watermark; gives us the next watermark, as the watchdodg would've // taken care of triggering the first watermark. - next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[0])) require.EqualValues(t, thresholds[1], next) // after the watermark gives us the next watermark. - next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[0])+1) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[0])+1) require.EqualValues(t, thresholds[1], next) // last watermark; disable the policy. - next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2])) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[2])) require.EqualValues(t, PolicyTempDisabled, next) - next = p.Evaluate(UtilizationSystem, uint64(float64(limit)*watermarks[2]+1)) + next = p.Evaluate(UtilizationSystem, uint64(float64(limit64MiB)*watermarks[2]+1)) require.EqualValues(t, PolicyTempDisabled, next) - next = p.Evaluate(UtilizationSystem, limit) + next = p.Evaluate(UtilizationSystem, limit64MiB) require.EqualValues(t, PolicyTempDisabled, next) }