Merge pull request #6 from raulk/cgroups-driven

This commit is contained in:
raulk 2021-01-18 22:33:53 +00:00 committed by GitHub
commit 13cc66ee4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 600 additions and 184 deletions

View File

@ -1,13 +1,57 @@
# Golang CircleCI 2.0 configuration file # Golang CircleCI 2.0 configuration file
version: 2 version: 2.1
jobs:
build:
docker:
- image: circleci/golang:1.15
working_directory: /go/src/github.com/{{ORG_NAME}}/{{REPO_NAME}} parameters:
go-version:
type: string
default: "1.15.5"
workspace-dir:
type: string
default: "/home/circleci"
commands:
setup:
description: "install go, checkout and restore cache"
steps: steps:
- run:
name: "install go"
command: |
curl --create-dirs -o $GOPATH/go.tar.gz https://dl.google.com/go/go${GOVERSION}.linux-amd64.tar.gz
tar --strip-components=1 -C $GOPATH -xzf $GOPATH/go.tar.gz
rm -rf $GOPATH/go.tar.gz
- checkout
- restore_cache:
keys:
- 'v2-pkg-cache-{{ checksum "go.sum" }}-{{ .Environment.GOVERSION }}'
- 'bin-cache-{{ .Branch }}'
setup-macos:
description: "install go, checkout and restore cache"
steps:
- run:
name: "install go on macOS"
command: |
brew --version
[ ! -d /usr/local/opt/go@1.14 ] && brew update && brew install go@1.14 && echo "done installing go"
echo 'export GOPATH="$HOME/go"' >> $BASH_ENV
echo 'export PATH="/usr/local/opt/go@1.14/bin:$GOPATH/bin:$PATH"' >> $BASH_ENV
source $BASH_ENV
go version
- checkout - checkout
- run: go get -v -t -d ./... executors:
- run: go test -v ./... linux:
machine:
image: ubuntu-1604:201903-01
working_directory: << pipeline.parameters.workspace-dir >>/project
environment:
GOPATH: << pipeline.parameters.workspace-dir >>/go/<< pipeline.parameters.go-version >>
PATH: << pipeline.parameters.workspace-dir >>/go/<< pipeline.parameters.go-version >>/bin:<< pipeline.parameters.workspace-dir >>/bin:/usr/local/bin:/usr/bin:/bin
GOVERSION: << pipeline.parameters.go-version >>
jobs:
build:
executor: linux
steps:
- checkout
- setup
- run: make

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
Makefile

22
Dockerfile.dlv Normal file
View File

@ -0,0 +1,22 @@
## This Dockerfile compiles the watchdog with delve support. It enables the tests
## to be debugged inside a container.
##
## Run with:
## docker run --memory=64MiB --memory-swap=64MiB -p 2345:2345 <image> \
## --listen=:2345 --headless=true --log=true \
## --log-output=debugger,debuglineerr,gdbwire,lldbout,rpc \
## --accept-multiclient --api-version=2 exec /root/watchdog.test
##
FROM golang:1.15.5
WORKDIR /watchdog
COPY . .
RUN CGO_ENABLED=0 go get -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv
RUN CGO_ENABLED=0 go test -gcflags "all=-N -l" -c -o ./watchdog.test
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=0 /go/bin/dlv /dlv
COPY --from=0 /watchdog/watchdog.test .
ENTRYPOINT [ "/dlv" ]
EXPOSE 2345

10
Dockerfile.test Normal file
View File

@ -0,0 +1,10 @@
FROM golang:1.15.5
WORKDIR /watchdog
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go test -c -o watchdog.test
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=0 /watchdog/watchdog.test .
CMD ["/root/watchdog.test", "-test.v"]

36
Makefile Normal file
View File

