diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 64960f5f4e..d0f7c7470d 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -277,7 +277,7 @@ type HandleStreamRequest struct { Stream BidirectionalStream } -func (r HandleStreamRequest) WasDialed() bool { +func (r HandleStreamRequest) IsAcceptor() bool { return r.RemoteID == "" } @@ -316,7 +316,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { logger := s.Logger.Named("stream"). With("peer_name", streamReq.PeerName). With("peer_id", streamReq.LocalID). - With("dialed", streamReq.WasDialed()) + With("dailer", !streamReq.IsAcceptor()) logger.Trace("handling stream for peer") // handleStreamCtx is local to this function. @@ -380,13 +380,18 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { return err } - // Subscribe to all relevant resource types. - for _, resourceURL := range []string{ + resources := []string{ pbpeerstream.TypeURLExportedService, pbpeerstream.TypeURLExportedServiceList, pbpeerstream.TypeURLPeeringTrustBundle, - pbpeerstream.TypeURLPeeringServerAddresses, - } { + } + // Acceptors should not subscribe to server address updates, because they should always have an empty list. + if !streamReq.IsAcceptor() { + resources = append(resources, pbpeerstream.TypeURLPeeringServerAddresses) + } + + // Subscribe to all relevant resource types. + for _, resourceURL := range resources { sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ ResourceURL: resourceURL, PeerID: streamReq.RemoteID, @@ -558,7 +563,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // This must be a new subscription request to add a new // resource type, vet it like a new request. - if !streamReq.WasDialed() { + if !streamReq.IsAcceptor() { if req.PeerID != "" && req.PeerID != streamReq.RemoteID { // Not necessary after the first request from the dialer, // but if provided must match. diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 4674e34b37..e53cb8cc71 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -125,7 +125,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { // Receive a subscription from a peer. This message arrives while the // server is a leader and should work. - testutil.RunStep(t, "send subscription request to leader and consume its four requests", func(t *testing.T) { + testutil.RunStep(t, "send subscription request to leader and consume its three requests", func(t *testing.T) { sub := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Open_{ Open: &pbpeerstream.ReplicationMessage_Open{ @@ -148,10 +148,6 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { msg3, err := client.Recv() require.NoError(t, err) require.NotEmpty(t, msg3) - - msg4, err := client.Recv() - require.NoError(t, err) - require.NotEmpty(t, msg4) }) // The ACK will be a new request but at this point the server is not the @@ -551,6 +547,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { it := incrementalTime{ base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), } + waitUntil := it.FutureNow(6) srv, store := newTestServer(t, nil) srv.Tracker.setClock(it.Now) @@ -576,6 +573,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { client.DrainStream(t) + // Wait for async workflows to complete. + retry.Run(t, func(r *retry.R) { + require.Equal(r, waitUntil, it.FutureNow(1)) + }) + // Manually grab the last success time from sending the trust bundle or exported services list. status, ok := srv.StreamStatus(testPeerID) require.True(t, ok) @@ -605,7 +607,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { LastAck: lastSendAck, ExportedServices: []string{}, } - retry.Run(t, func(r *retry.R) { rStatus, ok := srv.StreamStatus(testPeerID) require.True(r, ok) @@ -894,9 +895,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, store.EnsureConfigEntry(lastIdx, entry)) expectReplEvents(t, client, - func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLPeeringServerAddresses, msg.GetRequest().ResourceURL) - }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) // Roots tested in TestStreamResources_Server_CARootUpdates @@ -1105,9 +1103,6 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) { expectReplEvents(t, client, - func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLPeeringServerAddresses, msg.GetRequest().ResourceURL) - }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) require.Equal(t, "roots", msg.GetResponse().ResourceID)