From 8676adea5ce34adc693880fcd35df08b0e91d6df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 18 Jan 2021 17:59:51 +0000 Subject: [PATCH] introduce cgroup-driven watchdog; refactor. This commit introduces the cgroup-driven watchdog. It can be initialized by calling watchdog.CgroupDriven(). This watchdog infers the limit from the process' cgroup, which is either derived from /proc/self/cgroup, or from the root cgroup if the PID == 1 (running in a container). Tests have been added/refactored to accommodate running locally and in a Docker container. Certain test cases now must be isolated from one another, to prevent side-effects from dirty go runtimes. A Makefile has been introduced to run all tests. --- .circleci/config.yml | 4 +- .dockerignore | 1 + Dockerfile.dlv | 22 +++++ Dockerfile.test | 10 ++ Makefile | 30 ++++++ README.md | 64 +++++++++--- doc.go | 40 ++++++++ go.mod | 1 + sys_linux.go | 26 ----- sys_other.go | 7 -- watchdog.go | 214 +++++++++++++++++++++-------------------- watchdog_linux.go | 73 ++++++++++++++ watchdog_linux_test.go | 125 ++++++++++++++++++++++++ watchdog_other.go | 13 +++ watchdog_other_test.go | 15 +++ watchdog_test.go | 70 ++++++++++---- 16 files changed, 537 insertions(+), 178 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile.dlv create mode 100644 Dockerfile.test create mode 100644 Makefile create mode 100644 doc.go delete mode 100644 sys_linux.go delete mode 100644 sys_other.go create mode 100644 watchdog_linux.go create mode 100644 watchdog_linux_test.go create mode 100644 watchdog_other.go create mode 100644 watchdog_other_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index b29b557..aaaa8bb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,6 +8,4 @@ jobs: working_directory: /go/src/github.com/{{ORG_NAME}}/{{REPO_NAME}} steps: - checkout - - - run: go get -v -t -d ./... - - run: go test -v ./... 
+ - run: make diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..33ceb8f --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +Makefile \ No newline at end of file diff --git a/Dockerfile.dlv b/Dockerfile.dlv new file mode 100644 index 0000000..4ac9e77 --- /dev/null +++ b/Dockerfile.dlv @@ -0,0 +1,22 @@ +## This Dockerfile compiles the watchdog with delve support. It enables the tests +## to be debugged inside a container. +## +## Run with: +## docker run --memory=64MiB --memory-swap=64MiB -p 2345:2345 \ +## --listen=:2345 --headless=true --log=true \ +## --log-output=debugger,debuglineerr,gdbwire,lldbout,rpc \ +## --accept-multiclient --api-version=2 exec /root/watchdog.test +## +FROM golang:1.15.5 +WORKDIR /watchdog +COPY . . +RUN CGO_ENABLED=0 go get -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv +RUN CGO_ENABLED=0 go test -gcflags "all=-N -l" -c -o ./watchdog.test + +FROM alpine:latest +RUN apk --no-cache add ca-certificates +WORKDIR /root/ +COPY --from=0 /go/bin/dlv /dlv +COPY --from=0 /watchdog/watchdog.test . +ENTRYPOINT [ "/dlv" ] +EXPOSE 2345 \ No newline at end of file diff --git a/Dockerfile.test b/Dockerfile.test new file mode 100644 index 0000000..cd91efc --- /dev/null +++ b/Dockerfile.test @@ -0,0 +1,10 @@ +FROM golang:1.15.5 +WORKDIR /watchdog +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go test -c -o watchdog.test + +FROM alpine:latest +RUN apk --no-cache add ca-certificates +WORKDIR /root/ +COPY --from=0 /watchdog/watchdog.test . +CMD ["/root/watchdog.test", "-test.v"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c287408 --- /dev/null +++ b/Makefile @@ -0,0 +1,30 @@ +SHELL = /bin/bash + +.PHONY: test + +# these tests run in isolation by calling go test -run=... or the equivalent. 
+ISOLATED_TESTS = TestControl_Isolated \ + TestSystemDriven_Isolated \ + TestHeapDriven_Isolated \ + TestCgroupsDriven_Create_Isolated \ + TestCgroupsDriven_Docker_Isolated + +test: test-binary test-docker + +test-binary: + go test -v ./... # run all the non-isolated tests. + # foreach does not actually execute each iteration; it expands the text, and it's executed all at once + # that's why we use && true, to short-circuit if a test fails. + $(foreach name,$(ISOLATED_TESTS),TEST_ISOLATED=1 go test -v -test.run=$(name) ./... && ) true + +test-docker: docker + docker run --memory=32MiB --memory-swap=32MiB -e TEST_DOCKER_MEMLIMIT=33554432 raulk/watchdog:latest + $(foreach name,$(ISOLATED_TESTS),docker run \ + --memory=32MiB --memory-swap=32MiB \ + -e TEST_ISOLATED=1 \ + -e TEST_DOCKER_MEMLIMIT=33554432 \ + raulk/watchdog:latest /root/watchdog.test -test.v -test.run=$(name) ./... && ) true + +docker: + docker build -f ./Dockerfile.test -t raulk/watchdog:latest . + diff --git a/README.md b/README.md index ec353e5..5f81e68 100644 --- a/README.md +++ b/README.md @@ -5,27 +5,59 @@ [![godocs](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog) [![build status](https://circleci.com/gh/raulk/go-watchdog.svg?style=svg)](https://circleci.com/gh/raulk/go-watchdog) -go-watchdog runs a singleton memory watchdog in the process, which watches -memory utilization and forces Go GC in accordance with a user-defined policy. +Package watchdog runs a singleton memory watchdog in the process, which +watches memory utilization and forces Go GC in accordance with a +user-defined policy. -There are two kinds of watchdog so far: +There are three kinds of watchdogs: -* **heap-driven:** applies a limit to the heap, and obtains current usage through - `runtime.ReadMemStats()`. 
-* **system-driven:** applies a limit to the total system memory used, and obtains - current usage through [`elastic/go-sigar`](https://github.com/elastic/gosigar). +1. heap-driven (`watchdog.HeapDriven()`): applies a heap limit, adjusting GOGC + dynamically in accordance with the policy. +2. system-driven (`watchdog.SystemDriven()`): applies a limit to the total + system memory used, obtaining the current usage through elastic/go-sigar. +3. cgroups-driven (`watchdog.CgroupDriven()`): discovers the memory limit from + the cgroup of the process (derived from /proc/self/cgroup), or from the + root cgroup path if the PID == 1 (which indicates that the process is + running in a container). It uses the cgroup stats to obtain the + current usage. -A third process-driven watchdog that uses cgroups is underway. +The watchdog's behaviour is controlled by the policy, a pluggable function +that determines when to trigger GC based on the current utilization. This +library ships with two policies: -This library ships with two policies out of the box: +1. watermarks policy (`watchdog.NewWatermarkPolicy()`): runs GC at configured + watermarks of memory utilisation. +2. adaptive policy (`watchdog.NewAdaptivePolicy()`): runs GC when the current + usage surpasses a dynamically-set threshold. -* watermarks policy: runs GC at configured watermarks of system or heap memory - utilisation. -* adaptive policy: runs GC when the current usage surpasses a dynamically-set - threshold. - -You can easily build a custom policy tailored to the allocation patterns of your -program. +You can easily write a custom policy tailored to the allocation patterns of +your program. + +## Recommended way to set up the watchdog + +The recommended way to set up the watchdog is as follows, in descending order +of precedence. This logic assumes that the library supports setting a heap +limit through an environment variable (e.g. MYAPP_HEAP_MAX) or config key. + +1. 
If heap limit is set and legal, initialize a heap-driven watchdog. +2. Otherwise, try to use the cgroup-driven watchdog. If it succeeds, return. +3. Otherwise, try to initialize a system-driven watchdog. If it succeeds, return. +4. Watchdog initialization failed. Log a warning to inform the user that + they're flying solo. + +## Running the tests + +Given the low-level nature of this component, some tests need to run in +isolation, so that they don't carry over Go runtime metrics. For completeness, +this module uses a Docker image for testing, so we can simulate cgroup memory +limits. + +The test execution and docker builds have been conveniently packaged in a +Makefile. Run with: + +```shell +$ make +``` ## Why is this even needed? diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..fa0c1f8 --- /dev/null +++ b/doc.go @@ -0,0 +1,40 @@ +// Package watchdog runs a singleton memory watchdog in the process, which +// watches memory utilization and forces Go GC in accordance with a +// user-defined policy. +// +// There are three kinds of watchdogs: +// +// 1. heap-driven (watchdog.HeapDriven()): applies a heap limit, adjusting GOGC +// dynamically in accordance with the policy. +// 2. system-driven (watchdog.SystemDriven()): applies a limit to the total +// system memory used, obtaining the current usage through elastic/go-sigar. +// 3. cgroups-driven (watchdog.CgroupDriven()): discovers the memory limit from +// the cgroup of the process (derived from /proc/self/cgroup), or from the +// root cgroup path if the PID == 1 (which indicates that the process is +// running in a container). It uses the cgroup stats to obtain the +// current usage. +// +// The watchdog's behaviour is controlled by the policy, a pluggable function +// that determines when to trigger GC based on the current utilization. This +// library ships with two policies: +// +// 1. watermarks policy (watchdog.NewWatermarkPolicy()): runs GC at configured +// watermarks of memory utilisation. 
+// 2. adaptive policy (watchdog.NewAdaptivePolicy()): runs GC when the current +// usage surpasses a dynamically-set threshold. +// +// You can easily write a custom policy tailored to the allocation patterns of +// your program. +// +// Recommended way to set up the watchdog +// +// The recommended way to set up the watchdog is as follows, in descending order +// of precedence. This logic assumes that the library supports setting a heap +// limit through an environment variable (e.g. MYAPP_HEAP_MAX) or config key. +// +// 1. If heap limit is set and legal, initialize a heap-driven watchdog. +// 2. Otherwise, try to use the cgroup-driven watchdog. If it succeeds, return. +// 3. Otherwise, try to initialize a system-driven watchdog. If it succeeds, return. +// 4. Watchdog initialization failed. Log a warning to inform the user that +// they're flying solo. +package watchdog diff --git a/go.mod b/go.mod index e2c1eaf..b133a7c 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 github.com/elastic/gosigar v0.12.0 github.com/kr/pretty v0.1.0 // indirect + github.com/opencontainers/runtime-spec v1.0.2 github.com/raulk/clock v1.1.0 github.com/stretchr/testify v1.4.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect diff --git a/sys_linux.go b/sys_linux.go deleted file mode 100644 index aafc942..0000000 --- a/sys_linux.go +++ /dev/null @@ -1,26 +0,0 @@ -package watchdog - -import ( - "os" - - "github.com/containerd/cgroups" -) - -func ProcessMemoryLimit() uint64 { - var ( - pid = os.Getpid() - memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory) - ) - cgroup, err := cgroups.Load(memSubsystem, cgroups.PidPath(pid)) - if err != nil { - return 0 - } - metrics, err := cgroup.Stat() - if err != nil { - return 0 - } - if metrics.Memory == nil { - return 0 - } - return metrics.Memory.HierarchicalMemoryLimit -} diff --git a/sys_other.go b/sys_other.go deleted file mode 100644 
index fb469f2..0000000 --- a/sys_other.go +++ /dev/null @@ -1,7 +0,0 @@ -// +build !linux - -package watchdog - -func ProcessMemoryLimit() uint64 { - return 0 -} diff --git a/watchdog.go b/watchdog.go index 466161f..ab4e504 100644 --- a/watchdog.go +++ b/watchdog.go @@ -1,6 +1,7 @@ package watchdog import ( + "errors" "fmt" "log" "math" @@ -13,6 +14,10 @@ import ( "github.com/raulk/clock" ) +// ErrNotSupported is returned when the watchdog does not support the requested +// run mode in the current OS/arch. +var ErrNotSupported = errors.New("watchdog run mode not supported") + // PolicyTempDisabled is a marker value for policies to signal that the policy // is temporarily disabled. Use it when all hope is lost to turn around from // significant memory pressure (such as when above an "extreme" watermark). @@ -28,9 +33,8 @@ var ( // Clock can be used to inject a mock clock for testing. Clock = clock.New() - // NotifyFired, if non-nil, will be called when the policy has fired, - // prior to calling GC, even if GC is disabled. - NotifyFired func() = func() {} + // NotifyGC, if non-nil, will be called when a GC has happened. + NotifyGC func() = func() {} ) var ( @@ -104,6 +108,8 @@ const ( // UtilizationSystem specifies that the policy compares against actual used // system memory. UtilizationSystem UtilizationType = iota + // UtilizationProcess specifies that the watchdog is using process limits. + UtilizationProcess // UtilizationHeap specifies that the policy compares against heap used. UtilizationHeap ) @@ -126,13 +132,6 @@ type Policy interface { // // A zero-valued limit will error. 
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) { - _watchdog.lk.Lock() - defer _watchdog.lk.Unlock() - - if _watchdog.state != stateUnstarted { - return ErrAlreadyStarted, nil - } - if limit == 0 { return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil } @@ -142,9 +141,9 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil } - _watchdog.state = stateRunning - _watchdog.scope = UtilizationHeap - _watchdog.closing = make(chan struct{}) + if err := start(UtilizationHeap); err != nil { + return err, nil + } gcTriggered := make(chan struct{}, 16) setupGCSentinel(gcTriggered) @@ -163,7 +162,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) for { select { case <-gcTriggered: - NotifyFired() + NotifyGC() case <-_watchdog.closing: return @@ -218,18 +217,12 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) // This threshold is calculated by querying the policy every time that GC runs, // either triggered by the runtime, or forced by us. 
func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { - _watchdog.lk.Lock() - defer _watchdog.lk.Unlock() - - if _watchdog.state != stateUnstarted { - return ErrAlreadyStarted, nil - } - if limit == 0 { - limit, err = determineLimit(false) - if err != nil { - return err, nil + var sysmem gosigar.Mem + if err := sysmemFn(&sysmem); err != nil { + return fmt.Errorf("failed to get system memory stats: %w", err), nil } + limit = sysmem.Total } policy, err := policyCtor(limit) @@ -237,97 +230,92 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil } - _watchdog.state = stateRunning - _watchdog.scope = UtilizationSystem - _watchdog.closing = make(chan struct{}) - - gcTriggered := make(chan struct{}, 16) - setupGCSentinel(gcTriggered) + if err := start(UtilizationSystem); err != nil { + return err, nil + } _watchdog.wg.Add(1) - go func() { - defer _watchdog.wg.Done() - - var ( - memstats runtime.MemStats - sysmem gosigar.Mem - threshold uint64 - ) - - renewThreshold := func() { - // get the current usage. - if err := sysmemFn(&sysmem); err != nil { - Logger.Warnf("failed to obtain system memory stats; err: %s", err) - return - } - // calculate the threshold. - threshold = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed) + var sysmem gosigar.Mem + go pollingWatchdog(policy, frequency, func() (uint64, error) { + if err := sysmemFn(&sysmem); err != nil { + return 0, err } - - // initialize the threshold. - renewThreshold() - - // initialize an empty timer. - timer := Clock.Timer(0) - stopTimer := func() { - if !timer.Stop() { - <-timer.C - } - } - - for { - timer.Reset(frequency) - - select { - case <-timer.C: - // get the current usage. 
- if err := sysmemFn(&sysmem); err != nil { - Logger.Warnf("failed to obtain system memory stats; err: %s", err) - continue - } - actual := sysmem.ActualUsed - if actual < threshold { - // nothing to do. - continue - } - // trigger GC; this will emit a gcTriggered event which we'll - // consume next to readjust the threshold. - Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", actual, threshold) - forceGC(&memstats) - - case <-gcTriggered: - NotifyFired() - - renewThreshold() - - stopTimer() - - case <-_watchdog.closing: - stopTimer() - return - } - } - }() + return sysmem.ActualUsed, nil + }) return nil, stop } -func determineLimit(restrictByProcess bool) (uint64, error) { - // TODO. - // if restrictByProcess { - // if pmem := ProcessMemoryLimit(); pmem > 0 { - // Logger.Infof("watchdog using process limit: %d bytes", pmem) - // return pmem, nil - // } - // Logger.Infof("watchdog was unable to determine process limit; falling back to total system memory") - // } +// pollingWatchdog starts a polling watchdog with the provided policy, using +// the supplied polling frequency. On every tick, it calls usageFn and, if the +// usage is greater or equal to the threshold at the time, it forces GC. +// usageFn is guaranteed to be called serially, so no locking should be +// necessary. +func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uint64, error)) { + defer _watchdog.wg.Done() - // populate initial utilisation and system stats. - var sysmem gosigar.Mem - if err := sysmemFn(&sysmem); err != nil { - return 0, fmt.Errorf("failed to get system memory stats: %w", err) + gcTriggered := make(chan struct{}, 16) + setupGCSentinel(gcTriggered) + + var ( + memstats runtime.MemStats + threshold uint64 + ) + + renewThreshold := func() { + // get the current usage. + usage, err := usageFn() + if err != nil { + Logger.Warnf("failed to obtain memory utilization stats; err: %s", err) + return + } + // calculate the threshold. 
+ threshold = policy.Evaluate(_watchdog.scope, usage) + } + + // initialize the threshold. + renewThreshold() + + // initialize an empty timer. + timer := Clock.Timer(0) + stopTimer := func() { + if !timer.Stop() { + <-timer.C + } + } + + for { + timer.Reset(frequency) + + select { + case <-timer.C: + // get the current usage. + usage, err := usageFn() + if err != nil { + Logger.Warnf("failed to obtain memory utilizationstats; err: %s", err) + continue + } + if usage < threshold { + // nothing to do. + continue + } + // trigger GC; this will emit a gcTriggered event which we'll + // consume next to readjust the threshold. + Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", usage, threshold) + forceGC(&memstats) + + case <-gcTriggered: + NotifyGC() + + renewThreshold() + + stopTimer() + + case <-_watchdog.closing: + stopTimer() + return + } } - return sysmem.Total, nil } // forceGC forces a manual GC. @@ -379,6 +367,20 @@ func setupGCSentinel(gcTriggered chan struct{}) { runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel. } +func start(scope UtilizationType) error { + _watchdog.lk.Lock() + defer _watchdog.lk.Unlock() + + if _watchdog.state != stateUnstarted { + return ErrAlreadyStarted + } + + _watchdog.state = stateRunning + _watchdog.scope = scope + _watchdog.closing = make(chan struct{}) + return nil +} + func stop() { _watchdog.lk.Lock() defer _watchdog.lk.Unlock() diff --git a/watchdog_linux.go b/watchdog_linux.go new file mode 100644 index 0000000..a50078a --- /dev/null +++ b/watchdog_linux.go @@ -0,0 +1,73 @@ +package watchdog + +import ( + "fmt" + "os" + "time" + + "github.com/containerd/cgroups" +) + +var ( + pid = os.Getpid() + memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory) +) + +// CgroupDriven initializes a cgroups-driven watchdog. 
It will try to discover +// the memory limit from the cgroup of the process (derived from /proc/self/cgroup), +// or from the root cgroup path if the PID == 1 (which indicates that the process +// is running in a container). +// +// Memory usage is calculated by querying the cgroup stats. +// +// This function will return an error immediately if the OS does not support cgroups, +// or if another error occurs during initialization. The caller can then safely fall +// back to the system driven watchdog. +func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { + // use self path unless our PID is 1, in which case we're running inside + // a container and our limits are in the root path. + path := cgroups.NestedPath("") + if pid := os.Getpid(); pid == 1 { + path = cgroups.RootPath + } + + cgroup, err := cgroups.Load(memSubsystem, path) + if err != nil { + return fmt.Errorf("failed to load cgroup for process: %w", err), nil + } + + var limit uint64 + if stat, err := cgroup.Stat(); err != nil { + return fmt.Errorf("failed to load memory cgroup stats: %w", err), nil + } else if stat.Memory == nil || stat.Memory.Usage == nil { + return fmt.Errorf("cgroup memory stats are nil; aborting"), nil + } else { + limit = stat.Memory.Usage.Limit + } + + if limit == 0 { + return fmt.Errorf("cgroup limit is 0; refusing to start memory watchdog"), nil + } + + policy, err := policyCtor(limit) + if err != nil { + return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil + } + + if err := start(UtilizationProcess); err != nil { + return err, nil + } + + _watchdog.wg.Add(1) + go pollingWatchdog(policy, frequency, func() (uint64, error) { + stat, err := cgroup.Stat() + if err != nil { + return 0, err + } else if stat.Memory == nil || stat.Memory.Usage == nil { + return 0, fmt.Errorf("cgroup memory stats are nil; aborting") + } + return stat.Memory.Usage.Usage, nil + }) + + return nil, stop +} diff --git a/watchdog_linux_test.go 
b/watchdog_linux_test.go new file mode 100644 index 0000000..793f791 --- /dev/null +++ b/watchdog_linux_test.go @@ -0,0 +1,125 @@ +package watchdog + +import ( + "fmt" + "log" + "os" + "runtime" + "runtime/debug" + "testing" + "time" + + "github.com/containerd/cgroups" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/raulk/clock" + "github.com/stretchr/testify/require" +) + +// retained will hoard unreclaimable byte buffers in the heap. +var retained [][]byte + +func TestCgroupsDriven_Create_Isolated(t *testing.T) { + skipIfNotIsolated(t) + + if os.Getpid() == 1 { + // we are running in Docker and cannot create a cgroup. + t.Skipf("cannot create a cgroup while running in non-privileged docker") + } + + // new cgroup limit. + var limit = uint64(32 << 20) // 32MiB. + createMemoryCgroup(t, limit) + + testCgroupsWatchdog(t, limit) +} + +func TestCgroupsDriven_Docker_Isolated(t *testing.T) { + skipIfNotIsolated(t) + + testCgroupsWatchdog(t, uint64(DockerMemLimit)) +} + +func testCgroupsWatchdog(t *testing.T, limit uint64) { + t.Cleanup(func() { + retained = nil + }) + + runtime.GC() // first GC to clear any junk from other tests. + debug.SetGCPercent(100000000) // disable GC. + + clk := clock.NewMock() + Clock = clk + + notifyCh := make(chan struct{}, 1) + NotifyGC = func() { + notifyCh <- struct{}{} + } + + err, stopFn := CgroupDriven(5*time.Second, NewAdaptivePolicy(0.5)) + require.NoError(t, err) + defer stopFn() + + time.Sleep(200 * time.Millisecond) // give time for the watchdog to init. + + maxSlabs := limit / (1 << 20) // number of 1MiB slabs to take up the entire limit. + + // first tick; nothing should happen. + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.Len(t, notifyCh, 0) // no GC has taken place. + + // allocate 50% of limit in heap (to be added to other mem usage). 
+ for i := 0; i < (int(maxSlabs))/2; i++ { + retained = append(retained, func() []byte { + b := make([]byte, 1*1024*1024) + for i := range b { + b[i] = 0xff + } + return b + }()) + } + + // second tick; used = just over 50%; will trigger GC. + clk.Add(5 * time.Second) + time.Sleep(200 * time.Millisecond) + require.NotNil(t, <-notifyCh) + + var memstats runtime.MemStats + runtime.ReadMemStats(&memstats) + require.EqualValues(t, 2, memstats.NumForcedGC) +} + +// createMemoryCgroup creates a memory cgroup to restrict the memory available +// to this test. +func createMemoryCgroup(t *testing.T, limit uint64) { + l := int64(limit) + path := cgroups.NestedPath(fmt.Sprintf("/%d", time.Now().UnixNano())) + cgroup, err := cgroups.New(cgroups.V1, path, &specs.LinuxResources{ + Memory: &specs.LinuxMemory{ + Limit: &l, + Swap: &l, + }, + }) + + require.NoError(t, err, "failed to create a cgroup") + t.Cleanup(func() { + root, err := cgroups.Load(cgroups.V1, cgroups.RootPath) + if err != nil { + t.Logf("failed to resolve root cgroup: %s", err) + return + } + if err = root.Add(cgroups.Process{Pid: pid}); err != nil { + t.Logf("failed to move process to root cgroup: %s", err) + return + } + if err = cgroup.Delete(); err != nil { + t.Logf("failed to clean up temp cgroup: %s", err) + } + }) + + log.Printf("cgroup created") + + // add process to cgroup. + err = cgroup.Add(cgroups.Process{Pid: pid}) + require.NoError(t, err) +} diff --git a/watchdog_other.go b/watchdog_other.go new file mode 100644 index 0000000..7a9349f --- /dev/null +++ b/watchdog_other.go @@ -0,0 +1,13 @@ +// +build !linux + +package watchdog + +import ( + "fmt" + "time" +) + +// CgroupDriven is only available in Linux. This method will error. 
+func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { + return fmt.Errorf("cgroups-driven watchdog: %w", ErrNotSupported), nil +} diff --git a/watchdog_other_test.go b/watchdog_other_test.go new file mode 100644 index 0000000..119961f --- /dev/null +++ b/watchdog_other_test.go @@ -0,0 +1,15 @@ +// +build !linux + +package watchdog + +import "testing" + +func TestCgroupsDriven_Create_Isolated(t *testing.T) { + // this test only runs on linux. + t.Skip("test only valid on linux") +} + +func TestCgroupsDriven_Docker_Isolated(t *testing.T) { + // this test only runs on linux. + t.Skip("test only valid on linux") +} diff --git a/watchdog_test.go b/watchdog_test.go index 67bb6e3..2455786 100644 --- a/watchdog_test.go +++ b/watchdog_test.go @@ -6,6 +6,7 @@ import ( "os" "runtime" "runtime/debug" + "strconv" "testing" "time" @@ -14,29 +15,54 @@ import ( "github.com/stretchr/testify/require" ) -// These integration tests are a hugely non-deterministic, but necessary to get -// good coverage and confidence. The Go runtime makes its own pacing decisions, -// and those may vary based on machine, OS, kernel memory management, other -// running programs, exogenous memory pressure, and Go runtime versions. -// -// The assertions we use here are lax, but should be sufficient to serve as a -// reasonable litmus test of whether the watchdog is doing what it's supposed -// to or not. +const ( + // EnvTestIsolated is a marker property for the runner to confirm that this + // test is running in isolation (i.e. a dedicated process). + EnvTestIsolated = "TEST_ISOLATED" + + // EnvTestDockerMemLimit is the memory limit applied in a docker container. + EnvTestDockerMemLimit = "TEST_DOCKER_MEMLIMIT" +) + +// DockerMemLimit is initialized in the init() function from the +// EnvTestDockerMemLimit env variable. 
+var DockerMemLimit int // bytes + +func init() { + Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} + + if l := os.Getenv(EnvTestDockerMemLimit); l != "" { + l, err := strconv.Atoi(l) + if err != nil { + panic(err) + } + DockerMemLimit = l + } +} + +func skipIfNotIsolated(t *testing.T) { + if os.Getenv(EnvTestIsolated) != "1" { + t.Skipf("skipping test in non-isolated mode") + } +} var ( limit uint64 = 64 << 20 // 64MiB. ) -func init() { - Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} -} +func TestControl_Isolated(t *testing.T) { + skipIfNotIsolated(t) -func TestControl(t *testing.T) { debug.SetGCPercent(100) - // retain 1MiB every iteration, up to 100MiB (beyond heap limit!). + rounds := 100 + if DockerMemLimit != 0 { + rounds /= int(float64(DockerMemLimit)*0.8) / 1024 / 1024 + } + + // retain 1MiB every iteration. var retained [][]byte - for i := 0; i < 100; i++ { + for i := 0; i < rounds; i++ { b := make([]byte, 1*1024*1024) for i := range b { b[i] = byte(i) @@ -52,11 +78,13 @@ func TestControl(t *testing.T) { var ms runtime.MemStats runtime.ReadMemStats(&ms) - require.LessOrEqual(t, ms.NumGC, uint32(5)) // a maximum of 8 GCs should've happened. - require.Zero(t, ms.NumForcedGC) // no forced GCs. + require.NotZero(t, ms.NumGC) // GCs have taken place, but... + require.Zero(t, ms.NumForcedGC) // ... no forced GCs beyond our initial one. } -func TestHeapDriven(t *testing.T) { +func TestHeapDriven_Isolated(t *testing.T) { + skipIfNotIsolated(t) + // we can't mock ReadMemStats, because we're relying on the go runtime to // enforce the GC run, and the go runtime won't use our mock. Therefore, we // need to do the actual thing. 
@@ -66,7 +94,7 @@ func TestHeapDriven(t *testing.T) { Clock = clk observations := make([]*runtime.MemStats, 0, 100) - NotifyFired = func() { + NotifyGC = func() { var ms runtime.MemStats runtime.ReadMemStats(&ms) observations = append(observations, &ms) @@ -94,7 +122,9 @@ func TestHeapDriven(t *testing.T) { require.GreaterOrEqual(t, ms.NumGC, uint32(9)) // over 9 GCs should've taken place. } -func TestSystemDriven(t *testing.T) { +func TestSystemDriven_Isolated(t *testing.T) { + skipIfNotIsolated(t) + debug.SetGCPercent(100) clk := clock.NewMock() @@ -115,7 +145,7 @@ func TestSystemDriven(t *testing.T) { time.Sleep(200 * time.Millisecond) // give time for the watchdog to init. notifyCh := make(chan struct{}, 1) - NotifyFired = func() { + NotifyGC = func() { notifyCh <- struct{}{} }