From 7f9a5d0f58d89e86b3d9588b87bb3fcc303e71c8 Mon Sep 17 00:00:00 2001 From: freddygv Date: Tue, 11 Oct 2022 19:02:04 -0600 Subject: [PATCH] Add basic nonce management This commit adds a monotonically increasing nonce to include in peering replication response messages. Every ack/nack from the peer handling a response will include this nonce, allowing to correlate the ack/nack with a specific resource. At the moment nothing is done with the nonce when it is received. In the future we may want to add functionality such as retries on NACKs, depending on the class of error. --- .../services/peerstream/replication.go | 41 +++++----- .../services/peerstream/stream_resources.go | 8 +- .../services/peerstream/stream_test.go | 78 +++++++++++++++++++ 3 files changed, 106 insertions(+), 21 deletions(-) diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 9b2a61a5ba..74b3278f21 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -54,11 +54,9 @@ func makeExportedServiceListResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedServiceList, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: subExportedServiceList, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: subExportedServiceList, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -86,11 +84,9 @@ func makeServiceResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: serviceName, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -104,11 +100,9 @@ func makeCARootsResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "roots", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: "roots", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -122,11 +116,9 @@ func makeServerAddrsResponse( return &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "server-addrs", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, + ResourceID: "server-addrs", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, }, nil } @@ -162,6 +154,15 @@ func (s *Server) processResponse( err.Error(), ), err } + if resp.Nonce == "" { + err := fmt.Errorf("received response without a nonce for: %s:%s", resp.ResourceURL, resp.ResourceID) + return makeNACKReply( + resp.ResourceURL, + resp.Nonce, + code.Code_INVALID_ARGUMENT, + err.Error(), + ), err + } switch resp.Operation { case pbpeerstream.Operation_OPERATION_UPSERT: diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index ce6a5a73ed..e045cfa16b 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -436,6 +436,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { incomingHeartbeatCtxCancel() }() + // The nonce is used to correlate response/(ack|nack) pairs. + var nonce uint64 + // The main loop that processes sends and receives. for { select { @@ -585,7 +588,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { } if resp := msg.GetResponse(); resp != nil { - // TODO(peering): Ensure there's a nonce reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) @@ -669,6 +671,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { continue } + // Assign a new unique nonce to the response. + nonce++ + resp.Nonce = fmt.Sprintf("%08x", nonce) + replResp := makeReplicationResponse(resp) if err := streamSend(replResp); err != nil { // note: govet warns of context leak but it is cleaned up in a defer diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index baf437daa4..e41cfb9a11 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1162,6 +1162,55 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { }) } +func TestStreamResources_Server_AckNackNonce(t *testing.T) { + srv, store := newTestServer(t, func(c *Config) { + c.incomingHeartbeatTimeout = 50 * time.Millisecond + }) + + p := writePeeringToBeDialed(t, store, 1, "my-peer") + require.Empty(t, p.PeerID, "should be empty if being dialed") + + // Set the initial roots and CA configuration. + _, _ = writeInitialRootsAndCA(t, store) + + client := makeClient(t, srv, testPeerID) + client.DrainStream(t) + + testutil.RunStep(t, "ack contains nonce from response", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Nonce: "1234", + }, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.Recv() + require.NoError(t, err) + require.Equal(t, "1234", msg.GetRequest().ResponseNonce) + }) + + testutil.RunStep(t, "nack contains nonce from response", func(t *testing.T) { + resp := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, // Unspecified gets NACK + Nonce: "5678", + }, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.Recv() + require.NoError(t, err) + require.Equal(t, "5678", msg.GetRequest().ResponseNonce) + }) +} + // Test that when the client doesn't send a heartbeat in time, the stream is disconnected. func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { it := incrementalTime{ @@ -1618,6 +1667,28 @@ func Test_processResponse_Validation(t *testing.T) { }, wantErr: true, }, + { + name: "missing a nonce", + in: &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResourceID: "web", + Nonce: "", + Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, + }, + expect: &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Request_{ + Request: &pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLExportedService, + ResponseNonce: "", + Error: &pbstatus.Status{ + Code: int32(code.Code_INVALID_ARGUMENT), + Message: fmt.Sprintf(`received response without a nonce for: %s:web`, pbpeerstream.TypeURLExportedService), + }, + }, + }, + }, + wantErr: true, + }, { name: "unknown operation", in: &pbpeerstream.ReplicationMessage_Response{ @@ -1809,8 +1880,14 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } }) + nonces := make(map[string]struct{}) for i := 0; i < num; i++ { checkFns[i](t, out[i]) + + // Ensure every nonce was unique. + if resp := out[i].GetResponse(); resp != nil { + require.NotContains(t, nonces, resp.Nonce) + } } } @@ -1879,6 +1956,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { resp := &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedServiceList, ResourceID: subExportedServiceList, + Nonce: "2", Operation: pbpeerstream.Operation_OPERATION_UPSERT, Resource: makeAnyPB(t, &pbpeerstream.ExportedServiceList{Services: tc.exportedServices}), }