From c54ab04794030a6773b08234b835288130a7ad38 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 17:30:30 -0700 Subject: [PATCH 1/7] consul: adding Clear to NotifyGroup --- consul/notify.go | 21 +++++++++++++++++---- consul/notify_test.go | 16 ++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/consul/notify.go b/consul/notify.go index 88da09242c..2fe5acbe2b 100644 --- a/consul/notify.go +++ b/consul/notify.go @@ -10,7 +10,7 @@ import ( // notify list. type NotifyGroup struct { l sync.Mutex - notify []chan struct{} + notify map[chan struct{}]struct{} } // Notify will do a non-blocking send to all waiting channels, and @@ -18,20 +18,33 @@ type NotifyGroup struct { func (n *NotifyGroup) Notify() { n.l.Lock() defer n.l.Unlock() - for _, ch := range n.notify { + for ch, _ := range n.notify { select { case ch <- struct{}{}: default: } } - n.notify = n.notify[:0] + n.notify = nil } // Wait adds a channel to the notify group func (n *NotifyGroup) Wait(ch chan struct{}) { n.l.Lock() defer n.l.Unlock() - n.notify = append(n.notify, ch) + if n.notify == nil { + n.notify = make(map[chan struct{}]struct{}) + } + n.notify[ch] = struct{}{} +} + +// Clear removes a channel from the notify group +func (n *NotifyGroup) Clear(ch chan struct{}) { + n.l.Lock() + defer n.l.Unlock() + if n.notify == nil { + return + } + delete(n.notify, ch) } // WaitCh allocates a channel that is subscribed to notifications diff --git a/consul/notify_test.go b/consul/notify_test.go index 4c3be55901..2133e9b312 100644 --- a/consul/notify_test.go +++ b/consul/notify_test.go @@ -54,3 +54,19 @@ func TestNotifyGroup(t *testing.T) { t.Fatalf("should not block") } } + +func TestNotifyGroup_Clear(t *testing.T) { + grp := &NotifyGroup{} + + ch1 := grp.WaitCh() + grp.Clear(ch1) + + grp.Notify() + + // Should not get anything + select { + case <-ch1: + t.Fatalf("should not get message") + default: + } +} From 1a50225911cfcd3a3c93c7e80db30e1b8da23f35 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 17:33:02 -0700 Subject: [PATCH 2/7] consul: Adding methods to stop watching for changes --- consul/state_store.go | 19 +++++++++++++++++++ consul/state_store_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/consul/state_store.go b/consul/state_store.go index 074261a1b9..f008c78ea9 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) { } } +// StopWatch is used to unsubscribe a channel to a set of MDBTables +func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) { + for _, t := range tables { + s.watch[t].Clear(notify) + } +} + // WatchKV is used to subscribe a channel to changes in KV data func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { s.kvWatchLock.Lock() @@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { s.kvWatch.Insert(prefix, grp) } +// StopWatchKV is used to unsubscribe a channel from changes in KV data +func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) { + s.kvWatchLock.Lock() + defer s.kvWatchLock.Unlock() + + // Check for an existing notify group + if raw, ok := s.kvWatch.Get(prefix); ok { + grp := raw.(*NotifyGroup) + grp.Clear(notify) + } +} + // notifyKV is used to notify any KV listeners of a change // on a prefix func (s *StateStore) notifyKV(path string, prefix bool) { diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 1c605c8756..30416eaa5d 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1429,6 +1429,32 @@ func TestKVSSet_Watch(t *testing.T) { } } +func TestKVSSet_Watch_Stop(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + notify1 := make(chan struct{}, 1) + + store.WatchKV("", notify1) + store.StopWatchKV("", notify1) + + // Create the entry + d := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Check that we've not fired notify1 + select { + case <-notify1: + t.Fatalf("should not notify ") + default: + } +} + func TestKVSSet_Get(t *testing.T) { store, err := testStateStore() if err != nil { From e62ff966a3e82f4442f5ce7c4afdc145fdd1ddbd Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 17:38:42 -0700 Subject: [PATCH 3/7] consul: ensure blocking query cleans any lingering state --- consul/rpc.go | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/consul/rpc.go b/consul/rpc.go index 6359d1cf6f..d69f82e103 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -316,6 +316,7 @@ type blockingRPCOptions struct { func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { var timeout <-chan time.Time var notifyCh chan struct{} + var state *StateStore // Fast path non-blocking if opts.queryOpts.MinQueryIndex == 0 { @@ -327,30 +328,34 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { panic("no tables to block on") } - // Restrict the max query time + // Restrict the max query time, and ensure there is always one if opts.queryOpts.MaxQueryTime > maxQueryTime { opts.queryOpts.MaxQueryTime = maxQueryTime - } - - // Ensure a time limit is set if we have an index - if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 { + } else if opts.queryOpts.MaxQueryTime <= 0 { opts.queryOpts.MaxQueryTime = maxQueryTime } // Setup a query timeout - if opts.queryOpts.MaxQueryTime > 0 { - timeout = time.After(opts.queryOpts.MaxQueryTime) - } + timeout = time.After(opts.queryOpts.MaxQueryTime) - // Setup a notification channel for changes -SETUP_NOTIFY: - if opts.queryOpts.MinQueryIndex > 0 { - notifyCh = make(chan struct{}, 1) - state := s.fsm.State() - state.Watch(opts.tables, notifyCh) + // Setup the notify channel + notifyCh = make(chan struct{}, 1) + + // Ensure we tear down any watchers on return + state = s.fsm.State() + defer func() { + state.StopWatch(opts.tables, notifyCh) if opts.kvWatch { - state.WatchKV(opts.kvPrefix, notifyCh) + state.StopWatchKV(opts.kvPrefix, notifyCh) } + }() + +REGISTER_NOTIFY: + // Register the notification channel. This may be done + // multiple times if we have not reached the target wait index. + state.Watch(opts.tables, notifyCh) + if opts.kvWatch { + state.WatchKV(opts.kvPrefix, notifyCh) } RUN_QUERY: @@ -372,7 +377,7 @@ RUN_QUERY: if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { select { case <-notifyCh: - goto SETUP_NOTIFY + goto REGISTER_NOTIFY case <-timeout: } } From 926b8bc7fa455797f98a3d313e6375afb930fc7a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 17:42:47 -0700 Subject: [PATCH 4/7] consul: proactively clear timers --- consul/rpc.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/consul/rpc.go b/consul/rpc.go index d69f82e103..c0f91cf42c 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -314,7 +314,7 @@ type blockingRPCOptions struct { // blockingRPCOpt is the replacement for blockingRPC as it allows // for more parameterization easily. It should be prefered over blockingRPC. func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { - var timeout <-chan time.Time + var timeout *time.Timer var notifyCh chan struct{} var state *StateStore @@ -336,7 +336,7 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { } // Setup a query timeout - timeout = time.After(opts.queryOpts.MaxQueryTime) + timeout = time.NewTimer(opts.queryOpts.MaxQueryTime) // Setup the notify channel notifyCh = make(chan struct{}, 1) @@ -344,6 +344,7 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { // Ensure we tear down any watchers on return state = s.fsm.State() defer func() { + timeout.Stop() state.StopWatch(opts.tables, notifyCh) if opts.kvWatch { state.StopWatchKV(opts.kvPrefix, notifyCh) @@ -378,7 +379,7 @@ RUN_QUERY: select { case <-notifyCh: goto REGISTER_NOTIFY - case <-timeout: + case <-timeout.C: } } return err From c51c88814284b8acb8b38f1284c1f387995612ec Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 17:59:11 -0700 Subject: [PATCH 5/7] consul: adding randomStagger util method --- consul/util.go | 7 +++++++ consul/util_test.go | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/consul/util.go b/consul/util.go index d0ce256fe6..6cc7858995 100644 --- a/consul/util.go +++ b/consul/util.go @@ -4,12 +4,14 @@ import ( crand "crypto/rand" "encoding/binary" "fmt" + "math/rand" "net" "os" "path/filepath" "runtime" "strconv" "strings" + "time" "github.com/hashicorp/serf/serf" ) @@ -222,3 +224,8 @@ func generateUUID() string { buf[8:10], buf[10:16]) } + +// Returns a random stagger interval between 0 and the duration +func randomStagger(intv time.Duration) time.Duration { + return time.Duration(uint64(rand.Int63()) % uint64(intv)) +} diff --git a/consul/util_test.go b/consul/util_test.go index 24d9f7299b..5803e95a9b 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -4,6 +4,7 @@ import ( "net" "regexp" "testing" + "time" "github.com/hashicorp/serf/serf" ) @@ -124,3 +125,13 @@ func TestGenerateUUID(t *testing.T) { } } } + +func TestRandomStagger(t *testing.T) { + intv := time.Minute + for i := 0; i < 10; i++ { + stagger := randomStagger(intv) + if stagger < 0 || stagger >= intv { + t.Fatalf("Bad: %v", stagger) + } + } +} From ab3632e409398f2c5658545c53be0aba1523516b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 17:59:43 -0700 Subject: [PATCH 6/7] consul: lower default query time and add small stagger --- consul/rpc.go | 14 +++++++++++++- website/source/docs/agent/http.html.markdown | 6 +++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/consul/rpc.go b/consul/rpc.go index c0f91cf42c..ae3fd67812 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -30,6 +30,15 @@ const ( // maxQueryTime is used to bound the limit of a blocking query maxQueryTime = 600 * time.Second + // defaultQueryTime is the amount of time we block waiting for a change + // if no time is specified. Previously we would wait the maxQueryTime. + defaultQueryTime = 300 * time.Second + + // jitterFraction is a the limit to the amount of jitter we apply + // to a user specified MaxQueryTime. We divide the specified time by + // the fraction. So 16 == 6.25% limit of jitter + jitterFraction = 16 + // Warn if the Raft command is larger than this. // If it's over 1MB something is probably being abusive. raftWarnSize = 1024 * 1024 @@ -332,9 +341,12 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { if opts.queryOpts.MaxQueryTime > maxQueryTime { opts.queryOpts.MaxQueryTime = maxQueryTime } else if opts.queryOpts.MaxQueryTime <= 0 { - opts.queryOpts.MaxQueryTime = maxQueryTime + opts.queryOpts.MaxQueryTime = defaultQueryTime } + // Apply a small amount of jitter to the request + opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction) + // Setup a query timeout timeout = time.NewTimer(opts.queryOpts.MaxQueryTime) diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index 7ada4d670f..9e1ae0b9be 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -39,9 +39,9 @@ query string parameter to the value of `X-Consul-Index`, indicating that the cli to wait for any changes subsequent to that index. In addition to `index`, endpoints that support blocking will also honor a `wait` -parameter specifying a maximum duration for the blocking request. If not set, it will -default to 10 minutes. This value can be specified in the form of "10s" or "5m" (i.e., -10 seconds or 5 minutes, respectively). +parameter specifying a maximum duration for the blocking request. This is limited to +10 minutes. If not set, the wait time defaults to 5 minutes. This value can be specified +in the form of "10s" or "5m" (i.e., 10 seconds or 5 minutes, respectively). A critical note is that the return of a blocking request is **no guarantee** of a change. It is possible that the timeout was reached or that there was an idempotent write that does From cee5b53423291ee029c9ab40424d56f8bc8965b7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 14 May 2015 18:32:19 -0700 Subject: [PATCH 7/7] consul: adding StopWatch test --- consul/state_store_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 30416eaa5d..97592c11d5 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -127,6 +127,37 @@ func TestGetNodes(t *testing.T) { } } +func TestGetNodes_Watch_StopWatch(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + + store.Watch(store.QueryTables("Nodes"), notify1) + store.Watch(store.QueryTables("Nodes"), notify2) + store.StopWatch(store.QueryTables("Nodes"), notify2) + + if err := store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + + select { + case <-notify1: + default: + t.Fatalf("should be notified") + } + + select { + case <-notify2: + t.Fatalf("should not be notified") + default: + } +} + func BenchmarkGetNodes(b *testing.B) { store, err := testStateStore() if err != nil {