diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go
index 3288a141a1..954daddce4 100644
--- a/agent/consul/leader_peering.go
+++ b/agent/consul/leader_peering.go
@@ -6,6 +6,7 @@ import (
     "crypto/tls"
     "crypto/x509"
     "fmt"
+    "math"
     "time"
 
     "github.com/armon/go-metrics"
@@ -16,8 +17,10 @@ import (
     "github.com/hashicorp/go-uuid"
     "golang.org/x/time/rate"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
     "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/keepalive"
+    grpcstatus "google.golang.org/grpc/status"
 
     "github.com/hashicorp/consul/acl"
     "github.com/hashicorp/consul/agent/consul/state"
@@ -38,6 +41,15 @@ var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
             "We emit this metric every 9 seconds",
     },
 }
+var (
+    // fastConnRetryTimeout is how long we wait between retrying connections following the "fast" path
+    // which is triggered on specific connection errors.
+    fastConnRetryTimeout = 8 * time.Millisecond
+    // maxFastConnRetries is the maximum number of fast connection retries before we follow exponential backoff.
+    maxFastConnRetries = uint(5)
+    // maxFastRetryBackoff is the maximum amount of time we'll wait between retries following the fast path.
+    maxFastRetryBackoff = 8192 * time.Millisecond
+)
 
 func (s *Server) startPeeringStreamSync(ctx context.Context) {
     s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync)
@@ -300,7 +312,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
     }
 
     // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
-    go retryLoopBackoff(retryCtx, func() error {
+    go retryLoopBackoffPeering(retryCtx, logger, func() error {
         // Try a new address on each iteration by advancing the ring buffer on errors.
         defer func() {
             buffer = buffer.Next()
@@ -349,18 +361,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
         if err == nil {
             stream.CloseSend()
             s.peerStreamServer.DrainStream(streamReq)
-
-            // This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream.
             cancel()
-
             logger.Info("closed outbound stream")
         }
         return err
     }, func(err error) {
+        // TODO(peering): why are we using TrackSendError here? This could also be a receive error.
         streamStatus.TrackSendError(err.Error())
-        logger.Error("error managing peering stream", "peer_id", peer.ID, "error", err)
-    })
+        if isFailedPreconditionErr(err) {
+            logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
+                "error", err)
+            return
+        }
+        logger.Error("error managing peering stream", "error", err)
+    }, peeringRetryTimeout)
 
     return nil
 }
@@ -517,3 +532,84 @@ func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Li
     _, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, req)
     return err
 }
+
+// retryLoopBackoffPeering re-runs loopFn with a backoff on error. errFn is run whenever
+// loopFn returns an error. retryTimeFn is used to calculate the time between retries on error.
+// It is passed the number of errors in a row that loopFn has returned and the latest error
+// from loopFn.
+//
+// This function is modelled off of retryLoopBackoffHandleSuccess but is specific to peering
+// because peering needs to use different retry times depending on which error is returned.
+// This function doesn't use a rate limiter, unlike retryLoopBackoffHandleSuccess, because
+// the rate limiter is only needed in the success case when loopFn returns nil and we want to
+// loop again. In the peering case, we exit on a successful loop so we don't need the limiter.
+func retryLoopBackoffPeering(ctx context.Context, logger hclog.Logger, loopFn func() error, errFn func(error),
+    retryTimeFn func(failedAttempts uint, loopErr error) time.Duration) {
+    var failedAttempts uint
+    var err error
+    for {
+        if err = loopFn(); err != nil {
+            errFn(err)
+
+            if failedAttempts < math.MaxUint {
+                failedAttempts++
+            }
+
+            retryTime := retryTimeFn(failedAttempts, err)
+            logger.Trace("in connection retry backoff", "delay", retryTime)
+            timer := time.NewTimer(retryTime)
+
+            select {
+            case <-ctx.Done():
+                timer.Stop()
+                return
+            case <-timer.C:
+            }
+            continue
+        }
+        return
+    }
+}
+
+// peeringRetryTimeout returns the time that should be waited between re-establishing a peering
+// connection after an error. We follow the default backoff from retryLoopBackoff
+// unless the error is a "failed precondition" error, in which case we retry much more quickly.
+// Retrying quickly is important for failed precondition errors because we expect them to resolve
+// quickly. For example, when connecting to a follower through a load balancer, we just need to retry
+// until our request lands on the leader.
+func peeringRetryTimeout(failedAttempts uint, loopErr error) time.Duration {
+    if loopErr != nil && isFailedPreconditionErr(loopErr) {
+        // Wait a constant time for the first maxFastConnRetries retries.
+        if failedAttempts <= maxFastConnRetries {
+            return fastConnRetryTimeout
+        }
+        // From here, follow an exponential backoff maxing out at maxFastRetryBackoff.
+        // The equation below multiplies fastConnRetryTimeout by 2^n, where n is the number of failed attempts
+        // we're on, starting at 1 now that we're past maxFastConnRetries.
+        // For example, if fastConnRetryTimeout == 8ms and maxFastConnRetries == 5, then at 6 failed retries
+        // we'll do 8ms * 2^1 = 16ms, then 8ms * 2^2 = 32ms, etc.
+        ms := fastConnRetryTimeout * (1 << (failedAttempts - maxFastConnRetries))
+        if ms > maxFastRetryBackoff {
+            return maxFastRetryBackoff
+        }
+        return ms
+    }
+
+    // Else we go with the default backoff from retryLoopBackoff.
+    if (1 << failedAttempts) < maxRetryBackoff {
+        return (1 << failedAttempts) * time.Second
+    }
+    return time.Duration(maxRetryBackoff) * time.Second
+}
+
+// isFailedPreconditionErr returns true if err is a gRPC error with code FailedPrecondition.
+func isFailedPreconditionErr(err error) bool {
+    if err == nil {
+        return false
+    }
+    grpcErr, ok := grpcstatus.FromError(err)
+    if !ok {
+        return false
+    }
+    return grpcErr.Code() == codes.FailedPrecondition
+}
diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go
index feaf5be027..8a0461c38a 100644
--- a/agent/consul/leader_peering_test.go
+++ b/agent/consul/leader_peering_test.go
@@ -4,14 +4,18 @@ import (
     "context"
     "encoding/base64"
     "encoding/json"
+    "errors"
     "fmt"
     "io/ioutil"
     "testing"
     "time"
 
     "github.com/armon/go-metrics"
+    "github.com/hashicorp/go-hclog"
     "github.com/stretchr/testify/require"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    grpcstatus "google.golang.org/grpc/status"
 
     "github.com/hashicorp/consul/acl"
     "github.com/hashicorp/consul/agent/consul/state"
@@ -1122,3 +1126,150 @@ func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) {
         return found
     }, 7*time.Second, 1*time.Second, "peering should not have been established")
 }
+
+// Test peeringRetryTimeout when the errors are FailedPrecondition errors because these
+// errors have a different backoff.
+func TestLeader_Peering_peeringRetryTimeout_failedPreconditionErrors(t *testing.T) {
+    cases := []struct {
+        failedAttempts uint
+        expDuration    time.Duration
+    }{
+        // Constant time backoff.
+        {0, 8 * time.Millisecond},
+        {1, 8 * time.Millisecond},
+        {2, 8 * time.Millisecond},
+        {3, 8 * time.Millisecond},
+        {4, 8 * time.Millisecond},
+        {5, 8 * time.Millisecond},
+        // Then exponential.
+        {6, 16 * time.Millisecond},
+        {7, 32 * time.Millisecond},
+        {13, 2048 * time.Millisecond},
+        {14, 4096 * time.Millisecond},
+        {15, 8192 * time.Millisecond},
+        // Max.
+        {16, 8192 * time.Millisecond},
+        {17, 8192 * time.Millisecond},
+    }
+
+    for _, c := range cases {
+        t.Run(fmt.Sprintf("failed attempts %d", c.failedAttempts), func(t *testing.T) {
+            err := grpcstatus.Error(codes.FailedPrecondition, "msg")
+            require.Equal(t, c.expDuration, peeringRetryTimeout(c.failedAttempts, err))
+        })
+    }
+}
+
+// Test peeringRetryTimeout with non-FailedPrecondition errors because these errors have a different
+// backoff from FailedPrecondition errors.
+func TestLeader_Peering_peeringRetryTimeout_regularErrors(t *testing.T) {
+    cases := []struct {
+        failedAttempts uint
+        expDuration    time.Duration
+    }{
+        // Exponential.
+        {0, 1 * time.Second},
+        {1, 2 * time.Second},
+        {2, 4 * time.Second},
+        {3, 8 * time.Second},
+        // Until max.
+        {8, 256 * time.Second},
+        {9, 256 * time.Second},
+        {10, 256 * time.Second},
+    }
+
+    for _, c := range cases {
+        t.Run(fmt.Sprintf("failed attempts %d", c.failedAttempts), func(t *testing.T) {
+            err := errors.New("error")
+            require.Equal(t, c.expDuration, peeringRetryTimeout(c.failedAttempts, err))
+        })
+    }
+}
+
+// This test exercises all the functionality of retryLoopBackoffPeering.
+func TestLeader_Peering_retryLoopBackoffPeering(t *testing.T) {
+    ctx := context.Background()
+    logger := hclog.NewNullLogger()
+
+    // loopCount counts how many times we executed loopFn.
+    loopCount := 0
+    // loopTimes holds the times at which each loopFn call was made. We use this to test the retry timing.
+    var loopTimes []time.Time
+    // loopFn runs 3 times: it returns an error on the first two iterations and nil on the third, which ends the loop.
+    loopFn := func() error {
+        loopCount++
+        loopTimes = append(loopTimes, time.Now())
+        if loopCount == 1 {
+            return fmt.Errorf("error 1")
+        }
+        if loopCount == 2 {
+            return fmt.Errorf("error 2")
+        }
+        if loopCount == 3 {
+            // On the 3rd loop, return success which ends the loop.
+            return nil
+        }
+        return nil
+    }
+    // allErrors collects all the errors passed into errFn.
+    var allErrors []error
+    errFn := func(e error) {
+        allErrors = append(allErrors, e)
+    }
+    retryTimeFn := func(_ uint, _ error) time.Duration {
+        return 1 * time.Millisecond
+    }
+
+    retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn)
+
+    // Ensure loopFn ran the number of expected times.
+    require.Equal(t, 3, loopCount)
+    // Ensure errFn ran as expected.
+    require.Equal(t, []error{
+        fmt.Errorf("error 1"),
+        fmt.Errorf("error 2"),
+    }, allErrors)
+
+    // Test retryTimeFn by comparing the difference between when each loopFn ran.
+    for i := range loopTimes {
+        if i == 0 {
+            // Can't compare first time.
+            continue
+        }
+        require.True(t, loopTimes[i].Sub(loopTimes[i-1]) >= 1*time.Millisecond,
+            "time between loop executions %d and %d should be at least 1ms", i-1, i)
+    }
+}
+
+// Test that if the context is cancelled the loop exits.
+func TestLeader_Peering_retryLoopBackoffPeering_cancelContext(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    logger := hclog.NewNullLogger()
+
+    // loopCount counts how many times we executed loopFn.
+    loopCount := 0
+    loopFn := func() error {
+        loopCount++
+        return fmt.Errorf("error %d", loopCount)
+    }
+    // allErrors collects all the errors passed into errFn.
+    var allErrors []error
+    errFn := func(e error) {
+        allErrors = append(allErrors, e)
+    }
+    // The retry time doesn't matter here: the cancelled context exits the loop before the timer is waited on.
+    retryTimeFn := func(_ uint, _ error) time.Duration {
+        return 1 * time.Millisecond
+    }
+
+    // Cancel the context before the loop runs. It should run once and then exit.
+    cancel()
+    retryLoopBackoffPeering(ctx, logger, loopFn, errFn, retryTimeFn)
+
+    // Ensure loopFn ran the number of expected times.
+    require.Equal(t, 1, loopCount)
+    // Ensure errFn ran as expected.
+    require.Equal(t, []error{
+        fmt.Errorf("error 1"),
+    }, allErrors)
+}
diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go
index 1feb7f01d6..2079560e7b 100644
--- a/agent/grpc-external/services/peerstream/stream_resources.go
+++ b/agent/grpc-external/services/peerstream/stream_resources.go
@@ -37,9 +37,8 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
     // handling code in HandleStream()
 
     if !s.Backend.IsLeader() {
-        // we are not the leader so we will hang up on the dialer
-
-        logger.Error("cannot establish a peering stream on a follower node")
+        // We are not the leader so we will hang up on the dialer.
+        logger.Debug("cannot establish a peering stream on a follower node")
 
         st, err := grpcstatus.New(codes.FailedPrecondition,
             "cannot establish a peering stream on a follower node").WithDetails(
@@ -180,6 +179,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
         With("dialed", streamReq.WasDialed())
     logger.Trace("handling stream for peer")
+    // handleStreamCtx is local to this function; cancelling it stops the receive and heartbeat goroutines started below.
+    handleStreamCtx, cancel := context.WithCancel(streamReq.Stream.Context())
+    defer cancel()
+
     status, err := s.Tracker.Connected(streamReq.LocalID)
     if err != nil {
         return fmt.Errorf("failed to register stream: %v", err)
     }
@@ -242,58 +245,53 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
             PeerID:      streamReq.RemoteID,
         })
         if err := streamSend(sub); err != nil {
-            if err == io.EOF {
-                logger.Info("stream ended by peer")
-                return nil
-            }
             // TODO(peering) Test error handling in calls to Send/Recv
             return fmt.Errorf("failed to send subscription for %q to stream: %w", resourceURL, err)
         }
     }
 
-    // TODO(peering): Should this be buffered?
-    recvChan := make(chan *pbpeerstream.ReplicationMessage)
+    // recvCh carries messages received from the gRPC stream.
+    recvCh := make(chan *pbpeerstream.ReplicationMessage)
+    // recvErrCh carries errors received from the gRPC stream.
+    recvErrCh := make(chan error)
+
+    // Start a goroutine that reads from the stream, passing messages to recvCh and errors to recvErrCh.
+    // Using a separate goroutine lets the main for{} loop below handle both sends and receives.
     go func() {
-        defer close(recvChan)
         for {
            msg, err := streamReq.Stream.Recv()
-            if err == nil {
-                logTraceRecv(logger, msg)
-                recvChan <- msg
-                continue
-            }
-
-            if err == io.EOF {
-                logger.Info("stream ended by peer")
-                status.TrackRecvError(err.Error())
+            if err != nil {
+                recvErrCh <- err
+                return
+            }
+            logTraceRecv(logger, msg)
+            select {
+            case recvCh <- msg:
+            case <-handleStreamCtx.Done():
                 return
             }
-            logger.Error("failed to receive from stream", "error", err)
-            status.TrackRecvError(err.Error())
-            return
         }
     }()
 
-    // Heartbeat sender.
+    // Start a goroutine to send heartbeats at a regular interval.
     go func() {
         tick := time.NewTicker(s.outgoingHeartbeatInterval)
         defer tick.Stop()
 
         for {
             select {
-            case <-streamReq.Stream.Context().Done():
+            case <-handleStreamCtx.Done():
                 return
 
             case <-tick.C:
-            }
-
-            heartbeat := &pbpeerstream.ReplicationMessage{
-                Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{
-                    Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{},
-                },
-            }
-            if err := streamSend(heartbeat); err != nil {
-                logger.Warn("error sending heartbeat", "err", err)
+                heartbeat := &pbpeerstream.ReplicationMessage{
+                    Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{
+                        Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{},
+                    },
+                }
+                if err := streamSend(heartbeat); err != nil {
+                    logger.Warn("error sending heartbeat", "err", err)
+                }
             }
         }
     }()
@@ -308,6 +306,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
         incomingHeartbeatCtxCancel()
     }()
 
+    // The main loop that processes sends and receives for this stream.
     for {
         select {
         // When the doneCh is closed that means that the peering was deleted locally.
@@ -331,28 +330,30 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
 
             return nil
 
+        // Handle errors received from the stream by shutting down our handler.
+        case err := <-recvErrCh:
+            if err == io.EOF {
+                // NOTE: We don't expect an io.EOF error here when the stream disconnects gracefully.
+                // When the peering is deleted locally, status.Done() is triggered; that case is handled elsewhere and this
+                // method exits. When we receive a Terminated message, that is also handled elsewhere and this method
+                // exits. After the method exits, recv errors are no longer delivered here and are instead handled
+                // by DrainStream().
+                err = fmt.Errorf("stream ended unexpectedly")
+            }
+            status.TrackRecvError(err.Error())
+            return err
+
         // We haven't received a heartbeat within the expected interval. Kill the stream.
         case <-incomingHeartbeatCtx.Done():
-            logger.Error("ending stream due to heartbeat timeout")
             return fmt.Errorf("heartbeat timeout")
 
-        case msg, open := <-recvChan:
-            if !open {
-                // The only time we expect the stream to end is when we've received a "Terminated" message.
-                // We handle the case of receiving the Terminated message below and then this function exits.
-                // So if the channel is closed while this function is still running then we haven't received a Terminated
-                // message which means we want to try and reestablish the stream.
-                // It's the responsibility of the caller of this function to reestablish the stream on error and so that's
-                // why we return an error here.
-                return fmt.Errorf("stream ended unexpectedly")
-            }
-
+        case msg := <-recvCh:
             // NOTE: this code should have similar error handling to the
             // initial handling code in StreamResources()
 
             if !s.Backend.IsLeader() {
-                // we are not the leader anymore so we will hang up on the dialer
-                logger.Error("node is not a leader anymore; cannot continue streaming")
+                // We are not the leader anymore, so we will hang up on the dialer.
+                logger.Info("node is not a leader anymore; cannot continue streaming")
 
                 st, err := grpcstatus.New(codes.FailedPrecondition,
                     "node is not a leader anymore; cannot continue streaming").WithDetails(
diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go
index 174ecf59f3..c4458acf0c 100644
--- a/agent/grpc-external/services/peerstream/stream_test.go
+++ b/agent/grpc-external/services/peerstream/stream_test.go
@@ -589,7 +589,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
     testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) {
         lastRecvError = it.FutureNow(1)
         disconnectTime := it.FutureNow(2)
-        lastRecvErrorMsg = io.EOF.Error()
+        lastRecvErrorMsg = "stream ended unexpectedly"
 
         client.Close()
 
@@ -597,7 +597,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
         expect := Status{
             Connected:              false,
-            DisconnectErrorMessage: "stream ended unexpectedly",
+            DisconnectErrorMessage: lastRecvErrorMsg,
             LastAck:                lastSendSuccess,
             LastNack:               lastNack,
             LastNackMessage:        lastNackMsg,
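
For reviewers reading this patch on its own, here is a small standalone sketch (not part of the diff) that prints the retry schedule the new backoff logic produces. The constants mirror the ones added in leader_peering.go; maxRetryBackoff is assumed to be 256, the value implied by the regular-error test cases, and retryTimeout here is an illustrative stand-in for peeringRetryTimeout.

package main

import (
    "fmt"
    "time"
)

// Constants mirroring the ones introduced in leader_peering.go above.
// maxRetryBackoff is assumed to be 256 (the value implied by the
// regular-error test cases); it is defined elsewhere in the package.
const (
    fastConnRetryTimeout = 8 * time.Millisecond
    maxFastConnRetries   = uint(5)
    maxFastRetryBackoff  = 8192 * time.Millisecond
    maxRetryBackoff      = 256
)

// retryTimeout mirrors the logic of peeringRetryTimeout for illustration only:
// a "failed precondition" error gets a fast constant-then-exponential backoff,
// any other error gets the default 1s * 2^n backoff capped at maxRetryBackoff seconds.
func retryTimeout(failedAttempts uint, failedPrecondition bool) time.Duration {
    if failedPrecondition {
        if failedAttempts <= maxFastConnRetries {
            return fastConnRetryTimeout
        }
        d := fastConnRetryTimeout * (1 << (failedAttempts - maxFastConnRetries))
        if d > maxFastRetryBackoff {
            return maxFastRetryBackoff
        }
        return d
    }
    if (1 << failedAttempts) < maxRetryBackoff {
        return (1 << failedAttempts) * time.Second
    }
    return time.Duration(maxRetryBackoff) * time.Second
}

func main() {
    // Print the retry schedule for both error classes, matching the tables
    // exercised by the new tests in leader_peering_test.go.
    for attempts := uint(0); attempts <= 17; attempts++ {
        fmt.Printf("attempt %2d: failed-precondition=%-7s regular=%s\n",
            attempts, retryTimeout(attempts, true), retryTimeout(attempts, false))
    }
}

Running it shows the two schedules side by side: a flat 8ms for the first five failed-precondition attempts, then doubling up to the 8192ms cap, versus 1s doubling up to 256s for all other errors.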