From ca3d7c964c0a1532f267b1f8665f2795d72e17bb Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 15 Jul 2022 11:58:33 -0700 Subject: [PATCH] peerstream: dialer should reconnect when stream closes (#13745) * peerstream: dialer should reconnect when stream closes If the stream is closed unexpectedly (i.e. when we haven't received a terminated message), the dialer should attempt to re-establish the stream. Previously, the `HandleStream` would return `nil` when the stream was closed. The caller then assumed the stream was terminated on purpose and so didn't reconnect when instead it was stopped unexpectedly and the dialer should have attempted to reconnect. --- agent/consul/leader_peering_test.go | 115 ++++++++++++++++++ .../services/peerstream/stream_resources.go | 9 +- 2 files changed, 122 insertions(+), 2 deletions(-) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 33ef26d615..aa720bd6b5 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/types" @@ -423,6 +424,120 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { }) } +// Test that the dialing peer attempts to reestablish connections when the accepting peer +// shuts down without sending a Terminated message. +// +// To test this, we start the two peer servers (accepting and dialing), set up peering, and then shut down +// the accepting peer. This terminates the connection without sending a Terminated message. 
+// We then restart the accepting peer (we actually spin up a new server with the same config and port) and then +// assert that the dialing peer reestablishes the connection. +func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + // Reserve a gRPC port so we can restart the accepting server with the same port. + ports := freeport.GetN(t, 1) + acceptingServerPort := ports[0] + + _, acceptingServer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptingServer.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.GRPCPort = acceptingServerPort + }) + testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1") + + // Create a peering by generating a token. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-dialing-server", + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken) + require.NoError(t, err) + var token structs.PeeringToken + require.NoError(t, json.Unmarshal(tokenJSON, &token)) + + var ( + dialingServerPeerID = token.PeerID + acceptingServerPeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + ) + + // Bring up dialingServer and store acceptingServer's token so that it attempts to dial. 
+ _, dialingServer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialing-server.dc2" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, dialingServer.RPC, "dc2") + p := &pbpeering.Peering{ + ID: acceptingServerPeerID, + Name: "my-peer-accepting-server", + PeerID: token.PeerID, + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + require.True(t, p.ShouldDial()) + require.NoError(t, dialingServer.fsm.State().PeeringWrite(1000, p)) + + // Wait for the stream to be connected. + retry.Run(t, func(r *retry.R) { + status, found := dialingServer.peerStreamServer.StreamStatus(p.ID) + require.True(r, found) + require.True(r, status.Connected) + }) + + // Wait until the dialing server has sent its roots over. This avoids a race condition where the accepting server + // shuts down, but the dialing server is still sending messages to the stream. When this happens, an error is raised + // which causes the stream to restart. + // In this test, we want to test what happens when the stream is closed when there are _no_ messages being sent. + retry.Run(t, func(r *retry.R) { + _, bundle, err := acceptingServer.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: "my-peer-dialing-server"}) + require.NoError(r, err) + require.NotNil(r, bundle) + }) + + // Shutdown the accepting server. + require.NoError(t, acceptingServer.Shutdown()) + // Have to manually shut down the gRPC server otherwise it stays bound to the port. + acceptingServer.externalGRPCServer.Stop() + + // Mimic the server restarting by starting a new server with the same config. + _, acceptingServerRestart := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptingServer.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.GRPCPort = acceptingServerPort + }) + testrpc.WaitForLeader(t, acceptingServerRestart.RPC, "dc1") + + // Re-insert the peering state. 
+ require.NoError(t, acceptingServerRestart.fsm.State().PeeringWrite(2000, &pbpeering.Peering{ + ID: dialingServerPeerID, + Name: "my-peer-dialing-server", + State: pbpeering.PeeringState_PENDING, + })) + + // The dialing peer should eventually reconnect. + retry.Run(t, func(r *retry.R) { + connStreams := acceptingServerRestart.peerStreamServer.ConnectedStreams() + require.Contains(r, connStreams, dialingServerPeerID) + }) +} + func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastIdx uint64) uint64 { lastIdx++ require.NoError(t, store.PeeringTrustBundleWrite(lastIdx, &pbpeering.PeeringTrustBundle{ diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index eabd011412..57bc350c08 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -258,8 +258,13 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { case msg, open := <-recvChan: if !open { - logger.Trace("no longer receiving data on the stream") - return nil + // The only time we expect the stream to end is when we've received a "Terminated" message. + // We handle the case of receiving the Terminated message below and then this function exits. + // So if the channel is closed while this function is still running then we haven't received a Terminated + // message which means we want to try to reestablish the stream. + // It's the responsibility of the caller of this function to reestablish the stream on error and so that's + // why we return an error here. + return fmt.Errorf("stream ended unexpectedly") } // NOTE: this code should have similar error handling to the