Merge pull request #14958 from hashicorp/peering/nonce

This commit is contained in:
Freddy 2022-10-12 08:18:15 -06:00 committed by GitHub
commit 9ca8bb8ec4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 21 deletions

View File

@ -54,11 +54,9 @@ func makeExportedServiceListResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedServiceList,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: subExportedServiceList,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: subExportedServiceList,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -86,11 +84,9 @@ func makeServiceResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: serviceName,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: serviceName,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -104,11 +100,9 @@ func makeCARootsResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "roots",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: "roots",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -122,11 +116,9 @@ func makeServerAddrsResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "server-addrs",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: "server-addrs",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -162,6 +154,15 @@ func (s *Server) processResponse(
err.Error(),
), err
}
if resp.Nonce == "" {
err := fmt.Errorf("received response without a nonce for: %s:%s", resp.ResourceURL, resp.ResourceID)
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
switch resp.Operation {
case pbpeerstream.Operation_OPERATION_UPSERT:

View File

@ -436,6 +436,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
incomingHeartbeatCtxCancel()
}()
// The nonce is used to correlate response/(ack|nack) pairs.
var nonce uint64
// The main loop that processes sends and receives.
for {
select {
@ -585,7 +588,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
}
if resp := msg.GetResponse(); resp != nil {
// TODO(peering): Ensure there's a nonce
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp)
if err != nil {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
@ -669,6 +671,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
continue
}
// Assign a new unique nonce to the response.
nonce++
resp.Nonce = fmt.Sprintf("%08x", nonce)
replResp := makeReplicationResponse(resp)
if err := streamSend(replResp); err != nil {
// note: govet warns of context leak but it is cleaned up in a defer

View File

@ -1162,6 +1162,55 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
})
}
func TestStreamResources_Server_AckNackNonce(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
c.incomingHeartbeatTimeout = 50 * time.Millisecond
})
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
// Set the initial roots and CA configuration.
_, _ = writeInitialRootsAndCA(t, store)
client := makeClient(t, srv, testPeerID)
client.DrainStream(t)
testutil.RunStep(t, "ack contains nonce from response", func(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Nonce: "1234",
},
},
}
require.NoError(t, client.Send(resp))
msg, err := client.Recv()
require.NoError(t, err)
require.Equal(t, "1234", msg.GetRequest().ResponseNonce)
})
testutil.RunStep(t, "nack contains nonce from response", func(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, // Unspecified gets NACK
Nonce: "5678",
},
},
}
require.NoError(t, client.Send(resp))
msg, err := client.Recv()
require.NoError(t, err)
require.Equal(t, "5678", msg.GetRequest().ResponseNonce)
})
}
// Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
it := incrementalTime{
@ -1618,6 +1667,28 @@ func Test_processResponse_Validation(t *testing.T) {
},
wantErr: true,
},
{
name: "missing a nonce",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "web",
Nonce: "",
Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED,
},
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "",
Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT),
Message: fmt.Sprintf(`received response without a nonce for: %s:web`, pbpeerstream.TypeURLExportedService),
},
},
},
},
wantErr: true,
},
{
name: "unknown operation",
in: &pbpeerstream.ReplicationMessage_Response{
@ -1809,8 +1880,15 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
}
})
nonces := make(map[string]struct{})
for i := 0; i < num; i++ {
checkFns[i](t, out[i])
// Ensure every nonce was unique.
if resp := out[i].GetResponse(); resp != nil {
require.NotContains(t, nonces, resp.Nonce)
nonces[resp.Nonce] = struct{}{}
}
}
}
@ -1879,6 +1957,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedServiceList,
ResourceID: subExportedServiceList,
Nonce: "2",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbpeerstream.ExportedServiceList{Services: tc.exportedServices}),
}