peerstream: dialer should reconnect when stream closes (#13745)
* peerstream: dialer should reconnect when stream closes

If the stream is closed unexpectedly (i.e. when we haven't received a Terminated message), the dialer should attempt to re-establish the stream. Previously, `HandleStream` returned `nil` when the stream was closed. The caller then assumed the stream had been terminated on purpose and didn't reconnect, even though the stream had stopped unexpectedly and the dialer should have re-established it.
parent 0678bf91a7
commit ca3d7c964c
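To see why the return value matters, here is a minimal, self-contained Go sketch of the dialer-side retry semantics. `handleStream` and `maintainStream` are hypothetical stand-ins for illustration, not Consul's actual dialer code: the retry loop treats a nil return as "terminated on purpose" and stops, which is exactly why an unexpected close must surface as an error.

package main

import (
	"errors"
	"fmt"
	"time"
)

// handleStream is a hypothetical stand-in for peerstream's HandleStream.
// After this change it returns nil only when the peer sent a Terminated
// message, and an error for any unexpected close. Here it simply fails
// twice before reporting a clean shutdown.
var attempts int

func handleStream() error {
	attempts++
	if attempts < 3 {
		return errors.New("stream ended unexpectedly")
	}
	return nil // a Terminated message was received: clean shutdown
}

// maintainStream mimics the dialer's retry loop. Before the fix, an
// unexpected close surfaced as a nil error, so a loop like this stopped
// dialing instead of reconnecting.
func maintainStream() {
	for {
		err := handleStream()
		if err == nil {
			fmt.Println("peering terminated cleanly; not reconnecting")
			return
		}
		fmt.Println("stream error, re-establishing:", err)
		time.Sleep(100 * time.Millisecond) // simplified backoff
	}
}

func main() {
	maintainStream()
}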
@@ -16,6 +16,7 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/proto/pbpeering"
+	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
 	"github.com/hashicorp/consul/types"
@@ -423,6 +424,120 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
 	})
 }
 
+// Test that the dialing peer attempts to reestablish connections when the accepting peer
+// shuts down without sending a Terminated message.
+//
+// To test this, we start the two peer servers (accepting and dialing), set up peering, and then shut down
+// the accepting peer. This terminates the connection without sending a Terminated message.
+// We then restart the accepting peer (we actually spin up a new server with the same config and port) and then
+// assert that the dialing peer reestablishes the connection.
+func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for testing.Short")
+	}
+
+	// Reserve a gRPC port so we can restart the accepting server with the same port.
+	ports := freeport.GetN(t, 1)
+	acceptingServerPort := ports[0]
+
+	_, acceptingServer := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "acceptingServer.dc1"
+		c.Datacenter = "dc1"
+		c.TLSConfig.Domain = "consul"
+		c.GRPCPort = acceptingServerPort
+	})
+	testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1")
+
+	// Create a peering by generating a token.
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	t.Cleanup(cancel)
+
+	conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(),
+		grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())),
+		grpc.WithInsecure(),
+		grpc.WithBlock())
+	require.NoError(t, err)
+	defer conn.Close()
+
+	peeringClient := pbpeering.NewPeeringServiceClient(conn)
+	req := pbpeering.GenerateTokenRequest{
+		PeerName: "my-peer-dialing-server",
+	}
+	resp, err := peeringClient.GenerateToken(ctx, &req)
+	require.NoError(t, err)
+	tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
+	require.NoError(t, err)
+	var token structs.PeeringToken
+	require.NoError(t, json.Unmarshal(tokenJSON, &token))
+
+	var (
+		dialingServerPeerID   = token.PeerID
+		acceptingServerPeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
+	)
+
+	// Bring up dialingServer and store acceptingServer's token so that it attempts to dial.
+	_, dialingServer := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "dialing-server.dc2"
+		c.Datacenter = "dc2"
+		c.PrimaryDatacenter = "dc2"
+	})
+	testrpc.WaitForLeader(t, dialingServer.RPC, "dc2")
+	p := &pbpeering.Peering{
+		ID:                  acceptingServerPeerID,
+		Name:                "my-peer-accepting-server",
+		PeerID:              token.PeerID,
+		PeerCAPems:          token.CA,
+		PeerServerName:      token.ServerName,
+		PeerServerAddresses: token.ServerAddresses,
+	}
+	require.True(t, p.ShouldDial())
+	require.NoError(t, dialingServer.fsm.State().PeeringWrite(1000, p))
+
+	// Wait for the stream to be connected.
+	retry.Run(t, func(r *retry.R) {
+		status, found := dialingServer.peerStreamServer.StreamStatus(p.ID)
+		require.True(r, found)
+		require.True(r, status.Connected)
+	})
+
+	// Wait until the dialing server has sent its roots over. This avoids a race condition where the accepting server
+	// shuts down, but the dialing server is still sending messages to the stream. When this happens, an error is raised
+	// which causes the stream to restart.
+	// In this test, we want to test what happens when the stream is closed when there are _no_ messages being sent.
+	retry.Run(t, func(r *retry.R) {
+		_, bundle, err := acceptingServer.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: "my-peer-dialing-server"})
+		require.NoError(r, err)
+		require.NotNil(r, bundle)
+	})
+
+	// Shutdown the accepting server.
+	require.NoError(t, acceptingServer.Shutdown())
+	// Have to manually shut down the gRPC server otherwise it stays bound to the port.
+	acceptingServer.externalGRPCServer.Stop()
+
+	// Mimic the server restarting by starting a new server with the same config.
+	_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "acceptingServer.dc1"
+		c.Datacenter = "dc1"
+		c.TLSConfig.Domain = "consul"
+		c.GRPCPort = acceptingServerPort
+	})
+	testrpc.WaitForLeader(t, acceptingServerRestart.RPC, "dc1")
+
+	// Re-insert the peering state.
+	require.NoError(t, acceptingServerRestart.fsm.State().PeeringWrite(2000, &pbpeering.Peering{
+		ID:    dialingServerPeerID,
+		Name:  "my-peer-dialing-server",
+		State: pbpeering.PeeringState_PENDING,
+	}))
+
+	// The dialing peer should eventually reconnect.
+	retry.Run(t, func(r *retry.R) {
+		connStreams := acceptingServerRestart.peerStreamServer.ConnectedStreams()
+		require.Contains(r, connStreams, dialingServerPeerID)
+	})
+}
+
 func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastIdx uint64) uint64 {
 	lastIdx++
 	require.NoError(t, store.PeeringTrustBundleWrite(lastIdx, &pbpeering.PeeringTrustBundle{
@@ -258,8 +258,13 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
 		case msg, open := <-recvChan:
 			if !open {
 				logger.Trace("no longer receiving data on the stream")
-				return nil
+				// The only time we expect the stream to end is when we've received a "Terminated" message.
+				// We handle the case of receiving the Terminated message below and then this function exits.
+				// So if the channel is closed while this function is still running then we haven't received a Terminated
+				// message which means we want to try and reestablish the stream.
+				// It's the responsibility of the caller of this function to reestablish the stream on error and so that's
+				// why we return an error here.
+				return fmt.Errorf("stream ended unexpectedly")
 			}
 
 			// NOTE: this code should have similar error handling to the
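For context on why `!open` by itself cannot distinguish a clean shutdown from a crash, the sketch below shows the usual receive-pump pattern, with a hypothetical `fakeStream` standing in for the real gRPC stream: the goroutine feeding `recvChan` closes it whenever `Recv` fails, so the select loop only ever learns that the stream is gone, and the explicit Terminated message is the only clean-shutdown signal.

package main

import "fmt"

// fakeStream is a hypothetical stand-in for the gRPC stream; it delivers two
// messages and then fails, as if the peer went away without sending Terminated.
type fakeStream struct{ msgs []string }

func (s *fakeStream) Recv() (string, error) {
	if len(s.msgs) == 0 {
		return "", fmt.Errorf("transport closed")
	}
	msg := s.msgs[0]
	s.msgs = s.msgs[1:]
	return msg, nil
}

// pumpRecv feeds stream messages into a channel and closes it when the stream
// ends for any reason, so the consumer's `!open` check only means "the stream
// is gone": without a prior Terminated message, the close must be treated as
// unexpected and reported as an error.
func pumpRecv(s *fakeStream) <-chan string {
	recvChan := make(chan string)
	go func() {
		defer close(recvChan)
		for {
			msg, err := s.Recv()
			if err != nil {
				return // channel close is the only downstream signal
			}
			recvChan <- msg
		}
	}()
	return recvChan
}

func main() {
	recvChan := pumpRecv(&fakeStream{msgs: []string{"roots", "services"}})
	for msg := range recvChan {
		fmt.Println("received:", msg)
	}
	fmt.Println("stream ended unexpectedly") // the fixed code returns an error here
}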