diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go
index d8f0fbd4df..5c35e3f338 100644
--- a/agent/consul/client_test.go
+++ b/agent/consul/client_test.go
@@ -510,7 +510,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
 
 	logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
 		Name:   c.NodeName,
-		Level:  hclog.Trace,
+		Level:  testutil.TestLogLevel,
 		Output: testutil.NewLogBuffer(t),
 	})
 
diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go
index c0f69b1686..41736e3d09 100644
--- a/agent/consul/leader_peering.go
+++ b/agent/consul/leader_peering.go
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"net"
 
-	"github.com/hashicorp/consul/agent/rpc/peering"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-multierror"
@@ -17,6 +16,7 @@ import (
 	"google.golang.org/grpc/credentials"
 
 	"github.com/hashicorp/consul/agent/pool"
+	"github.com/hashicorp/consul/agent/rpc/peering"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/proto/pbpeering"
 )
@@ -50,6 +50,39 @@ func (s *Server) stopPeeringStreamSync() {
 // syncPeeringsAndBlock is a long-running goroutine that is responsible for watching
 // changes to peerings in the state store and managing streams to those peers.
 func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancelFns map[string]context.CancelFunc) error {
+	// We have to be careful not to introduce a data race here. We want to
+	// compare the current known peerings in the state store with known
+	// connected streams to know when we should TERMINATE stray peerings.
+	//
+	// If you read the current peerings from the state store and then read the
+	// current established streams, you could lose the data race and have this
+	// sequence of events:
+	//
+	// 1. list peerings [A,B,C]
+	// 2. persist new peering [D]
+	// 3. accept new stream for [D]
+	// 4. list streams [A,B,C,D]
+	// 5. terminate [D]
+	//
+	// Which is wrong. If we instead ensure that (4) happens before (1), given
+	// that you can't get an established stream without first passing a "does
+	// this peering exist in the state store?" inquiry, then this happens:
+	//
+	// 1. list streams [A,B,C]
+	// 2. list peerings [A,B,C]
+	// 3. persist new peering [D]
+	// 4. accept new stream for [D]
+	// 5. terminate []
+	//
+	// Or even this is fine:
+	//
+	// 1. list streams [A,B,C]
+	// 2. persist new peering [D]
+	// 3. accept new stream for [D]
+	// 4. list peerings [A,B,C,D]
+	// 5. terminate []
+	connectedStreams := s.peeringService.ConnectedStreams()
+
 	state := s.fsm.State()
 
 	// Pull the state store contents and set up to block for changes.
@@ -121,7 +154,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
 
 	// Clean up active streams of peerings that were deleted from the state store.
 	// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
-	for stream, doneCh := range s.peeringService.ConnectedStreams() {
+	for stream, doneCh := range connectedStreams {
 		if _, ok := stored[stream]; ok {
 			// Active stream is in the state store, nothing to do.
 			continue
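
To make the ordering argument in that comment concrete, here is a minimal, self-contained sketch (not part of the diff). The map snapshots and peering IDs are hypothetical stand-ins for peeringService.ConnectedStreams() and the state store listing; the point is only that snapshotting streams before peerings keeps a peering persisted between the two reads out of the terminate set.

package main

import "fmt"

func main() {
	// Streams are snapshotted FIRST. Any stream present here was
	// accepted only after its peering existed in the state store, so
	// the later store read below is guaranteed to include it too.
	connectedStreams := map[string]struct{}{"A": {}, "B": {}, "C": {}}

	// Peering D was persisted (and its stream accepted) after the
	// stream snapshot above, so it shows up here but not in
	// connectedStreams -- the harmless direction of the race.
	storedPeerings := map[string]struct{}{"A": {}, "B": {}, "C": {}, "D": {}}

	// Terminate only streams whose peering is gone from the store.
	var terminate []string
	for id := range connectedStreams {
		if _, ok := storedPeerings[id]; !ok {
			terminate = append(terminate, id)
		}
	}
	fmt.Println("terminate:", terminate) // terminate: []
}
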
diff --git a/proto/pbpeering/peering.go b/proto/pbpeering/peering.go
index 4c42650211..e52affe4ab 100644
--- a/proto/pbpeering/peering.go
+++ b/proto/pbpeering/peering.go
@@ -93,6 +93,10 @@ func (x ReplicationMessage_Response_Operation) GoString() string {
 	return x.String()
 }
 
+func (x PeeringState) GoString() string {
+	return x.String()
+}
+
 func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo {
 	info := cache.RequestInfo{
 		// TODO(peering): Revisit whether this is the token to use once request types accept a token.
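
A side note on the pbpeering change: with GoString delegating to String, the %#v verb in assertion output prints the state's name rather than its raw integer value. A small sketch, using a hypothetical local enum in place of the generated pbpeering.PeeringState (the constant name and value here are illustrative only):

package main

import "fmt"

// PeeringState stands in for the generated pbpeering.PeeringState enum;
// the constant name and value are hypothetical.
type PeeringState int32

const PeeringStateActive PeeringState = 2

// String mimics the protoc-generated name lookup.
func (x PeeringState) String() string { return "ACTIVE" }

// GoString mirrors the change in this diff: %#v now delegates to String.
func (x PeeringState) GoString() string { return x.String() }

func main() {
	// Prints ACTIVE; without GoString, %#v would print the bare value 2.
	fmt.Printf("%#v\n", PeeringStateActive)
}
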