mirror of https://github.com/status-im/consul.git
peering: avoid a race between peering establishment and termination (#13389)
This commit is contained in:
parent
7393374fc0
commit
edb2e55335
|
@ -510,7 +510,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
|||
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Name: c.NodeName,
|
||||
Level: hclog.Trace,
|
||||
Level: testutil.TestLogLevel,
|
||||
Output: testutil.NewLogBuffer(t),
|
||||
})
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
@ -17,6 +16,7 @@ import (
|
|||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
@ -50,6 +50,39 @@ func (s *Server) stopPeeringStreamSync() {
|
|||
// syncPeeringsAndBlock is a long-running goroutine that is responsible for watching
|
||||
// changes to peerings in the state store and managing streams to those peers.
|
||||
func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancelFns map[string]context.CancelFunc) error {
|
||||
// We have to be careful not to introduce a data race here. We want to
|
||||
// compare the current known peerings in the state store with known
|
||||
// connected streams to know when we should TERMINATE stray peerings.
|
||||
//
|
||||
// If you read the current peerings from the state store, then read the
|
||||
// current established streams you could lose the data race and have the
|
||||
// sequence of events be:
|
||||
//
|
||||
// 1. list peerings [A,B,C]
|
||||
// 2. persist new peering [D]
|
||||
// 3. accept new stream for [D]
|
||||
// 4. list streams [A,B,C,D]
|
||||
// 5. terminate [D]
|
||||
//
|
||||
// Which is wrong. If we instead ensure that (4) happens before (1), given
|
||||
// that you can't get an established stream without first passing a "does
|
||||
// this peering exist in the state store?" inquiry then this happens:
|
||||
//
|
||||
// 1. list streams [A,B,C]
|
||||
// 2. list peerings [A,B,C]
|
||||
// 3. persist new peering [D]
|
||||
// 4. accept new stream for [D]
|
||||
// 5. terminate []
|
||||
//
|
||||
// Or even this is fine:
|
||||
//
|
||||
// 1. list streams [A,B,C]
|
||||
// 2. persist new peering [D]
|
||||
// 3. accept new stream for [D]
|
||||
// 4. list peerings [A,B,C,D]
|
||||
// 5. terminate []
|
||||
connectedStreams := s.peeringService.ConnectedStreams()
|
||||
|
||||
state := s.fsm.State()
|
||||
|
||||
// Pull the state store contents and set up to block for changes.
|
||||
|
@ -121,7 +154,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
|
|||
|
||||
// Clean up active streams of peerings that were deleted from the state store.
|
||||
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
|
||||
for stream, doneCh := range s.peeringService.ConnectedStreams() {
|
||||
for stream, doneCh := range connectedStreams {
|
||||
if _, ok := stored[stream]; ok {
|
||||
// Active stream is in the state store, nothing to do.
|
||||
continue
|
||||
|
|
|
@ -93,6 +93,10 @@ func (x ReplicationMessage_Response_Operation) GoString() string {
|
|||
return x.String()
|
||||
}
|
||||
|
||||
func (x PeeringState) GoString() string {
|
||||
return x.String()
|
||||
}
|
||||
|
||||
func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo {
|
||||
info := cache.RequestInfo{
|
||||
// TODO(peering): Revisit whether this is the token to use once request types accept a token.
|
||||
|
|
Loading…
Reference in New Issue