Avoid potential proxycfg/xDS deadlock using non-blocking send

This commit is contained in:
Freddy 2021-02-08 16:14:06 -07:00 committed by hashicorp-ci
parent 556b8bd1c2
commit c18a218bbb
4 changed files with 51 additions and 11 deletions

3
.changelog/9689.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
proxycfg: avoid potential deadlock in delivering proxy snapshot to watchers.
```

View File

@ -596,10 +596,11 @@ func (s *state) run() {
case <-s.ctx.Done():
return
case u := <-s.ch:
s.logger.Trace("A blocking query returned; handling snapshot update")
if err := s.handleUpdate(u, &snap); err != nil {
s.logger.Error("watch error",
"id", u.CorrelationID,
"error", err,
s.logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
continue
}
@ -610,12 +611,16 @@ func (s *state) run() {
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"proxy", s.proxyID,
"error", err,
)
continue
}
s.snapCh <- *snapCopy
select {
// Try to send
case s.snapCh <- *snapCopy:
s.logger.Trace("Delivered new snapshot to proxy config watchers")
// Allow the next change to trigger a send
coalesceTimer = nil
@ -623,10 +628,30 @@ func (s *state) run() {
// this iteration
continue
// Avoid blocking if a snapshot is already buffered in snapCh as this can result in a deadlock.
// See PR #9689 for more details.
default:
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})
}
// Do not reset coalesceTimer since we just queued a timer-based refresh
continue
}
case replyCh := <-s.reqCh:
s.logger.Trace("A proxy config snapshot was requested")
if !snap.Valid() {
// Not valid yet; just respond with nil and move on to the next task.
replyCh <- nil
s.logger.Trace("The proxy's config snapshot is not valid yet")
continue
}
// Make a deep copy of snap so we don't mutate any of the embedded structs
@ -634,7 +659,6 @@ func (s *state) run() {
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"proxy", s.proxyID,
"error", err,
)
continue

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/consul/logging"
"sync/atomic"
"time"
@ -186,6 +187,8 @@ const (
)
func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
logger := s.Logger.Named(logging.XDS)
// xDS requires a unique nonce to correlate response/request pairs
var nonce uint64
@ -342,6 +345,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
// state machine.
defer watchCancel()
logger.Trace("watching proxy, pending initial proxycfg snapshot",
"service_id", proxyID.String())
// Now wait for the config so we can check ACL
state = statePendingInitialConfig
case statePendingInitialConfig:
@ -353,6 +359,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
// Got config, try to authenticate next.
state = stateRunning
logger.Trace("Got initial config snapshot",
"service_id", cfgSnap.ProxyID.String())
// Let's actually process the config we just got or we'll miss responding
fallthrough
case stateRunning:
@ -364,6 +373,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
// timer is first started.
extendAuthTimer()
logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
"service_id", cfgSnap.ProxyID.String())
// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:

View File

@ -53,5 +53,6 @@ const (
Transaction string = "txn"
WAN string = "wan"
Watch string = "watch"
XDS string = "xds"
Vault string = "vault"
)