Add basic nonce management

This commit adds a monotonically increasing nonce to include in peering
replication response messages. Every ack/nack from the peer handling a
response will include this nonce, allowing to correlate the ack/nack
with a specific resource.

At the moment nothing is done with the nonce when it is received. In the
future we may want to add functionality such as retries on NACKs,
depending on the class of error.
This commit is contained in:
freddygv 2022-10-11 19:02:04 -06:00
parent d17af23641
commit 7f9a5d0f58
3 changed files with 106 additions and 21 deletions

View File

@ -54,11 +54,9 @@ func makeExportedServiceListResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedServiceList,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: subExportedServiceList,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: subExportedServiceList,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -86,11 +84,9 @@ func makeServiceResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: serviceName,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: serviceName,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -104,11 +100,9 @@ func makeCARootsResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "roots",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: "roots",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -122,11 +116,9 @@ func makeServerAddrsResponse(
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "server-addrs",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
ResourceID: "server-addrs",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
@ -162,6 +154,15 @@ func (s *Server) processResponse(
err.Error(),
), err
}
if resp.Nonce == "" {
err := fmt.Errorf("received response without a nonce for: %s:%s", resp.ResourceURL, resp.ResourceID)
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
switch resp.Operation {
case pbpeerstream.Operation_OPERATION_UPSERT:

View File

@ -436,6 +436,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
incomingHeartbeatCtxCancel()
}()
// The nonce is used to correlate response/(ack|nack) pairs.
var nonce uint64
// The main loop that processes sends and receives.
for {
select {
@ -585,7 +588,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
}
if resp := msg.GetResponse(); resp != nil {
// TODO(peering): Ensure there's a nonce
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp)
if err != nil {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
@ -669,6 +671,10 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
continue
}
// Assign a new unique nonce to the response.
nonce++
resp.Nonce = fmt.Sprintf("%08x", nonce)
replResp := makeReplicationResponse(resp)
if err := streamSend(replResp); err != nil {
// note: govet warns of context leak but it is cleaned up in a defer

View File

@ -1162,6 +1162,55 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
})
}
func TestStreamResources_Server_AckNackNonce(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
c.incomingHeartbeatTimeout = 50 * time.Millisecond
})
p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed")
// Set the initial roots and CA configuration.
_, _ = writeInitialRootsAndCA(t, store)
client := makeClient(t, srv, testPeerID)
client.DrainStream(t)
testutil.RunStep(t, "ack contains nonce from response", func(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Nonce: "1234",
},
},
}
require.NoError(t, client.Send(resp))
msg, err := client.Recv()
require.NoError(t, err)
require.Equal(t, "1234", msg.GetRequest().ResponseNonce)
})
testutil.RunStep(t, "nack contains nonce from response", func(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, // Unspecified gets NACK
Nonce: "5678",
},
},
}
require.NoError(t, client.Send(resp))
msg, err := client.Recv()
require.NoError(t, err)
require.Equal(t, "5678", msg.GetRequest().ResponseNonce)
})
}
// Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
it := incrementalTime{
@ -1618,6 +1667,28 @@ func Test_processResponse_Validation(t *testing.T) {
},
wantErr: true,
},
{
name: "missing a nonce",
in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "web",
Nonce: "",
Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED,
},
expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "",
Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT),
Message: fmt.Sprintf(`received response without a nonce for: %s:web`, pbpeerstream.TypeURLExportedService),
},
},
},
},
wantErr: true,
},
{
name: "unknown operation",
in: &pbpeerstream.ReplicationMessage_Response{
@ -1809,8 +1880,14 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
}
})
nonces := make(map[string]struct{})
for i := 0; i < num; i++ {
checkFns[i](t, out[i])
// Ensure every nonce was unique.
if resp := out[i].GetResponse(); resp != nil {
require.NotContains(t, nonces, resp.Nonce)
}
}
}
@ -1879,6 +1956,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLExportedServiceList,
ResourceID: subExportedServiceList,
Nonce: "2",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbpeerstream.ExportedServiceList{Services: tc.exportedServices}),
}