Fixup stream tear-down steps.

1. Fix a bug where the peering leader routine would not track all active
   peerings in the "stored" reconciliation map. This could lead to
   tearing down streams for peerings where this cluster generated the
   token, since the ConnectedStreams() method used for reconciliation
   returns all streams, not just the ones initiated by this leader
   routine. (A sketch of the reconciliation idea follows this list.)

2. Fix a race where stream contexts were being canceled before
   termination messages were being processed by a peer.

   Previously the leader routine tore down streams by canceling their
   context right after the termination message was sent. That context
   cancelation could be propagated to the server side faster than the
   termination message itself. Now the dialing peer calls CloseSend() to
   signal that no more messages will be sent, and the server peer
   eventually reads an EOF after receiving and processing the preceding
   termination message.

   CloseSend() alone is not enough to address the issue, since it does
   not wait for the server peer to finish processing messages. Because
   of this, the dialing peer also reads from the stream until an error
   signals that there are no more messages. Receiving an EOF from our
   peer indicates that it processed the termination message and has no
   additional work to do.

   Given that the stream is being closed, all messages received by
   Recv() are discarded; we only check for errors, to avoid importing
   new data. (A sketch of this drain pattern also follows this list.)
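
A minimal sketch of the reconciliation idea from point 1, for
illustration only: the Peering struct, reconcile function, and
connectedStreams map below are hypothetical stand-ins rather than
Consul's types; the real logic lives in syncPeeringsAndBlock in the
diff below.

    package main

    import "fmt"

    // Peering is a simplified, hypothetical stand-in for the stored peering record.
    type Peering struct {
        ID         string
        Active     bool
        ShouldDial bool
    }

    // reconcile records every active peering in "stored" before deciding whether
    // to dial, then tears down only the connected streams whose peering is no
    // longer stored. Tracking all active peerings, not just the dialing ones,
    // is what keeps streams for token-generating peerings from being torn down.
    func reconcile(peers []Peering, connectedStreams map[string]func()) {
        stored := make(map[string]struct{})
        for _, p := range peers {
            if !p.Active {
                continue // marked for deletion: no need to dial or track
            }
            stored[p.ID] = struct{}{} // track ALL active peerings
            if !p.ShouldDial {
                continue // we generated the token; the other side dials us
            }
            // ... dial the peer and establish the stream here ...
        }
        // Tear down only the streams whose peering is no longer tracked.
        for id, cancel := range connectedStreams {
            if _, ok := stored[id]; !ok {
                fmt.Println("tearing down stream for", id)
                cancel()
            }
        }
    }

    func main() {
        streams := map[string]func(){
            "accepted-peering": func() {}, // token generated locally; peer dialed us
            "stale-peering":    func() {}, // no longer in the state store
        }
        reconcile([]Peering{{ID: "accepted-peering", Active: true}}, streams)
        // Only "stale-peering" is torn down; "accepted-peering" survives.
    }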
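
A minimal sketch of the drain pattern from point 2, again for
illustration only: clientStream, drain, and fakeStream are hypothetical
stand-ins for the generated gRPC stream type and for the real
DrainStream method shown in the diff below.

    package main

    import (
        "errors"
        "io"
        "log"
    )

    // clientStream models the subset of a gRPC bidirectional client stream used
    // here; in real code this would be a generated proto stream type.
    type clientStream interface {
        CloseSend() error
        Recv() (interface{}, error)
    }

    // drain signals that no more messages will be sent, then keeps reading and
    // discarding messages until the peer closes its side. io.EOF means the peer
    // received and processed everything, including the termination message.
    func drain(stream clientStream) {
        if err := stream.CloseSend(); err != nil {
            log.Printf("failed to close send side of stream: %v", err)
        }
        for {
            // The peering is being torn down, so received messages are discarded;
            // only the error is inspected, to avoid importing new data.
            if _, err := stream.Recv(); err != nil {
                if !errors.Is(err, io.EOF) {
                    log.Printf("peer may not have processed the termination message: %v", err)
                }
                return
            }
        }
    }

    // fakeStream lets the sketch run without a real gRPC connection.
    type fakeStream struct{ pending int }

    func (f *fakeStream) CloseSend() error { return nil }

    func (f *fakeStream) Recv() (interface{}, error) {
        if f.pending == 0 {
            return nil, io.EOF
        }
        f.pending--
        return struct{}{}, nil
    }

    func main() {
        drain(&fakeStream{pending: 3})
        log.Println("stream drained; it is now safe to cancel the stream context")
    }
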
freddygv 2022-06-13 08:22:46 -06:00
parent cc921a9c78
commit 6c8ab1bbac
7 changed files with 86 additions and 45 deletions


@@ -114,13 +114,19 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
for _, peer := range peers {
logger.Trace("evaluating stored peer", "peer", peer.Name, "should_dial", peer.ShouldDial(), "sequence_id", seq)
if !peer.ShouldDial() {
if !peer.IsActive() {
// The peering is marked for deletion, no need to dial or track them.
continue
}
// TODO(peering) Account for deleted peers that are still in the state store
// Track all active peerings, since the reconciliation loop below applies to the token generator as well.
stored[peer.ID] = struct{}{}
if !peer.ShouldDial() {
// We do not need to dial peerings where we generated the peering token.
continue
}
status, found := s.peeringService.StreamStatus(peer.ID)
// TODO(peering): If there is new peering data and a connected stream, should we tear down the stream?
@@ -179,6 +185,8 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
}
func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
tlsOption := grpc.WithInsecure()
if len(peer.PeerCAPems) > 0 {
var haveCerts bool
@@ -208,7 +216,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
buffer = buffer.Next()
}
logger.Trace("establishing stream to peer", "peer_id", peer.ID)
logger.Trace("establishing stream to peer")
retryCtx, cancel := context.WithCancel(ctx)
cancelFns[peer.ID] = cancel
@@ -224,7 +232,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
return fmt.Errorf("peer server address type %T is not a string", buffer.Value)
}
logger.Trace("dialing peer", "peer_id", peer.ID, "addr", addr)
logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(retryCtx, addr,
grpc.WithContextDialer(newPeerDialer(addr)),
grpc.WithBlock(),
@@ -241,16 +249,23 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
return err
}
err = s.peeringService.HandleStream(peering.HandleStreamRequest{
streamReq := peering.HandleStreamRequest{
LocalID: peer.ID,
RemoteID: peer.PeerID,
PeerName: peer.Name,
Partition: peer.Partition,
Stream: stream,
})
}
err = s.peeringService.HandleStream(streamReq)
// A nil error indicates that the peering was deleted and the stream needs to be gracefully shut down.
if err == nil {
stream.CloseSend()
s.peeringService.DrainStream(streamReq)
// This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream.
cancel()
logger.Info("closed outbound stream")
}
return err


@@ -88,10 +88,12 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
require.True(r, status.Connected)
})
// Delete the peering to trigger the termination sequence
require.NoError(t, s2.fsm.State().PeeringDelete(2000, state.Query{
Value: "my-peer-s1",
}))
// Delete the peering to trigger the termination sequence.
deleted := &pbpeering.Peering{
Name: "my-peer-s1",
DeletedAt: structs.TimeToProto(time.Now()),
}
require.NoError(t, s2.fsm.State().PeeringWrite(2000, deleted))
s2.logger.Trace("deleted peering for my-peer-s1")
retry.Run(t, func(r *retry.R) {
@@ -175,10 +177,12 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
require.True(r, status.Connected)
})
// Delete the peering from the server peer to trigger the termination sequence
require.NoError(t, s1.fsm.State().PeeringDelete(2000, state.Query{
Value: "my-peer-s2",
}))
// Delete the peering from the server peer to trigger the termination sequence.
deleted := &pbpeering.Peering{
Name: "my-peer-s2",
DeletedAt: structs.TimeToProto(time.Now()),
}
require.NoError(t, s1.fsm.State().PeeringWrite(2000, deleted))
s1.logger.Trace("deleted peering for my-peer-s2")
retry.Run(t, func(r *retry.R) {
@@ -186,7 +190,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
require.False(r, found)
})
// s2 should have received the termination message and updated the peering state
// s2 should have received the termination message and updated the peering state.
retry.Run(t, func(r *retry.R) {
_, peering, err := s2.fsm.State().PeeringRead(nil, state.Query{
Value: "my-peer-s1",


@@ -17,8 +17,6 @@ import (
const (
tablePeering = "peering"
tablePeeringTrustBundles = "peering-trust-bundles"
indexDeleted = "deleted"
)
func peeringTableSchema() *memdb.TableSchema {


@@ -1051,8 +1051,7 @@ func TestStateStore_PeeringsForService(t *testing.T) {
},
{
peering: &pbpeering.Peering{
Name: "peer2",
State: pbpeering.PeeringState_TERMINATED,
Name: "peer2",
},
delete: true,
},


@@ -64,7 +64,10 @@ type IndexEntry struct {
Value uint64
}
const tableIndex = "index"
const (
tableIndex = "index"
indexDeleted = "deleted"
)
// indexTableSchema returns a new table schema used for tracking the
// latest raft index for a table or entities within a table.


@@ -556,17 +556,24 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
// TODO(peering): If the peering is marked as deleted, send a Terminated message and return
// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID)
s.logger.Info("accepted initial replication request from peer", "peer_id", p.ID)
// For server peers both of these ID values are the same, because we generated a token with a local ID,
// and the client peer dials using that same ID.
return s.HandleStream(HandleStreamRequest{
streamReq := HandleStreamRequest{
LocalID: p.ID,
RemoteID: p.PeerID,
PeerName: p.Name,
Partition: p.Partition,
Stream: stream,
})
}
err = s.HandleStream(streamReq)
// A nil error indicates that the peering was deleted and the stream needs to be gracefully shut down.
if err == nil {
s.DrainStream(streamReq)
return nil
}
s.logger.Error("error handling stream", "peer_name", p.Name, "peer_id", req.PeerID, "error", err)
return err
}
type HandleStreamRequest struct {
@@ -586,10 +593,28 @@ type HandleStreamRequest struct {
Stream BidirectionalStream
}
// DrainStream attempts to gracefully drain the stream when the connection is going to be torn down.
// Tearing down the connection too quickly can lead to our peer receiving a context cancellation error before the stream termination message.
// Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.
func (s *Service) DrainStream(req HandleStreamRequest) {
for {
// Ensure that we read until an error, or the peer has nothing more to send.
if _, err := req.Stream.Recv(); err != nil {
if err != io.EOF {
s.logger.Warn("failed to tear down stream gracefully: peer may not have received termination message",
"peer_name", req.PeerName, "peer_id", req.LocalID, "error", err)
}
break
}
// Since the peering is being torn down we discard all replication messages without an error.
// We want to avoid importing new data at this point.
}
}
// The localID provided is the locally-generated identifier for the peering.
// The remoteID is an identifier that the remote peer recognizes for the peering.
func (s *Service) HandleStream(req HandleStreamRequest) error {
logger := s.logger.Named("stream").With("peer_id", req.LocalID)
logger := s.logger.Named("stream").With("peer_name", req.PeerName, "peer_id", req.LocalID)
logger.Trace("handling stream for peer")
status, err := s.streams.connected(req.LocalID)
@@ -646,25 +671,20 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
defer close(recvChan)
for {
msg, err := req.Stream.Recv()
if err == nil {
logTraceRecv(logger, msg)
recvChan <- msg
continue
}
if err == io.EOF {
logger.Info("stream ended by peer")
status.trackReceiveError(err.Error())
return
}
if e, ok := grpcstatus.FromError(err); ok {
// Cancelling the stream is not an error; it means we or our peer intended to terminate the peering.
if e.Code() == codes.Canceled {
return
}
}
if err != nil {
logger.Error("failed to receive from stream", "error", err)
status.trackReceiveError(err.Error())
return
}
logTraceRecv(logger, msg)
recvChan <- msg
logger.Error("failed to receive from stream", "error", err)
status.trackReceiveError(err.Error())
return
}
}()
@@ -693,13 +713,12 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
case msg, open := <-recvChan:
if !open {
// No longer receiving data on the stream.
logger.Trace("no longer receiving data on the stream")
return nil
}
if !s.Backend.IsLeader() {
// we are not the leader anymore so we will hang up on the dialer
logger.Error("node is not a leader anymore; cannot continue streaming")
st, err := grpcstatus.New(codes.FailedPrecondition,
@@ -750,11 +769,11 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
}
if term := msg.GetTerminated(); term != nil {
logger.Info("received peering termination message, cleaning up imported resources")
logger.Info("peering was deleted by our peer: marking peering as terminated and cleaning up imported resources")
// Once marked as terminated, a separate deferred deletion routine will clean up imported resources.
if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil {
return err
logger.Error("failed to mark peering as terminated", "error", err)
}
return nil
}


@@ -88,7 +88,7 @@ func (msg *EstablishRequest) Timeout(rpcHoldTimeout time.Duration, maxQueryTime
// If we generated a token for this peer we did not store our server addresses under PeerServerAddresses.
// These server addresses are for dialing, and only the peer initiating the peering will do the dialing.
func (p *Peering) ShouldDial() bool {
return len(p.PeerServerAddresses) > 0 && p.State != PeeringState_TERMINATED
return len(p.PeerServerAddresses) > 0
}
func (x ReplicationMessage_Response_Operation) GoString() string {
@@ -178,6 +178,9 @@ func PeeringStateFromAPI(t api.PeeringState) PeeringState {
}
func (p *Peering) IsActive() bool {
if p != nil && p.State == PeeringState_TERMINATED {
return false
}
if p == nil || p.DeletedAt == nil {
return true
}