@ -0,0 +1,36 @@
SHELL = /bin/bash
.PHONY: test
# these tests run in isolation by calling go test -run=... or the equivalent.
ISOLATED_TESTS +=
ifdef CI
ISOLATED_TESTS = TestControl_Isolated \
TestSystemDriven_Isolated \
TestHeapDriven_Isolated
else
ISOLATED_TESTS = TestControl_Isolated \
TestSystemDriven_Isolated \
TestHeapDriven_Isolated \
TestCgroupsDriven_Create_Isolated \
TestCgroupsDriven_Docker_Isolated
endif
test: test-binary test-docker
test-binary:
go test -v ./... # run all the non-isolated tests.
# foreach does not actually execute each iteration; it expands the text, and it's executed all at once
# that's why we use && true, to shorcircuit if a test fails.
$(foreach name,$(ISOLATED_TESTS),TEST_ISOLATED=1 go test -v -test.run=$(name) ./... && ) true
test-docker: docker
docker run --memory=32MiB --memory-swap=32MiB -e TEST_DOCKER_MEMLIMIT=33554432 raulk/watchdog:latest
$(foreach name,$(ISOLATED_TESTS),docker run \
--memory=32MiB --memory-swap=32MiB \
-e TEST_ISOLATED=1 \
-e TEST_DOCKER_MEMLIMIT=33554432 \
raulk/watchdog:latest /root/watchdog.test -test.v -test.run=$(name) ./... && ) true
docker:
docker build -f ./Dockerfile.test -t raulk/watchdog:latest .

View File

@ -5,27 +5,59 @@
[![godocs](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog) [![godocs](https://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square)](https://godoc.org/github.com/raulk/go-watchdog)
[![build status](https://circleci.com/gh/raulk/go-watchdog.svg?style=svg)](https://circleci.com/gh/raulk/go-watchdog) [![build status](https://circleci.com/gh/raulk/go-watchdog.svg?style=svg)](https://circleci.com/gh/raulk/go-watchdog)
go-watchdog runs a singleton memory watchdog in the process, which watches Package watchdog runs a singleton memory watchdog in the process, which
memory utilization and forces Go GC in accordance with a user-defined policy. watches memory utilization and forces Go GC in accordance with a
user-defined policy.
There are two kinds of watchdog so far: There three kinds of watchdogs:
* **heap-driven:** applies a limit to the heap, and obtains current usage through 1. heap-driven (`watchdog.HeapDriven()`): applies a heap limit, adjusting GOGC
`runtime.ReadMemStats()`. dynamically in accordance with the policy.
* **system-driven:** applies a limit to the total system memory used, and obtains 2. system-driven (`watchdog.SystemDriven()`): applies a limit to the total
current usage through [`elastic/go-sigar`](https://github.com/elastic/gosigar). system memory used, obtaining the current usage through elastic/go-sigar.
3. cgroups-driven (`watchdog.CgroupDriven()`): discovers the memory limit from
the cgroup of the process (derived from /proc/self/cgroup), or from the
root cgroup path if the PID == 1 (which indicates that the process is
running in a container). It uses the cgroup stats to obtain the
current usage.
A third process-driven watchdog that uses cgroups is underway. The watchdog's behaviour is controlled by the policy, a pluggable function
that determines when to trigger GC based on the current utilization. This
library ships with two policies:
This library ships with two policies out of the box: 1. watermarks policy (`watchdog.NewWatermarkPolicy()`): runs GC at configured
watermarks of memory utilisation.
2. adaptive policy (`watchdog.NewAdaptivePolicy()`): runs GC when the current
usage surpasses a dynamically-set threshold.
* watermarks policy: runs GC at configured watermarks of system or heap memory You can easily write a custom policy tailored to the allocation patterns of
utilisation. your program.
* adaptive policy: runs GC when the current usage surpasses a dynamically-set
threshold. ## Recommended way to set up the watchdog
You can easily build a custom policy tailored to the allocation patterns of your The recommended way to set up the watchdog is as follows, in descending order
program. of precedence. This logic assumes that the library supports setting a heap
limit through an environment variable (e.g. MYAPP_HEAP_MAX) or config key.
1. If heap limit is set and legal, initialize a heap-driven watchdog.
2. Otherwise, try to use the cgroup-driven watchdog. If it succeeds, return.
3. Otherwise, try to initialize a system-driven watchdog. If it succeeds, return.
4. Watchdog initialization failed. Log a warning to inform the user that
they're flying solo.
## Running the tests
Given the low-level nature of this component, some tests need to run in
isolation, so that they don't carry over Go runtime metrics. For completeness,
this module uses a Docker image for testing, so we can simulate cgroup memory
limits.
The test execution and docker builds have been conveniently packaged in a
Makefile. Run with:
```shell
$ make
```
## Why is this even needed? ## Why is this even needed?

40
doc.go Normal file
View File

@ -0,0 +1,40 @@
// Package watchdog runs a singleton memory watchdog in the process, which
// watches memory utilization and forces Go GC in accordance with a
// user-defined policy.
//
// There three kinds of watchdogs:
//
// 1. heap-driven (watchdog.HeapDriven()): applies a heap limit, adjusting GOGC
// dynamically in accordance with the policy.
// 2. system-driven (watchdog.SystemDriven()): applies a limit to the total
// system memory used, obtaining the current usage through elastic/go-sigar.
// 3. cgroups-driven (watchdog.CgroupDriven()): discovers the memory limit from
// the cgroup of the process (derived from /proc/self/cgroup), or from the
// root cgroup path if the PID == 1 (which indicates that the process is
// running in a container). It uses the cgroup stats to obtain the
// current usage.
//
// The watchdog's behaviour is controlled by the policy, a pluggable function
// that determines when to trigger GC based on the current utilization. This
// library ships with two policies:
//
// 1. watermarks policy (watchdog.NewWatermarkPolicy()): runs GC at configured
// watermarks of memory utilisation.
// 2. adaptive policy (watchdog.NewAdaptivePolicy()): runs GC when the current
// usage surpasses a dynamically-set threshold.
//
// You can easily write a custom policy tailored to the allocation patterns of
// your program.
//
// Recommended way to set up the watchdog
//
// The recommended way to set up the watchdog is as follows, in descending order
// of precedence. This logic assumes that the library supports setting a heap
// limit through an environment variable (e.g. MYAPP_HEAP_MAX) or config key.
//
// 1. If heap limit is set and legal, initialize a heap-driven watchdog.
// 2. Otherwise, try to use the cgroup-driven watchdog. If it succeeds, return.
// 3. Otherwise, try to initialize a system-driven watchdog. If it succeeds, return.
// 4. Watchdog initialization failed. Log a warning to inform the user that
// they're flying solo.
package watchdog

1
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327 github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
github.com/elastic/gosigar v0.12.0 github.com/elastic/gosigar v0.12.0
github.com/kr/pretty v0.1.0 // indirect github.com/kr/pretty v0.1.0 // indirect
github.com/opencontainers/runtime-spec v1.0.2
github.com/raulk/clock v1.1.0 github.com/raulk/clock v1.1.0
github.com/stretchr/testify v1.4.0 github.com/stretchr/testify v1.4.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect

View File

@ -1,26 +0,0 @@
package watchdog
import (
"os"
"github.com/containerd/cgroups"
)
func ProcessMemoryLimit() uint64 {
var (
pid = os.Getpid()
memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory)
)
cgroup, err := cgroups.Load(memSubsystem, cgroups.PidPath(pid))
if err != nil {
return 0
}
metrics, err := cgroup.Stat()
if err != nil {
return 0
}
if metrics.Memory == nil {
return 0
}
return metrics.Memory.HierarchicalMemoryLimit
}

View File

@ -1,7 +0,0 @@
// +build !linux
package watchdog
func ProcessMemoryLimit() uint64 {
return 0
}

View File

@ -1,6 +1,7 @@
package watchdog package watchdog
import ( import (
"errors"
"fmt" "fmt"
"log" "log"
"math" "math"
@ -13,6 +14,10 @@ import (
"github.com/raulk/clock" "github.com/raulk/clock"
) )
// ErrNotSupported is returned when the watchdog does not support the requested
// run mode in the current OS/arch.
var ErrNotSupported = errors.New("watchdog run mode not supported")
// PolicyTempDisabled is a marker value for policies to signal that the policy // PolicyTempDisabled is a marker value for policies to signal that the policy
// is temporarily disabled. Use it when all hope is lost to turn around from // is temporarily disabled. Use it when all hope is lost to turn around from
// significant memory pressure (such as when above an "extreme" watermark). // significant memory pressure (such as when above an "extreme" watermark).
@ -28,9 +33,8 @@ var (
// Clock can be used to inject a mock clock for testing. // Clock can be used to inject a mock clock for testing.
Clock = clock.New() Clock = clock.New()
// NotifyFired, if non-nil, will be called when the policy has fired, // NotifyGC, if non-nil, will be called when a GC has happened.
// prior to calling GC, even if GC is disabled. NotifyGC func() = func() {}
NotifyFired func() = func() {}
) )
var ( var (
@ -104,6 +108,8 @@ const (
// UtilizationSystem specifies that the policy compares against actual used // UtilizationSystem specifies that the policy compares against actual used
// system memory. // system memory.
UtilizationSystem UtilizationType = iota UtilizationSystem UtilizationType = iota
// UtilizationProcess specifies that the watchdog is using process limits.
UtilizationProcess
// UtilizationHeap specifies that the policy compares against heap used. // UtilizationHeap specifies that the policy compares against heap used.
UtilizationHeap UtilizationHeap
) )
@ -126,13 +132,6 @@ type Policy interface {
// //
// A zero-valued limit will error. // A zero-valued limit will error.
func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) { func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func()) {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateUnstarted {
return ErrAlreadyStarted, nil
}
if limit == 0 { if limit == 0 {
return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil return fmt.Errorf("cannot use zero limit for heap-driven watchdog"), nil
} }
@ -142,9 +141,9 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
} }
_watchdog.state = stateRunning if err := start(UtilizationHeap); err != nil {
_watchdog.scope = UtilizationHeap return err, nil
_watchdog.closing = make(chan struct{}) }
gcTriggered := make(chan struct{}, 16) gcTriggered := make(chan struct{}, 16)
setupGCSentinel(gcTriggered) setupGCSentinel(gcTriggered)
@ -163,7 +162,7 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
for { for {
select { select {
case <-gcTriggered: case <-gcTriggered:
NotifyFired() NotifyGC()
case <-_watchdog.closing: case <-_watchdog.closing:
return return
@ -218,18 +217,12 @@ func HeapDriven(limit uint64, policyCtor PolicyCtor) (err error, stopFn func())
// This threshold is calculated by querying the policy every time that GC runs, // This threshold is calculated by querying the policy every time that GC runs,
// either triggered by the runtime, or forced by us. // either triggered by the runtime, or forced by us.
func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) { func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateUnstarted {
return ErrAlreadyStarted, nil
}
if limit == 0 { if limit == 0 {
limit, err = determineLimit(false) var sysmem gosigar.Mem
if err != nil { if err := sysmemFn(&sysmem); err != nil {
return err, nil return fmt.Errorf("failed to get system memory stats: %w", err), nil
} }
limit = sysmem.Total
} }
policy, err := policyCtor(limit) policy, err := policyCtor(limit)
@ -237,97 +230,92 @@ func SystemDriven(limit uint64, frequency time.Duration, policyCtor PolicyCtor)
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
} }
_watchdog.state = stateRunning if err := start(UtilizationSystem); err != nil {
_watchdog.scope = UtilizationSystem return err, nil
_watchdog.closing = make(chan struct{}) }
gcTriggered := make(chan struct{}, 16)
setupGCSentinel(gcTriggered)
_watchdog.wg.Add(1) _watchdog.wg.Add(1)
go func() { var sysmem gosigar.Mem
defer _watchdog.wg.Done() go pollingWatchdog(policy, frequency, func() (uint64, error) {
if err := sysmemFn(&sysmem); err != nil {
var ( return 0, err
memstats runtime.MemStats
sysmem gosigar.Mem
threshold uint64
)
renewThreshold := func() {
// get the current usage.
if err := sysmemFn(&sysmem); err != nil {
Logger.Warnf("failed to obtain system memory stats; err: %s", err)
return
}
// calculate the threshold.
threshold = policy.Evaluate(UtilizationSystem, sysmem.ActualUsed)
} }
return sysmem.ActualUsed, nil
// initialize the threshold. })
renewThreshold()
// initialize an empty timer.
timer := Clock.Timer(0)
stopTimer := func() {
if !timer.Stop() {
<-timer.C
}
}
for {
timer.Reset(frequency)
select {
case <-timer.C:
// get the current usage.
if err := sysmemFn(&sysmem); err != nil {
Logger.Warnf("failed to obtain system memory stats; err: %s", err)
continue
}
actual := sysmem.ActualUsed
if actual < threshold {
// nothing to do.
continue
}
// trigger GC; this will emit a gcTriggered event which we'll
// consume next to readjust the threshold.
Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", actual, threshold)
forceGC(&memstats)
case <-gcTriggered:
NotifyFired()
renewThreshold()
stopTimer()
case <-_watchdog.closing:
stopTimer()
return
}
}
}()
return nil, stop return nil, stop
} }
func determineLimit(restrictByProcess bool) (uint64, error) { // pollingWatchdog starts a polling watchdog with the provided policy, using
// TODO. // the supplied polling frequency. On every tick, it calls usageFn and, if the
// if restrictByProcess { // usage is greater or equal to the threshold at the time, it forces GC.
// if pmem := ProcessMemoryLimit(); pmem > 0 { // usageFn is guaranteed to be called serially, so no locking should be
// Logger.Infof("watchdog using process limit: %d bytes", pmem) // necessary.
// return pmem, nil func pollingWatchdog(policy Policy, frequency time.Duration, usageFn func() (uint64, error)) {
// } defer _watchdog.wg.Done()
// Logger.Infof("watchdog was unable to determine process limit; falling back to total system memory")
// }
// populate initial utilisation and system stats. gcTriggered := make(chan struct{}, 16)
var sysmem gosigar.Mem setupGCSentinel(gcTriggered)
if err := sysmemFn(&sysmem); err != nil {
return 0, fmt.Errorf("failed to get system memory stats: %w", err) var (
memstats runtime.MemStats
threshold uint64
)
renewThreshold := func() {
// get the current usage.
usage, err := usageFn()
if err != nil {
Logger.Warnf("failed to obtain memory utilization stats; err: %s", err)
return
}
// calculate the threshold.
threshold = policy.Evaluate(_watchdog.scope, usage)
}
// initialize the threshold.
renewThreshold()
// initialize an empty timer.
timer := Clock.Timer(0)
stopTimer := func() {
if !timer.Stop() {
<-timer.C
}
}
for {
timer.Reset(frequency)
select {
case <-timer.C:
// get the current usage.
usage, err := usageFn()
if err != nil {
Logger.Warnf("failed to obtain memory utilizationstats; err: %s", err)
continue
}
if usage < threshold {
// nothing to do.
continue
}
// trigger GC; this will emit a gcTriggered event which we'll
// consume next to readjust the threshold.
Logger.Warnf("system-driven watchdog triggering GC; %d/%d bytes (used/threshold)", usage, threshold)
forceGC(&memstats)
case <-gcTriggered:
NotifyGC()
renewThreshold()
stopTimer()
case <-_watchdog.closing:
stopTimer()
return
}
} }
return sysmem.Total, nil
} }
// forceGC forces a manual GC. // forceGC forces a manual GC.
@ -379,6 +367,20 @@ func setupGCSentinel(gcTriggered chan struct{}) {
runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel. runtime.SetFinalizer(&sentinel{}, finalizer) // start the flywheel.
} }
func start(scope UtilizationType) error {
_watchdog.lk.Lock()
defer _watchdog.lk.Unlock()
if _watchdog.state != stateUnstarted {
return ErrAlreadyStarted
}
_watchdog.state = stateRunning
_watchdog.scope = scope
_watchdog.closing = make(chan struct{})
return nil
}
func stop() { func stop() {
_watchdog.lk.Lock() _watchdog.lk.Lock()
defer _watchdog.lk.Unlock() defer _watchdog.lk.Unlock()

73
watchdog_linux.go Normal file
View File

@ -0,0 +1,73 @@
package watchdog
import (
"fmt"
"os"
"time"
"github.com/containerd/cgroups"
)
var (
pid = os.Getpid()
memSubsystem = cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory)
)
// CgroupDriven initializes a cgroups-driven watchdog. It will try to discover
// the memory limit from the cgroup of the process (derived from /proc/self/cgroup),
// or from the root cgroup path if the PID == 1 (which indicates that the process
// is running in a container).
//
// Memory usage is calculated by querying the cgroup stats.
//
// This function will return an error immediately if the OS does not support cgroups,
// or if another error occurs during initialization. The caller can then safely fall
// back to the system driven watchdog.
func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
// use self path unless our PID is 1, in which case we're running inside
// a container and our limits are in the root path.
path := cgroups.NestedPath("")
if pid := os.Getpid(); pid == 1 {
path = cgroups.RootPath
}
cgroup, err := cgroups.Load(memSubsystem, path)
if err != nil {
return fmt.Errorf("failed to load cgroup for process: %w", err), nil
}
var limit uint64
if stat, err := cgroup.Stat(); err != nil {
return fmt.Errorf("failed to load memory cgroup stats: %w", err), nil
} else if stat.Memory == nil || stat.Memory.Usage == nil {
return fmt.Errorf("cgroup memory stats are nil; aborting"), nil
} else {
limit = stat.Memory.Usage.Limit
}
if limit == 0 {
return fmt.Errorf("cgroup limit is 0; refusing to start memory watchdog"), nil
}
policy, err := policyCtor(limit)
if err != nil {
return fmt.Errorf("failed to construct policy with limit %d: %w", limit, err), nil
}
if err := start(UtilizationProcess); err != nil {
return err, nil
}
_watchdog.wg.Add(1)
go pollingWatchdog(policy, frequency, func() (uint64, error) {
stat, err := cgroup.Stat()
if err != nil {
return 0, err
} else if stat.Memory == nil || stat.Memory.Usage == nil {
return 0, fmt.Errorf("cgroup memory stats are nil; aborting")
}
return stat.Memory.Usage.Usage, nil
})
return nil, stop
}

130
watchdog_linux_test.go Normal file
View File

@ -0,0 +1,130 @@
package watchdog
import (
"fmt"
"log"
"os"
"runtime"
"runtime/debug"
"testing"
"time"
"github.com/containerd/cgroups"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
)
// retained will hoard unreclaimable byte buffers in the heap.
var retained [][]byte
func TestCgroupsDriven_Create_Isolated(t *testing.T) {
skipIfNotIsolated(t)
if os.Getpid() == 1 {
// we are running in Docker and cannot create a cgroup.
t.Skipf("cannot create a cgroup while running in non-privileged docker")
}
// new cgroup limit.
var limit = uint64(32 << 20) // 32MiB.
createMemoryCgroup(t, limit)
testCgroupsWatchdog(t, limit)
}
func TestCgroupsDriven_Docker_Isolated(t *testing.T) {
skipIfNotIsolated(t)
if os.Getpid() != 1 {
// we are not running in a container.
t.Skipf("test only runs inside a container")
}
testCgroupsWatchdog(t, uint64(DockerMemLimit))
}
func testCgroupsWatchdog(t *testing.T, limit uint64) {
t.Cleanup(func() {
retained = nil
})
runtime.GC() // first GC to clear any junk from other tests.
debug.SetGCPercent(100000000) // disable GC.
clk := clock.NewMock()
Clock = clk
notifyCh := make(chan struct{}, 1)
NotifyGC = func() {
notifyCh <- struct{}{}
}
err, stopFn := CgroupDriven(5*time.Second, NewAdaptivePolicy(0.5))
require.NoError(t, err)
defer stopFn()
time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.
maxSlabs := limit / (1 << 20) // number of 1MiB slabs to take up the entire limit.
// first tick; nothing should happen.
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
require.Len(t, notifyCh, 0) // no GC has taken place.
// allocate 50% of limit in heap (to be added to other mem usage).
for i := 0; i < (int(maxSlabs))/2; i++ {
retained = append(retained, func() []byte {
b := make([]byte, 1*1024*1024)
for i := range b {
b[i] = 0xff
}
return b
}())
}
// second tick; used = just over 50%; will trigger GC.
clk.Add(5 * time.Second)
time.Sleep(200 * time.Millisecond)
require.NotNil(t, <-notifyCh)
var memstats runtime.MemStats
runtime.ReadMemStats(&memstats)
require.EqualValues(t, 2, memstats.NumForcedGC)
}
// createMemoryCgroup creates a memory cgroup to restrict the memory available
// to this test.
func createMemoryCgroup(t *testing.T, limit uint64) {
l := int64(limit)
path := cgroups.NestedPath(fmt.Sprintf("/%d", time.Now().UnixNano()))
cgroup, err := cgroups.New(cgroups.V1, path, &specs.LinuxResources{
Memory: &specs.LinuxMemory{
Limit: &l,
Swap: &l,
},
})
require.NoError(t, err, "failed to create a cgroup")
t.Cleanup(func() {
root, err := cgroups.Load(cgroups.V1, cgroups.RootPath)
if err != nil {
t.Logf("failed to resolve root cgroup: %s", err)
return
}
if err = root.Add(cgroups.Process{Pid: pid}); err != nil {
t.Logf("failed to move process to root cgroup: %s", err)
return
}
if err = cgroup.Delete(); err != nil {
t.Logf("failed to clean up temp cgroup: %s", err)
}
})
log.Printf("cgroup created")
// add process to cgroup.
err = cgroup.Add(cgroups.Process{Pid: pid})
require.NoError(t, err)
}

13
watchdog_other.go Normal file
View File

@ -0,0 +1,13 @@
// +build !linux
package watchdog
import (
"fmt"
"time"
)
// CgroupDriven is only available in Linux. This method will error.
func CgroupDriven(frequency time.Duration, policyCtor PolicyCtor) (err error, stopFn func()) {
return fmt.Errorf("cgroups-driven watchdog: %w", ErrNotSupported), nil
}

15
watchdog_other_test.go Normal file
View File

@ -0,0 +1,15 @@
// +build !linux
package watchdog
import "testing"
func TestCgroupsDriven_Create_Isolated(t *testing.T) {
// this test only runs on linux.
t.Skip("test only valid on linux")
}
func TestCgroupsDriven_Docker_Isolated(t *testing.T) {
// this test only runs on linux.
t.Skip("test only valid on linux")
}

View File

@ -6,6 +6,7 @@ import (
"os" "os"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strconv"
"testing" "testing"
"time" "time"
@ -14,29 +15,54 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// These integration tests are a hugely non-deterministic, but necessary to get const (
// good coverage and confidence. The Go runtime makes its own pacing decisions, // EnvTestIsolated is a marker property for the runner to confirm that this
// and those may vary based on machine, OS, kernel memory management, other // test is running in isolation (i.e. a dedicated process).
// running programs, exogenous memory pressure, and Go runtime versions. EnvTestIsolated = "TEST_ISOLATED"
//
// The assertions we use here are lax, but should be sufficient to serve as a // EnvTestDockerMemLimit is the memory limit applied in a docker container.
// reasonable litmus test of whether the watchdog is doing what it's supposed EnvTestDockerMemLimit = "TEST_DOCKER_MEMLIMIT"
// to or not. )
// DockerMemLimit is initialized in the init() function from the
// EnvTestDockerMemLimit env variable.
var DockerMemLimit int // bytes
func init() {
Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true}
if l := os.Getenv(EnvTestDockerMemLimit); l != "" {
l, err := strconv.Atoi(l)
if err != nil {
panic(err)
}
DockerMemLimit = l
}
}
func skipIfNotIsolated(t *testing.T) {
if os.Getenv(EnvTestIsolated) != "1" {
t.Skipf("skipping test in non-isolated mode")
}
}
var ( var (
limit uint64 = 64 << 20 // 64MiB. limit uint64 = 64 << 20 // 64MiB.
) )
func init() { func TestControl_Isolated(t *testing.T) {
Logger = &stdlog{log: log.New(os.Stdout, "[watchdog test] ", log.LstdFlags|log.Lmsgprefix), debug: true} skipIfNotIsolated(t)
}
func TestControl(t *testing.T) {
debug.SetGCPercent(100) debug.SetGCPercent(100)
// retain 1MiB every iteration, up to 100MiB (beyond heap limit!). rounds := 100
if DockerMemLimit != 0 {
rounds /= int(float64(DockerMemLimit)*0.8) / 1024 / 1024
}
// retain 1MiB every iteration.
var retained [][]byte var retained [][]byte
for i := 0; i < 100; i++ { for i := 0; i < rounds; i++ {
b := make([]byte, 1*1024*1024) b := make([]byte, 1*1024*1024)
for i := range b { for i := range b {
b[i] = byte(i) b[i] = byte(i)
@ -52,11 +78,13 @@ func TestControl(t *testing.T) {
var ms runtime.MemStats var ms runtime.MemStats
runtime.ReadMemStats(&ms) runtime.ReadMemStats(&ms)
require.LessOrEqual(t, ms.NumGC, uint32(5)) // a maximum of 8 GCs should've happened. require.NotZero(t, ms.NumGC) // GCs have taken place, but...
require.Zero(t, ms.NumForcedGC) // no forced GCs. require.Zero(t, ms.NumForcedGC) // ... no forced GCs beyond our initial one.
} }
func TestHeapDriven(t *testing.T) { func TestHeapDriven_Isolated(t *testing.T) {
skipIfNotIsolated(t)
// we can't mock ReadMemStats, because we're relying on the go runtime to // we can't mock ReadMemStats, because we're relying on the go runtime to
// enforce the GC run, and the go runtime won't use our mock. Therefore, we // enforce the GC run, and the go runtime won't use our mock. Therefore, we
// need to do the actual thing. // need to do the actual thing.
@ -66,7 +94,7 @@ func TestHeapDriven(t *testing.T) {
Clock = clk Clock = clk
observations := make([]*runtime.MemStats, 0, 100) observations := make([]*runtime.MemStats, 0, 100)
NotifyFired = func() { NotifyGC = func() {
var ms runtime.MemStats var ms runtime.MemStats
runtime.ReadMemStats(&ms) runtime.ReadMemStats(&ms)
observations = append(observations, &ms) observations = append(observations, &ms)
@ -91,10 +119,12 @@ func TestHeapDriven(t *testing.T) {
var ms runtime.MemStats var ms runtime.MemStats
runtime.ReadMemStats(&ms) runtime.ReadMemStats(&ms)
require.GreaterOrEqual(t, ms.NumGC, uint32(9)) // over 9 GCs should've taken place. require.GreaterOrEqual(t, ms.NumGC, uint32(5)) // over 5 GCs should've taken place.
} }
func TestSystemDriven(t *testing.T) { func TestSystemDriven_Isolated(t *testing.T) {
skipIfNotIsolated(t)
debug.SetGCPercent(100) debug.SetGCPercent(100)
clk := clock.NewMock() clk := clock.NewMock()
@ -115,7 +145,7 @@ func TestSystemDriven(t *testing.T) {
time.Sleep(200 * time.Millisecond) // give time for the watchdog to init. time.Sleep(200 * time.Millisecond) // give time for the watchdog to init.
notifyCh := make(chan struct{}, 1) notifyCh := make(chan struct{}, 1)
NotifyFired = func() { NotifyGC = func() {
notifyCh <- struct{}{} notifyCh <- struct{}{}
} }