introduce cgroup-driven watchdog; refactor.
This commit introduces the cgroup-driven watchdog. It can be initialized by calling watchdog.CgroupDriven(). This watchdog infers the limit from the process' cgroup, which is either derived from /proc/self/cgroup, or from the root cgroup if the PID == 1 (running in a container). Tests have been added/refactored to accommodate running locally and in a Docker container. Certain test cases now must be isolated from one another, to prevent side-effects from dirty go runtimes. A Makefile has been introduced to run all tests.
This commit is contained in:
parent
903e001223
commit
8676adea5c
|
@ -8,6 +8,4 @@ jobs:
|
||||||
working_directory: /go/src/github.com/{{ORG_NAME}}/{{REPO_NAME}}
|
working_directory: /go/src/github.com/{{ORG_NAME}}/{{REPO_NAME}}
|
||||||
steps:
|
steps:
|
||||||
- checkout
|
- checkout
|
||||||
|
- run: make
|
||||||
- run: go get -v -t -d ./...
|
|
||||||
- run: go test -v ./...
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Makefile
|
|
@ -0,0 +1,22 @@
|
||||||
|
## This Dockerfile compiles the watchdog with delve support. It enables the tests
|
||||||
|
## to be debugged inside a container.
|
||||||
|
##
|
||||||
|
## Run with:
|
||||||
|
## docker run --memory=64MiB --memory-swap=64MiB -p 2345:2345 <image> \
|
||||||
|
## --listen=:2345 --headless=true --log=true \
|
||||||
|
## --log-output=debugger,debuglineerr,gdbwire,lldbout,rpc \
|
||||||
|
## --accept-multiclient --api-version=2 exec /root/watchdog.test
|
||||||
|
##
|
||||||
|
FROM golang:1.15.5
|
||||||
|
WORKDIR /watchdog
|
||||||
|
COPY . .
|
||||||
|
RUN CGO_ENABLED=0 go get -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv
|
||||||
|
RUN CGO_ENABLED=0 go test -gcflags "all=-N -l" -c -o ./watchdog.test
|
||||||
|
|
||||||
|
FROM alpine:latest
|
||||||
|
RUN apk --no-cache add ca-certificates
|
||||||
|
WORKDIR /root/
|
||||||
|
COPY --from=0 /go/bin/dlv /dlv
|
||||||
|
COPY --from=0 /watchdog/watchdog.test .
|
||||||
|
ENTRYPOINT [ "/dlv" ]
|
||||||
|
EXPOSE 2345
|
|
@ -0,0 +1,10 @@
|
||||||
|
FROM golang:1.15.5
|
||||||
|
WORKDIR /watchdog
|
||||||
|
COPY . .
|
||||||
|
RUN CGO_ENABLED=0 GOOS=linux go test -c -o watchdog.test
|
||||||
|
|
||||||
|
FROM alpine:latest
|
||||||
|
RUN apk --no-cache add ca-certificates
|
||||||
|
WORKDIR /root/
|
||||||
|
COPY --from=0 /watchdog/watchdog.test .
|
||||||
|
CMD ["/root/watchdog.test", "-test.v"]
|
|
@ -0,0 +1,30 @@
|
||||||
|
SHELL = /bin/bash
|
||||||
|
|
||||||
|
.PHONY: test
|
||||||
|
|
||||||
|
# these tests run in isolation by calling go test -run=... or the equivalent.
|
||||||
|
ISOLATED_TESTS = TestControl_Isolated \
|
||||||
|
TestSystemDriven_Isolated \
|
||||||
|
TestHeapDriven_Isolated \
|
||||||
|
TestCgroupsDriven_Create_Isolated \
|
||||||
|
TestCgroupsDriven_Docker_Isolated
|
||||||
|
|
||||||
|
test: test-binary test-docker
|
||||||
|
|
||||||
|
test-binary:
|
||||||
|
go test -v ./... # run all the non-isolated tests.
|
||||||
|
# foreach does not actually execute each iteration; it expands the text, and it's executed all at once
|
||||||
|
# that's why we use && true, to shorcircuit if a test fails.
|
||||||
|
$(foreach name,$(ISOLATED_TESTS),TEST_ISOLATED=1 go test -v -test.run=$(name) ./... && ) true
|
||||||
|
|
||||||
|
test-docker: docker
|
||||||
|
docker run --memory=32MiB --memory-swap=32MiB -e TEST_DOCKER_MEMLIMIT=33554432 raulk/watchdog:latest
|
||||||
|
$(foreach name,$(ISOLATED_TESTS),docker run \
|
||||||
|
--memory=32MiB --memory-swap=32MiB \
|
||||||
|
-e TEST_ISOLATED=1 \
|
||||||
|
-e TEST_DOCKER_MEMLIMIT=33554432 \
|
||||||
|
raulk/watchdog:latest /root/watchdog.test -test.v -test.run=$(name) ./... && ) true
|
||||||
|
|
||||||
|
docker:
|
||||||
|
docker build -f ./Dockerfile.test -t raulk/watchdog:latest .
|
||||||
|
|
62
README.md
62
README.md
|
@ -5,27 +5,59 @@
|
||||||
[![godocs](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog)
|
[![godocs](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog)
|
||||||
[![build status](https://circleci.com/gh/raulk/go-watchdog.svg?style=svg)](https://circleci.com/gh/raulk/go-watchdog)
|
[![build status](https://circleci.com/gh/raulk/go-watchdog.svg?style=svg)](https://circleci.com/gh/raulk/go-watchdog)
|
||||||
|
|
||||||
go-watchdog runs a singleton memory watchdog in the process, which watches
|
Package watchdog runs a singleton memory watchdog in the process, which
|
||||||
memory utilization and forces Go GC in accordance with a user-defined policy.
|
watches memory utilization and forces Go GC in accordance with a
|
||||||
|
user-defined policy.
|
||||||
|
|
||||||
There are two kinds of watchdog so far:
|
There three kinds of watchdogs:
|
||||||
|
|
||||||
* **heap-driven:** applies a limit to the heap, and obtains current usage through
|
1. heap-driven (`watchdog.HeapDriven()`): applies a heap limit, adjusting GOGC
|
||||||
`runtime.ReadMemStats()`.
|
dynamically in accordance with the policy.
|
||||||
* **system-driven:** applies a limit to the total system memory used, and obtains
|
2. system-driven (`watchdog.SystemDriven()`): applies a limit to the total
|
||||||
current usage through [`elastic/go-sigar`](https://github.com/elastic/gosigar).
|
system memory used, obtaining the current usage through elastic/go-sigar.
|
||||||
|
3. cgroups-driven (`watchdog.CgroupDriven()`): discovers the memory limit from
|
||||||
|
the cgroup of the process (derived from /proc/self/cgroup), or from the
|
||||||
|
root cgroup path if the PID == 1 (which indicates that the process is
|
||||||
|
running in a container). It uses the cgroup stats to obtain the
|
||||||
|
current usage.
|
||||||
|
|
||||||
A third process-driven watchdog that uses cgroups is underway.
|
The watchdog's behaviour is controlled by the policy, a pluggable function
|
||||||
|
that determines when to trigger GC based on the current utilization. This
|
||||||
|
library ships with two policies:
|
||||||
|
|
||||||
This library ships with two policies out of the box:
|
1. watermarks policy (`watchdog.NewWatermarkPolicy()`): runs GC at configured
|
||||||
|
watermarks of memory utilisation.
|
||||||
|
2. adaptive policy (`watchdog.NewAdaptivePolicy()`): runs GC when the current
|
||||||
|
usage surpasses a dynamically-set threshold.
|
||||||
|
|
||||||
* watermarks policy: runs GC at configured watermarks of system or heap memory
|
You can easily write a custom policy tailored to the allocation patterns of
|
||||||
utilisation.
|
your program.
|
||||||
* adaptive policy: runs GC when the current usage surpasses a dynamically-set
|
|
||||||
threshold.
|
|
||||||
|
|
||||||
You can easily build a custom policy tailored to the allocation patterns of your
|
## Recommended way to set up the watchdog
|
||||||
program.
|
|
||||||
|
The recommended way to set up the watchdog is as follows, in descending order
|
||||||
|
of precedence. This logic assumes that the library supports setting a heap
|
||||||
|
limit through an environment variable (e.g. MYAPP_HEAP_MAX) or config key.
|
||||||
|
|
||||||
|
1. If heap limit is set and legal, initialize a heap-driven watchdog.
|
||||||
|
2. Otherwise, try to use the cgroup-driven watchdog. If it succeeds, return.
|
||||||
|
3. Otherwise, try to initialize a system-driven watchdog. If it succeeds, return.
|
||||||
|
4. Watchdog initialization failed. Log a warning to inform the user that
|
||||||
|
they're flying solo.
|
||||||
|
|
||||||
|
## Running the tests
|
||||||
|
|
||||||
|
Given the low-level nature of this component, some tests need to run in
|
||||||
|
isolation, so that they don't carry over Go runtime metrics. For completeness,
|
||||||
|
this module uses a Docker image for testing, so we can simulate cgroup memory
|
||||||
|
limits.
|
||||||
|
|
||||||
|
The test execution and docker builds have been conveniently packaged in a
|
||||||
|
Makefile. Run with:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
$ make
|
||||||
|
```
|
||||||
|
|
||||||
## Why is this even needed?
|
## Why is this even needed?
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
// Package watchdog runs a singleton memory watchdog in the process, which
|
||||||
|
// watches memory utilization and forces Go GC in accordance with a
|
||||||
|
// user-defined policy.
|
||||||
|
//
|
||||||
|
// There three kinds of watchdogs:
|
||||||
|
//
|
||||||
|
// 1. heap-driven (watchdog.HeapDriven()): applies a heap limit, adjusting GOGC
|
||||||
|
// dynamically in accordance with the policy.
|
||||||
|
// 2. system-driven (watchdog.SystemDriven()): applies a limit to the total
|
||||||
|
// system memory used, obtaining the current usage through elastic/go-sigar.
|
||||||
|
// 3. cgroups-driven (watchdog.CgroupDriven()): discovers the memory limit from
|
||||||
|
// the cgroup of the process (derived from /proc/self/cgroup), or from the
|
||||||
|
// root cgroup path if the PID == 1 (which indicates that the process is
|
||||||
|
// running in a container). It uses the cgroup stats to obtain the
|
||||||
|
// current usage.
|
||||||
|
//
|
||||||
|
// The watchdog's behaviour is controlled by the policy, a pluggable function
|
||||||
|
// that determines when to trigger GC based on the current utilization. This
|
||||||
|
// library ships with two policies:
|
||||||
|
//
|
||||||
|
// 1. watermarks policy (watchdog.NewWatermarkPolicy()): runs GC at configured
|
||||||
|
// watermarks of memory utilisation.
|
||||||
|
// 2. adaptive policy (watchdog.NewAdaptivePolicy()): runs GC when the current
|
||||||
|
// usage surpasses a dynamically-set threshold.
|
||||||
|
//
|
||||||
|
// You can easily write a custom policy tailored to the allocation patterns of
|
||||||
|
// your program.
|
||||||
|
//
|
||||||
|
// Recommended way to set up the watchdog
|
||||||
|
//
|
||||||
|
// The recommended way to set up the watchdog is as follows, in descending order
|
||||||
|
// of precedence. This logic assumes that the library supports setting a heap
|
||||||
|
// limit through an environment variable (e.g. MYAPP_HEAP_MAX) or config key.
|
||||||
|
//
|
||||||
|
// 1. If heap limit is set and legal, initialize a heap-driven watchdog.
|
||||||
|
// 2. Otherwise, try to use the cgroup-driven watchdog. If it succeeds, return.
|
||||||
|
// 3. Otherwise, try to initialize a system-driven watchdog. If it succeeds, return.
|
||||||
|
// 4. Watchdog initialization failed. Log a warning to inform the user that
|
||||||
|
// they're flying solo.
|
||||||
|
package watchdog
|
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
||||||
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
|
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
|
||||||
github.com/elastic/gosigar v0.12.0
|
github.com/elastic/gosigar v0.12.0
|
||||||
github.com/kr/pretty v0.1.0 // indirect
|
github.com/kr/pretty v0.1.0 // indirect
|
||||||
|
github.com/opencontainers/runtime-spec v1.0.2
|
||||||
github.com/raulk/clock v1.1.0
|
github.com/raulk/clock v1.1.0
|
||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
|
||||||
|
|
26
sys_linux.go
26
sys_linux.go
|
@ -1,26 +0,0 @@
|
||||||
package watchdog
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/containerd/cgroups"
|
|
||||||
)
|
|
||||||
|
|
||||||
func ProcessMemoryLimit() uint64 {
|
|
||||||
var (
|
|
||||||
pid = os.Getpid()
|
|
||||||
memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory)
|
|
||||||
)
|
|
||||||
cgroup, err := cgroups.Load(memSubsystem, cgroups.PidPath(pid))
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
metrics, err := cgroup.Stat()
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
if metrics.Memory == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return metrics.Memory.HierarchicalMemoryLimit
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
// +build !linux
|
|
||||||
|
|
||||||
package watchdog
|
|
||||||
|
|
||||||
func ProcessMemoryLimit() uint64 {
|
|
||||||
return 0
|
|
||||||
}
|
|
214
watchdog.go
214
watchdog.go
|
@ -1,6 +1,7 @@
|
||||||
package watchdog
|
package watchdog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
@ -13,6 +14,10 @@ import (
|
||||||
"github.com/raulk/clock"
|
"github.com/raulk/clock"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrNotSupported is returned when the watchdog does not support the requested
|
||||||
|
// run mode in the current OS/arch.
|
||||||
|
var ErrNotSupported = errors.New("watchdog run mode not supported")
|
||||||
|
|
||||||
// PolicyTempDisabled is a marker value for policies to signal that the policy
|
// PolicyTempDisabled is a marker value for policies to signal that the policy
|
||||||
// is temporarily disabled. Use it when all hope is lost to turn around from
|
// is temporarily disabled. Use it when all hope is lost to turn around from
|
||||||
// significant memory pressure (such as when above an "extreme" watermark).
|
// significant memory pressure (such as when above an "extreme" watermark).
|
||||||
|
@ -28,9 +33,8 @@ var (
|
||||||
// Clock can be used to inject a mock clock for testing.
|
// Clock can be used to inject a mock clock for testing.
|
||||||
Clock = clock.New()
|
Clock = clock.New()
|
||||||
|
|
||||||
// NotifyFired, if non-nil, will be called when the policy has fired,
|
// NotifyGC, if non-nil, will be called when a GC has happened.
|
||||||
// prior to calling GC, even if GC is disabled.
|
NotifyGC func() = func() {}
|
||||||
NotifyFired func() = func() {}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -104,6 +108,8 @@ const (
|
||||||
// UtilizationSystem specifies that the policy compares against actual used
|
// UtilizationSystem specifies that the policy compares against actual used
|
||||||
// system memory.
|
// system memory.
|
||||||
UtilizationSystem UtilizationType = iota
|
UtilizationSystem UtilizationType = iota
|
||||||
|
// UtilizationProcess specifies that the watchdog is using process limits.
|
||||||
|
UtilizationProcess
|
||||||
// UtilizationHeap specifies that the policy compares against heap used.
|
// UtilizationHeap specifies that the policy compares against heap used.
|
||||||
UtilizationHeap
|
UtilizationHeap
|
||||||
)
|
)
|
||||||
|
@ -126,13 +132,6 @@ type Policy interface {
|
||||||
//
|
//
|
||||||
// A zero-valued limit will error.
|
// A zero-valued limit will error.
|
||||||
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) {
|
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) {
|
||||||
_watchdog.lk.Lock()
|
|
||||||
defer _watchdog.lk.Unlock()
|
|
||||||
|
|
||||||
if _watchdog.state != stateUnstarted {
|
|
||||||
return ErrAlreadyStarted, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if limit == 0 {
|
if limit == 0 {
|
||||||
return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil
|
return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil
|
||||||
}
|
}
|
||||||
|
@ -142,9 +141,9 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
|
||||||
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
|
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_watchdog.state = stateRunning
|
if err := start(UtilizationHeap); err != nil {
|
||||||
_watchdog.scope = UtilizationHeap
|
return err, nil
|
||||||
_watchdog.closing = make(chan struct{})
|
}
|
||||||
|
|
||||||
gcTriggered := make(chan struct{}, 16)
|
gcTriggered := make(chan struct{}, 16)
|
||||||
setupGCSentinel(gcTriggered)
|
setupGCSentinel(gcTriggered)
|
||||||
|
@ -163,7 +162,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-gcTriggered:
|
case <-gcTriggered:
|
||||||
NotifyFired()
|
NotifyGC()
|
||||||
|
|
||||||
case <-_watchdog.closing:
|
case <-_watchdog.closing:
|
||||||
return
|
return
|
||||||
|
@ -218,18 +217,12 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
|
||||||
// This threshold is calculated by querying the policy every time that GC runs,
|
// This threshold is calculated by querying the policy every time that GC runs,
|
||||||
// either triggered by the runtime, or forced by us.
|
// either triggered by the runtime, or forced by us.
|
||||||
func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
|
func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
|
||||||
_watchdog.lk.Lock()
|
|
||||||
defer _watchdog.lk.Unlock()
|
|
||||||
|
|
||||||
if _watchdog.state != stateUnstarted {
|
|
||||||
return ErrAlreadyStarted, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if limit == 0 {
|
if limit == 0 {
|
||||||
limit, err = determineLimit(false)
|
var sysmem gosigar.Mem
|
||||||
if err != nil {
|
if err := sysmemFn(&sysmem); err != nil {
|
||||||
return err, nil
|
return fmt.Errorf("failed to get system memory stats: %w", err), nil
|
||||||
}
|
}
|
||||||
|
limit = sysmem.Total
|
||||||
}
|
}
|
||||||
|
|
||||||
policy, err := policyCtor(limit)
|
policy, err := policyCtor(limit)
|
||||||
|
@ -237,97 +230,92 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor)
|
||||||
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
|
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_watchdog.state = stateRunning
|
if err := start(UtilizationSystem); err != nil {
|
||||||
_watchdog.scope = UtilizationSystem
|
return err, nil
|
||||||
_watchdog.closing = make(chan struct{})
|
}
|
||||||
|
|
||||||
gcTriggered := make(chan struct{}, 16)
|
|
||||||
setupGCSentinel(gcTriggered)
|
|
||||||
|
|
||||||
_watchdog.wg.Add(1)
|
_watchdog.wg.Add(1)
|
||||||
go func() {
|
var sysmem gosigar.Mem
|
||||||
defer _watchdog.wg.Done()
|
go pollingWatchdog(policy, frequency, func() (uint64, error) {
|
||||||
|
if err := sysmemFn(&sysmem); err != nil {
|
||||||
var (
|
return 0, err
|
||||||
memstats runtime.MemStats
|
|
||||||
sysmem gosigar.Mem
|
|
||||||
threshold uint64
|
|
||||||
)
|
|
||||||
|
|
||||||
renewThreshold := func() {
|
|
||||||
// get the current usage.
|
|
||||||
if err := sysmemFn(&sysmem); err != nil {
|
|
||||||
Logger.Warnf("failed to obtain system memory stats; err: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// calculate the threshold.
|
|
||||||
threshold = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed)
|
|
||||||
}
|
}
|
||||||
|
return sysmem.ActualUsed, nil
|
||||||
// initialize the threshold.
|
})
|
||||||
renewThreshold()
|
|
||||||
|
|
||||||
// initialize an empty timer.
|
|
||||||
timer := Clock.Timer(0)
|
|
||||||
stopTimer := func() {
|
|
||||||
if !timer.Stop() {
|
|
||||||
<-timer.C
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
timer.Reset(frequency)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-timer.C:
|
|
||||||
// get the current usage.
|
|
||||||
if err := sysmemFn(&sysmem); err != nil {
|
|
||||||
Logger.Warnf("failed to obtain system memory stats; err: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
actual := sysmem.ActualUsed
|
|
||||||
if actual < threshold {
|
|
||||||
// nothing to do.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// trigger GC; this will emit a gcTriggered event which we'll
|
|
||||||
// consume next to readjust the threshold.
|
|
||||||
Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", actual, threshold)
|
|
||||||
forceGC(&memstats)
|
|
||||||
|
|
||||||
case <-gcTriggered:
|
|
||||||
NotifyFired()
|
|
||||||
|
|
||||||
renewThreshold()
|
|
||||||
|
|
||||||
stopTimer()
|
|
||||||
|
|
||||||
case <-_watchdog.closing:
|
|
||||||
stopTimer()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil, stop
|
return nil, stop
|
||||||
}
|
}
|
||||||
|
|
||||||
func determineLimit(restrictByProcess bool) (uint64, error) {
|
// pollingWatchdog starts a polling watchdog with the provided policy, using
|
||||||
// TODO.
|
// the supplied polling frequency. On every tick, it calls usageFn and, if the
|
||||||
// if restrictByProcess {
|
// usage is greater or equal to the threshold at the time, it forces GC.
|
||||||
// if pmem := ProcessMemoryLimit(); pmem > 0 {
|
// usageFn is guaranteed to be called serially, so no locking should be
|
||||||
// Logger.Infof("watchdog using process limit: %d bytes", pmem)
|
// necessary.
|
||||||
// return pmem, nil
|
func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uint64, error)) {
|
||||||
// }
|
defer _watchdog.wg.Done()
|
||||||
// Logger.Infof("watchdog was unable to determine process limit; falling back to total system memory")
|
|
||||||
// }
|
|
||||||
|
|
||||||
// populate initial utilisation and system stats.
|
gcTriggered := make(chan struct{}, 16)
|
||||||
var sysmem gosigar.Mem
|
setupGCSentinel(gcTriggered)
|
||||||
if err := sysmemFn(&sysmem); err != nil {
|
|
||||||
return 0, fmt.Errorf("failed to get system memory stats: %w", err)
|
var (
|
||||||
|
memstats runtime.MemStats
|
||||||
|
threshold uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
renewThreshold := func() {
|
||||||
|
// get the current usage.
|
||||||
|
usage, err := usageFn()
|
||||||
|
if err != nil {
|
||||||
|
Logger.Warnf("failed to obtain memory utilization stats; err: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// calculate the threshold.
|
||||||
|
threshold = policy.Evaluate(_watchdog.scope, usage)
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize the threshold.
|
||||||
|
renewThreshold()
|
||||||
|
|
||||||
|
// initialize an empty timer.
|
||||||
|
timer := Clock.Timer(0)
|
||||||
|
stopTimer := func() {
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
timer.Reset(frequency)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
// get the current usage.
|
||||||
|
usage, err := usageFn()
|
||||||
|
if err != nil {
|
||||||
|
Logger.Warnf("failed to obtain memory utilizationstats; err: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if usage < threshold {
|
||||||
|
// nothing to do.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// trigger GC; this will emit a gcTriggered event which we'll
|
||||||
|
// consume next to readjust the threshold.
|
||||||
|
Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", usage, threshold)
|
||||||
|
forceGC(&memstats)
|
||||||
|
|
||||||
|
case <-gcTriggered:
|
||||||
|
NotifyGC()
|
||||||
|
|
||||||
|
renewThreshold()
|
||||||
|
|
||||||
|
stopTimer()
|
||||||
|
|
||||||
|
case <-_watchdog.closing:
|
||||||
|
stopTimer()
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return sysmem.Total, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// forceGC forces a manual GC.
|
// forceGC forces a manual GC.
|
||||||
|
@ -379,6 +367,20 @@ func setupGCSentinel(gcTriggered chan struct{}) {
|
||||||
runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel.
|
runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func start(scope UtilizationType) error {
|
||||||
|
_watchdog.lk.Lock()
|
||||||
|
defer _watchdog.lk.Unlock()
|
||||||
|
|
||||||
|
if _watchdog.state != stateUnstarted {
|
||||||
|
return ErrAlreadyStarted
|
||||||
|
}
|
||||||
|
|
||||||
|
_watchdog.state = stateRunning
|
||||||
|
_watchdog.scope = scope
|
||||||
|
_watchdog.closing = make(chan struct{})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func stop() {
|
func stop() {
|
||||||
_watchdog.lk.Lock()
|
_watchdog.lk.Lock()
|
||||||
defer _watchdog.lk.Unlock()
|
defer _watchdog.lk.Unlock()
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
package watchdog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/cgroups"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
pid = os.Getpid()
|
||||||
|
memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory)
|
||||||
|
)
|
||||||
|
|
||||||
|
// CgroupDriven initializes a cgroups-driven watchdog. It will try to discover
|
||||||
|
// the memory limit from the cgroup of the process (derived from /proc/self/cgroup),
|
||||||
|
// or from the root cgroup path if the PID == 1 (which indicates that the process
|
||||||
|
// is running in a container).
|
||||||
|
//
|
||||||
|
// Memory usage is calculated by querying the cgroup stats.
|
||||||
|
//
|
||||||
|
// This function will return an error immediately if the OS does not support cgroups,
|
||||||
|
// or if another error occurs during initialization. The caller can then safely fall
|
||||||
|
// back to the system driven watchdog.
|
||||||
|
func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
|
||||||
|
// use self path unless our PID is 1, in which case we're running inside
|
||||||
|
// a container and our limits are in the root path.
|
||||||
|
path := cgroups.NestedPath("")
|
||||||
|
if pid := os.Getpid(); pid == 1 {
|
||||||
|
path = cgroups.RootPath
|
||||||
|
}
|
||||||
|
|
||||||
|
cgroup, err := cgroups.Load(memSubsystem, path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to load cgroup for process: %w", err), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var limit uint64
|
||||||
|
if stat, err := cgroup.Stat(); err != nil {
|
||||||
|
return fmt.Errorf("failed to load memory cgroup stats: %w", err), nil
|
||||||
|
} else if stat.Memory == nil || stat.Memory.Usage == nil {
|
||||||
|
return fmt.Errorf("cgroup memory stats are nil; aborting"), nil
|
||||||
|
} else {
|
||||||
|
limit = stat.Memory.Usage.Limit
|
||||||
|
}
|
||||||
|
|
||||||
|
if limit == 0 {
|
||||||
|
return fmt.Errorf("cgroup limit is 0; refusing to start memory watchdog"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
policy, err := policyCtor(limit)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := start(UtilizationProcess); err != nil {
|
||||||
|
return err, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_watchdog.wg.Add(1)
|
||||||
|
go pollingWatchdog(policy, frequency, func() (uint64, error) {
|
||||||
|
stat, err := cgroup.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
} else if stat.Memory == nil || stat.Memory.Usage == nil {
|
||||||
|
return 0, fmt.Errorf("cgroup memory stats are nil; aborting")
|
||||||
|
}
|
||||||
|
return stat.Memory.Usage.Usage, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil, stop
|
||||||
|
}
|
|
@ -0,0 +1,125 @@
|
||||||
|
package watchdog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"runtime/debug"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/cgroups"
|
||||||
|
"github.com/opencontainers/runtime-spec/specs-go"
|
||||||
|
"github.com/raulk/clock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// retained will hoard unreclaimable byte buffers in the heap.
|
||||||
|
var retained [][]byte
|
||||||
|
|
||||||
|
func TestCgroupsDriven_Create_Isolated(t *testing.T) {
|
||||||
|
skipIfNotIsolated(t)
|
||||||
|
|
||||||
|
if os.Getpid() == 1 {
|
||||||
|
// we are running in Docker and cannot create a cgroup.
|
||||||
|
t.Skipf("cannot create a cgroup while running in non-privileged docker")
|
||||||
|
}
|
||||||
|
|
||||||
|
// new cgroup limit.
|
||||||
|
var limit = uint64(32 << 20) // 32MiB.
|
||||||
|
createMemoryCgroup(t, limit)
|
||||||
|
|
||||||
|
testCgroupsWatchdog(t, limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCgroupsDriven_Docker_Isolated(t *testing.T) {
|
||||||
|
skipIfNotIsolated(t)
|
||||||
|
|
||||||
|
testCgroupsWatchdog(t, uint64(DockerMemLimit))
|
||||||
|
}
|
||||||
|
|
||||||
|
func testCgroupsWatchdog(t *testing.T, limit uint64) {
|
||||||
|
t.Cleanup(func() {
|
||||||
|
retained = nil
|
||||||
|
})
|
||||||
|
|
||||||
|
runtime.GC() // first GC to clear any junk from other tests.
|
||||||
|
debug.SetGCPercent(100000000) // disable GC.
|
||||||
|
|
||||||
|
clk := clock.NewMock()
|
||||||
|
Clock = clk
|
||||||
|
|
||||||
|
notifyCh := make(chan struct{}, 1)
|
||||||
|
NotifyGC = func() {
|
||||||
|
notifyCh <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
err, stopFn := CgroupDriven(5*time.Second, NewAdaptivePolicy(0.5))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer stopFn()
|
||||||
|
|
||||||
|
time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.
|
||||||
|
|
||||||
|
maxSlabs := limit / (1 << 20) // number of 1MiB slabs to take up the entire limit.
|
||||||
|
|
||||||
|
// first tick; nothing should happen.
|
||||||
|
clk.Add(5 * time.Second)
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
require.Len(t, notifyCh, 0) // no GC has taken place.
|
||||||
|
|
||||||
|
// allocate 50% of limit in heap (to be added to other mem usage).
|
||||||
|
for i := 0; i < (int(maxSlabs))/2; i++ {
|
||||||
|
retained = append(retained, func() []byte {
|
||||||
|
b := make([]byte, 1*1024*1024)
|
||||||
|
for i := range b {
|
||||||
|
b[i] = 0xff
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}())
|
||||||
|
}
|
||||||
|
|
||||||
|
// second tick; used = just over 50%; will trigger GC.
|
||||||
|
clk.Add(5 * time.Second)
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
require.NotNil(t, <-notifyCh)
|
||||||
|
|
||||||
|
var memstats runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&memstats)
|
||||||
|
require.EqualValues(t, 2, memstats.NumForcedGC)
|
||||||
|
}
|
||||||
|
|
||||||
|
// createMemoryCgroup creates a memory cgroup to restrict the memory available
|
||||||
|
// to this test.
|
||||||
|
func createMemoryCgroup(t *testing.T, limit uint64) {
|
||||||
|
l := int64(limit)
|
||||||
|
path := cgroups.NestedPath(fmt.Sprintf("/%d", time.Now().UnixNano()))
|
||||||
|
cgroup, err := cgroups.New(cgroups.V1, path, &specs.LinuxResources{
|
||||||
|
Memory: &specs.LinuxMemory{
|
||||||
|
Limit: &l,
|
||||||
|
Swap: &l,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, err, "failed to create a cgroup")
|
||||||
|
t.Cleanup(func() {
|
||||||
|
root, err := cgroups.Load(cgroups.V1, cgroups.RootPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("failed to resolve root cgroup: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = root.Add(cgroups.Process{Pid: pid}); err != nil {
|
||||||
|
t.Logf("failed to move process to root cgroup: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = cgroup.Delete(); err != nil {
|
||||||
|
t.Logf("failed to clean up temp cgroup: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Printf("cgroup created")
|
||||||
|
|
||||||
|
// add process to cgroup.
|
||||||
|
err = cgroup.Add(cgroups.Process{Pid: pid})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
// +build !linux
|
||||||
|
|
||||||
|
package watchdog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CgroupDriven is only available in Linux. This method will error.
|
||||||
|
func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
|
||||||
|
return fmt.Errorf("cgroups-driven watchdog: %w", ErrNotSupported), nil
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
// +build !linux
|
||||||
|
|
||||||
|
package watchdog
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestCgroupsDriven_Create_Isolated(t *testing.T) {
|
||||||
|
// this test only runs on linux.
|
||||||
|
t.Skip("test only valid on linux")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCgroupsDriven_Docker_Isolated(t *testing.T) {
|
||||||
|
// this test only runs on linux.
|
||||||
|
t.Skip("test only valid on linux")
|
||||||
|
}
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -14,29 +15,54 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// These integration tests are a hugely non-deterministic, but necessary to get
|
const (
|
||||||
// good coverage and confidence. The Go runtime makes its own pacing decisions,
|
// EnvTestIsolated is a marker property for the runner to confirm that this
|
||||||
// and those may vary based on machine, OS, kernel memory management, other
|
// test is running in isolation (i.e. a dedicated process).
|
||||||
// running programs, exogenous memory pressure, and Go runtime versions.
|
EnvTestIsolated = "TEST_ISOLATED"
|
||||||
//
|
|
||||||
// The assertions we use here are lax, but should be sufficient to serve as a
|
// EnvTestDockerMemLimit is the memory limit applied in a docker container.
|
||||||
// reasonable litmus test of whether the watchdog is doing what it's supposed
|
EnvTestDockerMemLimit = "TEST_DOCKER_MEMLIMIT"
|
||||||
// to or not.
|
)
|
||||||
|
|
||||||
|
// DockerMemLimit is initialized in the init() function from the
|
||||||
|
// EnvTestDockerMemLimit env variable.
|
||||||
|
var DockerMemLimit int // bytes
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true}
|
||||||
|
|
||||||
|
if l := os.Getenv(EnvTestDockerMemLimit); l != "" {
|
||||||
|
l, err := strconv.Atoi(l)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
DockerMemLimit = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func skipIfNotIsolated(t *testing.T) {
|
||||||
|
if os.Getenv(EnvTestIsolated) != "1" {
|
||||||
|
t.Skipf("skipping test in non-isolated mode")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
limit uint64 = 64 << 20 // 64MiB.
|
limit uint64 = 64 << 20 // 64MiB.
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func TestControl_Isolated(t *testing.T) {
|
||||||
Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true}
|
skipIfNotIsolated(t)
|
||||||
}
|
|
||||||
|
|
||||||
func TestControl(t *testing.T) {
|
|
||||||
debug.SetGCPercent(100)
|
debug.SetGCPercent(100)
|
||||||
|
|
||||||
// retain 1MiB every iteration, up to 100MiB (beyond heap limit!).
|
rounds := 100
|
||||||
|
if DockerMemLimit != 0 {
|
||||||
|
rounds /= int(float64(DockerMemLimit)*0.8) / 1024 / 1024
|
||||||
|
}
|
||||||
|
|
||||||
|
// retain 1MiB every iteration.
|
||||||
var retained [][]byte
|
var retained [][]byte
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < rounds; i++ {
|
||||||
b := make([]byte, 1*1024*1024)
|
b := make([]byte, 1*1024*1024)
|
||||||
for i := range b {
|
for i := range b {
|
||||||
b[i] = byte(i)
|
b[i] = byte(i)
|
||||||
|
@ -52,11 +78,13 @@ func TestControl(t *testing.T) {
|
||||||
|
|
||||||
var ms runtime.MemStats
|
var ms runtime.MemStats
|
||||||
runtime.ReadMemStats(&ms)
|
runtime.ReadMemStats(&ms)
|
||||||
require.LessOrEqual(t, ms.NumGC, uint32(5)) // a maximum of 8 GCs should've happened.
|
require.NotZero(t, ms.NumGC) // GCs have taken place, but...
|
||||||
require.Zero(t, ms.NumForcedGC) // no forced GCs.
|
require.Zero(t, ms.NumForcedGC) // ... no forced GCs beyond our initial one.
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeapDriven(t *testing.T) {
|
func TestHeapDriven_Isolated(t *testing.T) {
|
||||||
|
skipIfNotIsolated(t)
|
||||||
|
|
||||||
// we can't mock ReadMemStats, because we're relying on the go runtime to
|
// we can't mock ReadMemStats, because we're relying on the go runtime to
|
||||||
// enforce the GC run, and the go runtime won't use our mock. Therefore, we
|
// enforce the GC run, and the go runtime won't use our mock. Therefore, we
|
||||||
// need to do the actual thing.
|
// need to do the actual thing.
|
||||||
|
@ -66,7 +94,7 @@ func TestHeapDriven(t *testing.T) {
|
||||||
Clock = clk
|
Clock = clk
|
||||||
|
|
||||||
observations := make([]*runtime.MemStats, 0, 100)
|
observations := make([]*runtime.MemStats, 0, 100)
|
||||||
NotifyFired = func() {
|
NotifyGC = func() {
|
||||||
var ms runtime.MemStats
|
var ms runtime.MemStats
|
||||||
runtime.ReadMemStats(&ms)
|
runtime.ReadMemStats(&ms)
|
||||||
observations = append(observations, &ms)
|
observations = append(observations, &ms)
|
||||||
|
@ -94,7 +122,9 @@ func TestHeapDriven(t *testing.T) {
|
||||||
require.GreaterOrEqual(t, ms.NumGC, uint32(9)) // over 9 GCs should've taken place.
|
require.GreaterOrEqual(t, ms.NumGC, uint32(9)) // over 9 GCs should've taken place.
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSystemDriven(t *testing.T) {
|
func TestSystemDriven_Isolated(t *testing.T) {
|
||||||
|
skipIfNotIsolated(t)
|
||||||
|
|
||||||
debug.SetGCPercent(100)
|
debug.SetGCPercent(100)
|
||||||
|
|
||||||
clk := clock.NewMock()
|
clk := clock.NewMock()
|
||||||
|
@ -115,7 +145,7 @@ func TestSystemDriven(t *testing.T) {
|
||||||
time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.
|
time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.
|
||||||
|
|
||||||
notifyCh := make(chan struct{}, 1)
|
notifyCh := make(chan struct{}, 1)
|
||||||
NotifyFired = func() {
|
NotifyGC = func() {
|
||||||
notifyCh <- struct{}{}
|
notifyCh <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue