mirror of https://github.com/status-im/consul.git
Merge pull request #939 from hashicorp/f-leak
Fixing memory leak caused by blocking query
This commit is contained in:
commit
c559023d81
|
@ -10,7 +10,7 @@ import (
|
|||
// notify list.
|
||||
type NotifyGroup struct {
|
||||
l sync.Mutex
|
||||
notify []chan struct{}
|
||||
notify map[chan struct{}]struct{}
|
||||
}
|
||||
|
||||
// Notify will do a non-blocking send to all waiting channels, and
|
||||
|
@ -18,20 +18,33 @@ type NotifyGroup struct {
|
|||
func (n *NotifyGroup) Notify() {
|
||||
n.l.Lock()
|
||||
defer n.l.Unlock()
|
||||
for _, ch := range n.notify {
|
||||
for ch, _ := range n.notify {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
n.notify = n.notify[:0]
|
||||
n.notify = nil
|
||||
}
|
||||
|
||||
// Wait adds a channel to the notify group
|
||||
func (n *NotifyGroup) Wait(ch chan struct{}) {
|
||||
n.l.Lock()
|
||||
defer n.l.Unlock()
|
||||
n.notify = append(n.notify, ch)
|
||||
if n.notify == nil {
|
||||
n.notify = make(map[chan struct{}]struct{})
|
||||
}
|
||||
n.notify[ch] = struct{}{}
|
||||
}
|
||||
|
||||
// Clear removes a channel from the notify group
|
||||
func (n *NotifyGroup) Clear(ch chan struct{}) {
|
||||
n.l.Lock()
|
||||
defer n.l.Unlock()
|
||||
if n.notify == nil {
|
||||
return
|
||||
}
|
||||
delete(n.notify, ch)
|
||||
}
|
||||
|
||||
// WaitCh allocates a channel that is subscribed to notifications
|
||||
|
|
|
@ -54,3 +54,19 @@ func TestNotifyGroup(t *testing.T) {
|
|||
t.Fatalf("should not block")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNotifyGroup_Clear(t *testing.T) {
|
||||
grp := &NotifyGroup{}
|
||||
|
||||
ch1 := grp.WaitCh()
|
||||
grp.Clear(ch1)
|
||||
|
||||
grp.Notify()
|
||||
|
||||
// Should not get anything
|
||||
select {
|
||||
case <-ch1:
|
||||
t.Fatalf("should not get message")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,15 @@ const (
|
|||
// maxQueryTime is used to bound the limit of a blocking query
|
||||
maxQueryTime = 600 * time.Second
|
||||
|
||||
// defaultQueryTime is the amount of time we block waiting for a change
|
||||
// if no time is specified. Previously we would wait the maxQueryTime.
|
||||
defaultQueryTime = 300 * time.Second
|
||||
|
||||
// jitterFraction is a the limit to the amount of jitter we apply
|
||||
// to a user specified MaxQueryTime. We divide the specified time by
|
||||
// the fraction. So 16 == 6.25% limit of jitter
|
||||
jitterFraction = 16
|
||||
|
||||
// Warn if the Raft command is larger than this.
|
||||
// If it's over 1MB something is probably being abusive.
|
||||
raftWarnSize = 1024 * 1024
|
||||
|
@ -314,8 +323,9 @@ type blockingRPCOptions struct {
|
|||
// blockingRPCOpt is the replacement for blockingRPC as it allows
|
||||
// for more parameterization easily. It should be prefered over blockingRPC.
|
||||
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
|
||||
var timeout <-chan time.Time
|
||||
var timeout *time.Timer
|
||||
var notifyCh chan struct{}
|
||||
var state *StateStore
|
||||
|
||||
// Fast path non-blocking
|
||||
if opts.queryOpts.MinQueryIndex == 0 {
|
||||
|
@ -327,30 +337,38 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
|
|||
panic("no tables to block on")
|
||||
}
|
||||
|
||||
// Restrict the max query time
|
||||
// Restrict the max query time, and ensure there is always one
|
||||
if opts.queryOpts.MaxQueryTime > maxQueryTime {
|
||||
opts.queryOpts.MaxQueryTime = maxQueryTime
|
||||
} else if opts.queryOpts.MaxQueryTime <= 0 {
|
||||
opts.queryOpts.MaxQueryTime = defaultQueryTime
|
||||
}
|
||||
|
||||
// Ensure a time limit is set if we have an index
|
||||
if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 {
|
||||
opts.queryOpts.MaxQueryTime = maxQueryTime
|
||||
}
|
||||
// Apply a small amount of jitter to the request
|
||||
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)
|
||||
|
||||
// Setup a query timeout
|
||||
if opts.queryOpts.MaxQueryTime > 0 {
|
||||
timeout = time.After(opts.queryOpts.MaxQueryTime)
|
||||
}
|
||||
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
|
||||
|
||||
// Setup a notification channel for changes
|
||||
SETUP_NOTIFY:
|
||||
if opts.queryOpts.MinQueryIndex > 0 {
|
||||
notifyCh = make(chan struct{}, 1)
|
||||
state := s.fsm.State()
|
||||
state.Watch(opts.tables, notifyCh)
|
||||
// Setup the notify channel
|
||||
notifyCh = make(chan struct{}, 1)
|
||||
|
||||
// Ensure we tear down any watchers on return
|
||||
state = s.fsm.State()
|
||||
defer func() {
|
||||
timeout.Stop()
|
||||
state.StopWatch(opts.tables, notifyCh)
|
||||
if opts.kvWatch {
|
||||
state.WatchKV(opts.kvPrefix, notifyCh)
|
||||
state.StopWatchKV(opts.kvPrefix, notifyCh)
|
||||
}
|
||||
}()
|
||||
|
||||
REGISTER_NOTIFY:
|
||||
// Register the notification channel. This may be done
|
||||
// multiple times if we have not reached the target wait index.
|
||||
state.Watch(opts.tables, notifyCh)
|
||||
if opts.kvWatch {
|
||||
state.WatchKV(opts.kvPrefix, notifyCh)
|
||||
}
|
||||
|
||||
RUN_QUERY:
|
||||
|
@ -372,8 +390,8 @@ RUN_QUERY:
|
|||
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
|
||||
select {
|
||||
case <-notifyCh:
|
||||
goto SETUP_NOTIFY
|
||||
case <-timeout:
|
||||
goto REGISTER_NOTIFY
|
||||
case <-timeout.C:
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
|
|
@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// StopWatch is used to unsubscribe a channel to a set of MDBTables
|
||||
func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) {
|
||||
for _, t := range tables {
|
||||
s.watch[t].Clear(notify)
|
||||
}
|
||||
}
|
||||
|
||||
// WatchKV is used to subscribe a channel to changes in KV data
|
||||
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
|
||||
s.kvWatchLock.Lock()
|
||||
|
@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
|
|||
s.kvWatch.Insert(prefix, grp)
|
||||
}
|
||||
|
||||
// StopWatchKV is used to unsubscribe a channel from changes in KV data
|
||||
func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) {
|
||||
s.kvWatchLock.Lock()
|
||||
defer s.kvWatchLock.Unlock()
|
||||
|
||||
// Check for an existing notify group
|
||||
if raw, ok := s.kvWatch.Get(prefix); ok {
|
||||
grp := raw.(*NotifyGroup)
|
||||
grp.Clear(notify)
|
||||
}
|
||||
}
|
||||
|
||||
// notifyKV is used to notify any KV listeners of a change
|
||||
// on a prefix
|
||||
func (s *StateStore) notifyKV(path string, prefix bool) {
|
||||
|
|
|
@ -127,6 +127,37 @@ func TestGetNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGetNodes_Watch_StopWatch(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
notify1 := make(chan struct{}, 1)
|
||||
notify2 := make(chan struct{}, 1)
|
||||
|
||||
store.Watch(store.QueryTables("Nodes"), notify1)
|
||||
store.Watch(store.QueryTables("Nodes"), notify2)
|
||||
store.StopWatch(store.QueryTables("Nodes"), notify2)
|
||||
|
||||
if err := store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-notify1:
|
||||
default:
|
||||
t.Fatalf("should be notified")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-notify2:
|
||||
t.Fatalf("should not be notified")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetNodes(b *testing.B) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
|
@ -1429,6 +1460,32 @@ func TestKVSSet_Watch(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestKVSSet_Watch_Stop(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
notify1 := make(chan struct{}, 1)
|
||||
|
||||
store.WatchKV("", notify1)
|
||||
store.StopWatchKV("", notify1)
|
||||
|
||||
// Create the entry
|
||||
d := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check that we've not fired notify1
|
||||
select {
|
||||
case <-notify1:
|
||||
t.Fatalf("should not notify ")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSSet_Get(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
|
|
|
@ -4,12 +4,14 @@ import (
|
|||
crand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
@ -222,3 +224,8 @@ func generateUUID() string {
|
|||
buf[8:10],
|
||||
buf[10:16])
|
||||
}
|
||||
|
||||
// Returns a random stagger interval between 0 and the duration
|
||||
func randomStagger(intv time.Duration) time.Duration {
|
||||
return time.Duration(uint64(rand.Int63()) % uint64(intv))
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"net"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
@ -124,3 +125,13 @@ func TestGenerateUUID(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRandomStagger(t *testing.T) {
|
||||
intv := time.Minute
|
||||
for i := 0; i < 10; i++ {
|
||||
stagger := randomStagger(intv)
|
||||
if stagger < 0 || stagger >= intv {
|
||||
t.Fatalf("Bad: %v", stagger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,9 +39,9 @@ query string parameter to the value of `X-Consul-Index`, indicating that the cli
|
|||
to wait for any changes subsequent to that index.
|
||||
|
||||
In addition to `index`, endpoints that support blocking will also honor a `wait`
|
||||
parameter specifying a maximum duration for the blocking request. If not set, it will
|
||||
default to 10 minutes. This value can be specified in the form of "10s" or "5m" (i.e.,
|
||||
10 seconds or 5 minutes, respectively).
|
||||
parameter specifying a maximum duration for the blocking request. This is limited to
|
||||
10 minutes. If not set, the wait time defaults to 5 minutes. This value can be specified
|
||||
in the form of "10s" or "5m" (i.e., 10 seconds or 5 minutes, respectively).
|
||||
|
||||
A critical note is that the return of a blocking request is **no guarantee** of a change. It
|
||||
is possible that the timeout was reached or that there was an idempotent write that does
|
||||
|
|
Loading…
Reference in New Issue