From 30fffd0c90247bf111f5e0403fdfee12049d7ad4 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Wed, 13 Jul 2022 10:00:35 -0500 Subject: [PATCH] peerstream: some cosmetic refactors to make this easier to follow (#13732) - Use some protobuf construction helper methods for brevity. - Rename a local variable to avoid later shadowing. - Rename the Nonce field to be more like xDS's naming. - Be more explicit about which PeerID fields are empty. --- agent/consul/leader_peering.go | 4 + .../public/services/peerstream/replication.go | 121 +++++++++--------- .../services/peerstream/stream_resources.go | 92 ++++++++----- .../public/services/peerstream/stream_test.go | 108 ++++++++-------- proto/pbpeerstream/peerstream.pb.go | 107 ++++++++-------- proto/pbpeerstream/peerstream.proto | 4 +- 6 files changed, 240 insertions(+), 196 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 48ce59d07d..a289412ea6 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -252,6 +252,10 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer return err } + if peer.PeerID == "" { + return fmt.Errorf("expected PeerID to be non empty; the wrong end of peering is being dialed") + } + streamReq := peerstream.HandleStreamRequest{ LocalID: peer.ID, RemoteID: peer.PeerID, diff --git a/agent/grpc/public/services/peerstream/replication.go b/agent/grpc/public/services/peerstream/replication.go index bbb54c4080..f9f5ce76b0 100644 --- a/agent/grpc/public/services/peerstream/replication.go +++ b/agent/grpc/public/services/peerstream/replication.go @@ -38,12 +38,10 @@ import ( func makeServiceResponse( logger hclog.Logger, update cache.UpdateEvent, -) *pbpeerstream.ReplicationMessage { +) (*pbpeerstream.ReplicationMessage_Response, error) { any, csn, err := marshalToProtoAny[*pbservice.IndexedCheckServiceNodes](update.Result) if err != nil { - // Log the error and skip this response to avoid locking up peering due to a bad update event. - logger.Error("failed to marshal", "error", err) - return nil + return nil, fmt.Errorf("failed to marshal: %w", err) } serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) @@ -56,60 +54,43 @@ func makeServiceResponse( // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. // Case #1 is a no-op for the importing peer. if len(csn.Nodes) == 0 { - resp := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Response_{ - Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeerstream.Operation_OPERATION_DELETE, - }, - }, - } - return resp + return &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLService, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: serviceName, + Operation: pbpeerstream.Operation_OPERATION_DELETE, + }, nil } // If there are nodes in the response, we push them as an UPSERT operation. - resp := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Response_{ - Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, - }, - }, - } - return resp + return &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLService, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: serviceName, + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, + }, nil } func makeCARootsResponse( logger hclog.Logger, update cache.UpdateEvent, -) *pbpeerstream.ReplicationMessage { +) (*pbpeerstream.ReplicationMessage_Response, error) { any, _, err := marshalToProtoAny[*pbpeering.PeeringTrustBundle](update.Result) if err != nil { - // Log the error and skip this response to avoid locking up peering due to a bad update event. - logger.Error("failed to marshal", "error", err) - return nil + return nil, fmt.Errorf("failed to marshal: %w", err) } - resp := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Response_{ - Response: &pbpeerstream.ReplicationMessage_Response{ - ResourceURL: pbpeerstream.TypeURLRoots, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: "roots", - Operation: pbpeerstream.Operation_OPERATION_UPSERT, - Resource: any, - }, - }, - } - return resp + return &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLRoots, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: "roots", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, + }, nil } // marshalToProtoAny takes any input and returns: @@ -136,7 +117,7 @@ func (s *Server) processResponse( ) (*pbpeerstream.ReplicationMessage, error) { if !pbpeerstream.KnownTypeURL(resp.ResourceURL) { err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) - return makeReply( + return makeNACKReply( resp.ResourceURL, resp.Nonce, code.Code_INVALID_ARGUMENT, @@ -148,7 +129,7 @@ func (s *Server) processResponse( case pbpeerstream.Operation_OPERATION_UPSERT: if resp.Resource == nil { err := fmt.Errorf("received upsert response with no content") - return makeReply( + return makeNACKReply( resp.ResourceURL, resp.Nonce, code.Code_INVALID_ARGUMENT, @@ -157,7 +138,7 @@ func (s *Server) processResponse( } if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil { - return makeReply( + return makeNACKReply( resp.ResourceURL, resp.Nonce, code.Code_INTERNAL, @@ -165,18 +146,18 @@ func (s *Server) processResponse( ), fmt.Errorf("upsert error: %w", err) } - return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil + return makeACKReply(resp.ResourceURL, resp.Nonce), nil case pbpeerstream.Operation_OPERATION_DELETE: if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil { - return makeReply( + return makeNACKReply( resp.ResourceURL, resp.Nonce, code.Code_INTERNAL, fmt.Sprintf("delete error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err), ), fmt.Errorf("delete error: %w", err) } - return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil + return makeACKReply(resp.ResourceURL, resp.Nonce), nil default: var errMsg string @@ -185,7 +166,7 @@ func (s *Server) processResponse( } else { errMsg = fmt.Sprintf("unsupported operation: %d", resp.Operation) } - return makeReply( + return makeNACKReply( resp.ResourceURL, resp.Nonce, code.Code_INVALID_ARGUMENT, @@ -458,7 +439,14 @@ func (s *Server) handleDelete( } } -func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeerstream.ReplicationMessage { +func makeACKReply(resourceURL, nonce string) *pbpeerstream.ReplicationMessage { + return makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ + ResourceURL: resourceURL, + ResponseNonce: nonce, + }) +} + +func makeNACKReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeerstream.ReplicationMessage { var rpcErr *pbstatus.Status if errCode != code.Code_OK || errMsg != "" { rpcErr = &pbstatus.Status{ @@ -467,14 +455,27 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp } } - // TODO: shouldn't this be response? + return makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ + ResourceURL: resourceURL, + ResponseNonce: nonce, + Error: rpcErr, + }) +} + +// makeReplicationRequest is a convenience method to make a Request-type ReplicationMessage. +func makeReplicationRequest(req *pbpeerstream.ReplicationMessage_Request) *pbpeerstream.ReplicationMessage { return &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ - Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: resourceURL, - Nonce: nonce, - Error: rpcErr, - }, + Request: req, + }, + } +} + +// makeReplicationResponse is a convenience method to make a Response-type ReplicationMessage. +func makeReplicationResponse(resp *pbpeerstream.ReplicationMessage_Response) *pbpeerstream.ReplicationMessage { + return &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Response_{ + Response: resp, }, } } diff --git a/agent/grpc/public/services/peerstream/stream_resources.go b/agent/grpc/public/services/peerstream/stream_resources.go index 17a606e5b1..f85da232f1 100644 --- a/agent/grpc/public/services/peerstream/stream_resources.go +++ b/agent/grpc/public/services/peerstream/stream_resources.go @@ -32,6 +32,9 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes logger.Trace("Started processing request") defer logger.Trace("Finished processing request") + // NOTE: this code should have similar error handling to the new-request + // handling code in HandleStream() + if !s.Backend.IsLeader() { // we are not the leader so we will hang up on the dialer @@ -68,11 +71,14 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes if req.PeerID == "" { return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID") } - if req.Nonce != "" { + if req.ResponseNonce != "" { return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain a nonce") } + if req.Error != nil { + return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain an error") + } if !pbpeerstream.KnownTypeURL(req.ResourceURL) { - return grpcstatus.Error(codes.InvalidArgument, fmt.Sprintf("subscription request to unknown resource URL: %s", req.ResourceURL)) + return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL) } _, p, err := s.GetStore().PeeringReadByID(nil, req.PeerID) @@ -88,9 +94,13 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes // TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it logger.Info("accepted initial replication request from peer", "peer_id", p.ID) + if p.PeerID != "" { + return grpcstatus.Error(codes.InvalidArgument, "expected PeerID to be empty; the wrong end of peering is being dialed") + } + streamReq := HandleStreamRequest{ LocalID: p.ID, - RemoteID: p.PeerID, + RemoteID: "", PeerName: p.Name, Partition: p.Partition, Stream: stream, @@ -123,6 +133,10 @@ type HandleStreamRequest struct { Stream BidirectionalStream } +func (r HandleStreamRequest) WasDialed() bool { + return r.RemoteID == "" +} + // DrainStream attempts to gracefully drain the stream when the connection is going to be torn down. // Tearing down the connection too quickly can lead our peer receiving a context cancellation error before the stream termination message. // Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated. @@ -143,18 +157,21 @@ func (s *Server) DrainStream(req HandleStreamRequest) { // The localID provided is the locally-generated identifier for the peering. // The remoteID is an identifier that the remote peer recognizes for the peering. -func (s *Server) HandleStream(req HandleStreamRequest) error { +func (s *Server) HandleStream(streamReq HandleStreamRequest) error { // TODO: pass logger down from caller? - logger := s.Logger.Named("stream").With("peer_name", req.PeerName, "peer_id", req.LocalID) + logger := s.Logger.Named("stream"). + With("peer_name", streamReq.PeerName). + With("peer_id", streamReq.LocalID). + With("dialed", streamReq.WasDialed()) logger.Trace("handling stream for peer") - status, err := s.Tracker.Connected(req.LocalID) + status, err := s.Tracker.Connected(streamReq.LocalID) if err != nil { return fmt.Errorf("failed to register stream: %v", err) } // TODO(peering) Also need to clear subscriptions associated with the peer - defer s.Tracker.Disconnected(req.LocalID) + defer s.Tracker.Disconnected(streamReq.LocalID) var trustDomain string if s.ConnectEnabled { @@ -167,26 +184,22 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { } mgr := newSubscriptionManager( - req.Stream.Context(), + streamReq.Stream.Context(), logger, s.Config, trustDomain, s.Backend, s.GetStore, ) - subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.PeerName, req.Partition) + subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition) - sub := &pbpeerstream.ReplicationMessage{ - Payload: &pbpeerstream.ReplicationMessage_Request_{ - Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - PeerID: req.RemoteID, - }, - }, - } + sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ + ResourceURL: pbpeerstream.TypeURLService, + PeerID: streamReq.RemoteID, + }) logTraceSend(logger, sub) - if err := req.Stream.Send(sub); err != nil { + if err := streamReq.Stream.Send(sub); err != nil { if err == io.EOF { logger.Info("stream ended by peer") status.TrackReceiveError(err.Error()) @@ -202,7 +215,7 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { go func() { defer close(recvChan) for { - msg, err := req.Stream.Recv() + msg, err := streamReq.Stream.Recv() if err == nil { logTraceRecv(logger, msg) recvChan <- msg @@ -233,13 +246,13 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { } logTraceSend(logger, term) - if err := req.Stream.Send(term); err != nil { + if err := streamReq.Stream.Send(term); err != nil { status.TrackSendError(err.Error()) return fmt.Errorf("failed to send to stream: %v", err) } logger.Trace("deleting stream status") - s.Tracker.DeleteStatus(req.LocalID) + s.Tracker.DeleteStatus(streamReq.LocalID) return nil @@ -249,6 +262,9 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { return nil } + // NOTE: this code should have similar error handling to the + // initial handling code in StreamResources() + if !s.Backend.IsLeader() { // we are not the leader anymore so we will hang up on the dialer logger.Error("node is not a leader anymore; cannot continue streaming") @@ -265,8 +281,11 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { } if req := msg.GetRequest(); req != nil { + if !pbpeerstream.KnownTypeURL(req.ResourceURL) { + return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL) + } switch { - case req.Nonce == "": + case req.ResponseNonce == "": // TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream. // Should change that behavior or only allow it that one time. @@ -283,7 +302,7 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { if resp := msg.GetResponse(); resp != nil { // TODO(peering): Ensure there's a nonce - reply, err := s.processResponse(req.PeerName, req.Partition, resp) + reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, resp) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) status.TrackReceiveError(err.Error()) @@ -292,7 +311,7 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { } logTraceSend(logger, reply) - if err := req.Stream.Send(reply); err != nil { + if err := streamReq.Stream.Send(reply); err != nil { status.TrackSendError(err.Error()) return fmt.Errorf("failed to send to stream: %v", err) } @@ -304,23 +323,33 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { logger.Info("peering was deleted by our peer: marking peering as terminated and cleaning up imported resources") // Once marked as terminated, a separate deferred deletion routine will clean up imported resources. - if err := s.Backend.PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil { + if err := s.Backend.PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: streamReq.LocalID}); err != nil { logger.Error("failed to mark peering as terminated: %w", err) } return nil } case update := <-subCh: - var resp *pbpeerstream.ReplicationMessage + var resp *pbpeerstream.ReplicationMessage_Response switch { case strings.HasPrefix(update.CorrelationID, subExportedService): - resp = makeServiceResponse(logger, update) + resp, err = makeServiceResponse(logger, update) + if err != nil { + // Log the error and skip this response to avoid locking up peering due to a bad update event. + logger.Error("failed to create service response", "error", err) + continue + } case strings.HasPrefix(update.CorrelationID, subMeshGateway): // TODO(Peering): figure out how to sync this separately case update.CorrelationID == subCARoot: - resp = makeCARootsResponse(logger, update) + resp, err = makeCARootsResponse(logger, update) + if err != nil { + // Log the error and skip this response to avoid locking up peering due to a bad update event. + logger.Error("failed to create ca roots response", "error", err) + continue + } default: logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID) @@ -329,8 +358,11 @@ func (s *Server) HandleStream(req HandleStreamRequest) error { if resp == nil { continue } - logTraceSend(logger, resp) - if err := req.Stream.Send(resp); err != nil { + + replResp := makeReplicationResponse(resp) + + logTraceSend(logger, replResp) + if err := streamReq.Stream.Send(replResp); err != nil { status.TrackSendError(err.Error()) return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) } diff --git a/agent/grpc/public/services/peerstream/stream_test.go b/agent/grpc/public/services/peerstream/stream_test.go index 8bf644c5bd..de1455a632 100644 --- a/agent/grpc/public/services/peerstream/stream_test.go +++ b/agent/grpc/public/services/peerstream/stream_test.go @@ -109,7 +109,8 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { } }() - p := writeEstablishedPeering(t, store, 1, "my-peer") + p := writePeeringToBeDialed(t, store, 1, "my-peer") + require.Empty(t, p.PeerID, "should be empty if being dialed") peerID := p.ID // Set the initial roots and CA configuration. @@ -139,8 +140,8 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { input2 := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "1", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "1", }, }, } @@ -225,8 +226,8 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) { input: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - PeerID: "63b60245-c475-426b-b314-4588d210859d", - Nonce: "1", + PeerID: "63b60245-c475-426b-b314-4588d210859d", + ResponseNonce: "1", }, }, }, @@ -275,16 +276,14 @@ func TestStreamResources_Server_Terminate(t *testing.T) { c.Tracker.SetClock(it.Now) }) - p := writeEstablishedPeering(t, store, 1, "my-peer") - var ( - peerID = p.ID // for Send - remotePeerID = p.PeerID // for Recv - ) + p := writePeeringToBeDialed(t, store, 1, "my-peer") + require.Empty(t, p.PeerID, "should be empty if being dialed") + peerID := p.ID // Set the initial roots and CA configuration. _, _ = writeInitialRootsAndCA(t, store) - client := makeClient(t, srv, peerID, remotePeerID) + client := makeClient(t, srv, peerID) // TODO(peering): test fails if we don't drain the stream with this call because the // server gets blocked sending the termination message. Figure out a way to let @@ -334,13 +333,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { // Set the initial roots and CA configuration. _, rootA := writeInitialRootsAndCA(t, store) - p := writeEstablishedPeering(t, store, 1, "my-peer") - var ( - peerID = p.ID // for Send - remotePeerID = p.PeerID // for Recv - ) + p := writePeeringToBeDialed(t, store, 1, "my-peer") + require.Empty(t, p.PeerID, "should be empty if being dialed") + peerID := p.ID - client := makeClient(t, srv, peerID, remotePeerID) + client := makeClient(t, srv, peerID) testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { retry.Run(t, func(r *retry.R) { @@ -357,9 +354,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { ack := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - PeerID: peerID, - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "1", + PeerID: peerID, + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "1", // Acks do not have an Error populated in the request }, @@ -390,9 +387,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { nack := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - PeerID: peerID, - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "2", + PeerID: peerID, + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "2", Error: &pbstatus.Status{ Code: int32(code.Code_UNAVAILABLE), Message: "bad bad not good", @@ -463,8 +460,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expectAck := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "21", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "21", }, }, } @@ -513,8 +510,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expectNack := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "24", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "24", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), Message: `unsupported operation: "OPERATION_UNSPECIFIED"`, @@ -577,12 +574,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { // Create a peering var lastIdx uint64 = 1 - p := writeEstablishedPeering(t, store, lastIdx, "my-peering") + p := writePeeringToBeDialed(t, store, lastIdx, "my-peering") + require.Empty(t, p.PeerID, "should be empty if being dialed") // Set the initial roots and CA configuration. _, _ = writeInitialRootsAndCA(t, store) - client := makeClient(t, srv, p.ID, p.PeerID) + client := makeClient(t, srv, p.ID) // Register a service that is not yet exported mysql := &structs.CheckServiceNode{ @@ -800,12 +798,13 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { // Create a peering var lastIdx uint64 = 1 - p := writeEstablishedPeering(t, store, lastIdx, "my-peering") + p := writePeeringToBeDialed(t, store, lastIdx, "my-peering") + require.Empty(t, p.PeerID, "should be empty if being dialed") // Set the initial roots and CA configuration. clusterID, rootA := writeInitialRootsAndCA(t, store) - client := makeClient(t, srv, p.ID, p.PeerID) + client := makeClient(t, srv, p.ID) testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) { expectReplEvents(t, client, @@ -856,12 +855,7 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) { // makeClient sets up a *MockClient with the initial subscription // message handshake. -func makeClient( - t *testing.T, - srv pbpeerstream.PeerStreamServiceServer, - peerID string, - remotePeerID string, -) *MockClient { +func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID string) *MockClient { t.Helper() client := NewMockClient(context.Background()) @@ -896,7 +890,10 @@ func makeClient( Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ ResourceURL: pbpeerstream.TypeURLService, - PeerID: remotePeerID, + // The PeerID field is only set for the messages coming FROM + // the establishing side and are going to be empty from the + // other side. + PeerID: "", }, }, } @@ -1003,8 +1000,8 @@ func Test_processResponse_Validation(t *testing.T) { expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "1", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "1", }, }, }, @@ -1021,8 +1018,8 @@ func Test_processResponse_Validation(t *testing.T) { expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "1", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "1", }, }, }, @@ -1038,8 +1035,8 @@ func Test_processResponse_Validation(t *testing.T) { expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: "nomad.Job", - Nonce: "1", + ResourceURL: "nomad.Job", + ResponseNonce: "1", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), Message: `received response for unknown resource type "nomad.Job"`, @@ -1059,8 +1056,8 @@ func Test_processResponse_Validation(t *testing.T) { expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "1", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "1", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), Message: `unsupported operation: "OPERATION_UNSPECIFIED"`, @@ -1080,8 +1077,8 @@ func Test_processResponse_Validation(t *testing.T) { expect: &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ Request: &pbpeerstream.ReplicationMessage_Request{ - ResourceURL: pbpeerstream.TypeURLService, - Nonce: "1", + ResourceURL: pbpeerstream.TypeURLService, + ResponseNonce: "1", Error: &pbstatus.Status{ Code: int32(code.Code_INVALID_ARGUMENT), Message: `unsupported operation: 100000`, @@ -1099,12 +1096,21 @@ func Test_processResponse_Validation(t *testing.T) { } } -// writeEstablishedPeering creates a peering with the provided name and ensures +// writePeeringToDialFrom creates a peering with the provided name and ensures // the PeerID field is set for the ID of the remote peer. -func writeEstablishedPeering(t *testing.T, store *state.Store, idx uint64, peerName string) *pbpeering.Peering { +func writePeeringToDialFrom(t *testing.T, store *state.Store, idx uint64, peerName string) *pbpeering.Peering { remotePeerID, err := uuid.GenerateUUID() require.NoError(t, err) + return writeTestPeering(t, store, idx, peerName, remotePeerID) +} +// writePeeringToBeDialed creates a peering with the provided name and ensures +// the PeerID field is NOT set for the ID of the remote peer. +func writePeeringToBeDialed(t *testing.T, store *state.Store, idx uint64, peerName string) *pbpeering.Peering { + return writeTestPeering(t, store, idx, peerName, "") +} + +func writeTestPeering(t *testing.T, store *state.Store, idx uint64, peerName, remotePeerID string) *pbpeering.Peering { peering := pbpeering.Peering{ ID: testUUID(t), Name: peerName, @@ -1187,7 +1193,7 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test if reqA.ResourceURL != reqB.ResourceURL { return reqA.ResourceURL < reqB.ResourceURL } - return reqA.Nonce < reqB.Nonce + return reqA.ResponseNonce < reqB.ResponseNonce case *pbpeerstream.ReplicationMessage_Response_: respA, respB := a.GetResponse(), b.GetResponse() diff --git a/proto/pbpeerstream/peerstream.pb.go b/proto/pbpeerstream/peerstream.pb.go index 12afdd2d9f..17d20ff4e8 100644 --- a/proto/pbpeerstream/peerstream.pb.go +++ b/proto/pbpeerstream/peerstream.pb.go @@ -229,11 +229,11 @@ type ReplicationMessage_Request struct { // An identifier for the peer making the request. // This identifier is provisioned by the serving peer prior to the request from the dialing peer. PeerID string `protobuf:"bytes,1,opt,name=PeerID,proto3" json:"PeerID,omitempty"` - // Nonce corresponding to that of the response being ACKed or NACKed. + // ResponseNonce corresponding to that of the response being ACKed or NACKed. // Initial subscription requests will have an empty nonce. // The nonce is generated and incremented by the exporting peer. // TODO - Nonce string `protobuf:"bytes,2,opt,name=Nonce,proto3" json:"Nonce,omitempty"` + ResponseNonce string `protobuf:"bytes,2,opt,name=ResponseNonce,proto3" json:"ResponseNonce,omitempty"` // The type URL for the resource being requested or ACK/NACKed. ResourceURL string `protobuf:"bytes,3,opt,name=ResourceURL,proto3" json:"ResourceURL,omitempty"` // The error if the previous response was not applied successfully. @@ -280,9 +280,9 @@ func (x *ReplicationMessage_Request) GetPeerID() string { return "" } -func (x *ReplicationMessage_Request) GetNonce() string { +func (x *ReplicationMessage_Request) GetResponseNonce() string { if x != nil { - return x.Nonce + return x.ResponseNonce } return "" } @@ -436,7 +436,7 @@ var file_proto_pbpeerstream_peerstream_proto_rawDesc = []byte{ 0x6d, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd2, 0x04, 0x0a, 0x12, 0x52, 0x65, + 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe3, 0x04, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x42, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, @@ -451,54 +451,55 @@ var file_proto_pbpeerstream_peerstream_proto_rawDesc = []byte{ 0x29, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x48, 0x00, 0x52, 0x0a, 0x74, 0x65, - 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x1a, 0x7f, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x4e, - 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, - 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x55, 0x52, 0x4c, 0x12, 0x24, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xc9, 0x01, 0x0a, 0x08, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, - 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x1e, - 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x12, 0x30, - 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x12, 0x33, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x0c, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, - 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, - 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, - 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, - 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, - 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, - 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x02, 0x32, 0x6a, 0x0a, - 0x11, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x55, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x99, 0x01, 0x0a, 0x0e, 0x63, 0x6f, - 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, - 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, - 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, - 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, - 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xe2, - 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x1a, 0x8f, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x12, 0x24, 0x0a, 0x0d, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, + 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, + 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x55, 0x52, 0x4c, 0x12, 0x24, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xc9, 0x01, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, + 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, + 0x1e, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x12, + 0x30, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x12, 0x33, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x0c, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, + 0x61, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, + 0x29, 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, + 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, + 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, + 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, + 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, + 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, + 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x02, 0x32, 0x6a, + 0x0a, 0x11, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x55, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x99, 0x01, 0x0a, 0x0e, 0x63, + 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, + 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, + 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, + 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5c, 0x47, 0x50, + 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/pbpeerstream/peerstream.proto b/proto/pbpeerstream/peerstream.proto index 4ab0f6798a..47ea8f1ae5 100644 --- a/proto/pbpeerstream/peerstream.proto +++ b/proto/pbpeerstream/peerstream.proto @@ -32,11 +32,11 @@ message ReplicationMessage { // This identifier is provisioned by the serving peer prior to the request from the dialing peer. string PeerID = 1; - // Nonce corresponding to that of the response being ACKed or NACKed. + // ResponseNonce corresponding to that of the response being ACKed or NACKed. // Initial subscription requests will have an empty nonce. // The nonce is generated and incremented by the exporting peer. // TODO - string Nonce = 2; + string ResponseNonce = 2; // The type URL for the resource being requested or ACK/NACKed. string ResourceURL = 3;