From c18a218bbbe731206a6e4d0e206b7d6683e184ad Mon Sep 17 00:00:00 2001 From: Freddy Date: Mon, 8 Feb 2021 16:14:06 -0700 Subject: [PATCH] Avoid potential proxycfg/xDS deadlock using non-blocking send --- .changelog/9689.txt | 3 +++ agent/proxycfg/state.go | 46 +++++++++++++++++++++++++++++++---------- agent/xds/server.go | 12 +++++++++++ logging/names.go | 1 + 4 files changed, 51 insertions(+), 11 deletions(-) create mode 100644 .changelog/9689.txt diff --git a/.changelog/9689.txt b/.changelog/9689.txt new file mode 100644 index 0000000000..85f78ac90e --- /dev/null +++ b/.changelog/9689.txt @@ -0,0 +1,3 @@ +```release-note:bug +proxycfg: avoid potential deadlock in delivering proxy snapshot to watchers. +``` \ No newline at end of file diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index e8323432fe..6051686e86 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -596,10 +596,11 @@ func (s *state) run() { case <-s.ctx.Done(): return case u := <-s.ch: + s.logger.Trace("A blocking query returned; handling snapshot update") + if err := s.handleUpdate(u, &snap); err != nil { - s.logger.Error("watch error", - "id", u.CorrelationID, - "error", err, + s.logger.Error("Failed to handle update from watch", + "id", u.CorrelationID, "error", err, ) continue } @@ -610,23 +611,47 @@ func (s *state) run() { snapCopy, err := snap.Clone() if err != nil { s.logger.Error("Failed to copy config snapshot for proxy", - "proxy", s.proxyID, "error", err, ) continue } - s.snapCh <- *snapCopy - // Allow the next change to trigger a send - coalesceTimer = nil - // Skip rest of loop - there is nothing to send since nothing changed on - // this iteration - continue + select { + // Try to send + case s.snapCh <- *snapCopy: + s.logger.Trace("Delivered new snapshot to proxy config watchers") + + // Allow the next change to trigger a send + coalesceTimer = nil + + // Skip rest of loop - there is nothing to send since nothing changed on + // this iteration + continue + + // Avoid blocking if a snapshot is already buffered in snapCh as this can result in a deadlock. + // See PR #9689 for more details. + default: + s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") + + // Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly. + if coalesceTimer == nil { + coalesceTimer = time.AfterFunc(coalesceTimeout, func() { + sendCh <- struct{}{} + }) + } + + // Do not reset coalesceTimer since we just queued a timer-based refresh + continue + } case replyCh := <-s.reqCh: + s.logger.Trace("A proxy config snapshot was requested") + if !snap.Valid() { // Not valid yet just respond with nil and move on to next task. replyCh <- nil + + s.logger.Trace("The proxy's config snapshot is not valid yet") continue } // Make a deep copy of snap so we don't mutate any of the embedded structs @@ -634,7 +659,6 @@ func (s *state) run() { snapCopy, err := snap.Clone() if err != nil { s.logger.Error("Failed to copy config snapshot for proxy", - "proxy", s.proxyID, "error", err, ) continue diff --git a/agent/xds/server.go b/agent/xds/server.go index 345080db9b..d92c3891cf 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/consul/logging" "sync/atomic" "time" @@ -186,6 +187,8 @@ const ( ) func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error { + logger := s.Logger.Named(logging.XDS) + // xDS requires a unique nonce to correlate response/request pairs var nonce uint64 @@ -342,6 +345,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) // state machine. defer watchCancel() + logger.Trace("watching proxy, pending initial proxycfg snapshot", + "service_id", proxyID.String()) + // Now wait for the config so we can check ACL state = statePendingInitialConfig case statePendingInitialConfig: @@ -353,6 +359,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) // Got config, try to authenticate next. state = stateRunning + logger.Trace("Got initial config snapshot", + "service_id", cfgSnap.ProxyID.String()) + // Lets actually process the config we just got or we'll mis responding fallthrough case stateRunning: @@ -364,6 +373,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) // timer is first started. extendAuthTimer() + logger.Trace("Invoking all xDS resource handlers and sending new data if there is any", + "service_id", cfgSnap.ProxyID.String()) + // See if any handlers need to have the current (possibly new) config // sent. Note the order here is actually significant so we can't just // range the map which has no determined order. It's important because: diff --git a/logging/names.go b/logging/names.go index fcbdc23534..0826ee982d 100644 --- a/logging/names.go +++ b/logging/names.go @@ -53,5 +53,6 @@ const ( Transaction string = "txn" WAN string = "wan" Watch string = "watch" + XDS string = "xds" Vault string = "vault" )