From 4e215dc4114df0a03c26b8902e4e2ab71d63894a Mon Sep 17 00:00:00 2001 From: Freddy Date: Thu, 12 May 2022 15:04:44 -0600 Subject: [PATCH] [OSS] Add upsert handling for receiving CheckServiceNode (#13061) --- acl/enterprisemeta_oss.go | 4 + agent/consul/fsm/snapshot_oss.go | 12 +- agent/consul/leader_peering.go | 9 +- agent/consul/peering_backend.go | 5 + agent/rpc/peering/service.go | 164 ++++++++++-- agent/rpc/peering/service_test.go | 357 +++++++++++++++++++++++++ agent/rpc/peering/stream_test.go | 86 ++++-- agent/rpc/peering/subscription_view.go | 2 + agent/rpc/peering/testing.go | 60 ++--- agent/structs/structs.go | 14 + 10 files changed, 628 insertions(+), 85 deletions(-) diff --git a/acl/enterprisemeta_oss.go b/acl/enterprisemeta_oss.go index f0f15bc05f..44075a44f7 100644 --- a/acl/enterprisemeta_oss.go +++ b/acl/enterprisemeta_oss.go @@ -100,6 +100,10 @@ func (m *EnterpriseMeta) UnsetPartition() { // do nothing } +func (m *EnterpriseMeta) OverridePartition(_ string) { + // do nothing +} + func NewEnterpriseMetaWithPartition(_, _ string) EnterpriseMeta { return emptyEnterpriseMeta } diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index 3ee4c85580..a58f0cf1db 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -112,17 +112,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink, n := node.(*structs.Node) nodeEntMeta := n.GetEnterpriseMeta() - req := structs.RegisterRequest{ - ID: n.ID, - Node: n.Node, - Datacenter: n.Datacenter, - Address: n.Address, - TaggedAddresses: n.TaggedAddresses, - NodeMeta: n.Meta, - RaftIndex: n.RaftIndex, - EnterpriseMeta: *nodeEntMeta, - PeerName: n.PeerName, - } + req := n.ToRegisterRequest() // Register the node itself if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index d1dfc8c432..c0f69b1686 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -8,6 +8,7 @@ import ( "fmt" "net" + "github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -207,7 +208,13 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer return err } - err = s.peeringService.HandleStream(peer.ID, peer.PeerID, stream) + err = s.peeringService.HandleStream(peering.HandleStreamRequest{ + LocalID: peer.ID, + RemoteID: peer.PeerID, + PeerName: peer.Name, + Partition: peer.Partition, + Stream: stream, + }) if err == nil { // This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream. 
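
The OverridePartition stub above is a no-op so that shared code paths compile in OSS builds. The enterprise implementation is closed-source, so the following is only a guess at its shape, assuming EnterpriseMeta carries a Partition field in enterprise builds:

    // Hypothetical enterprise counterpart to the OSS no-op above.
    func (m *EnterpriseMeta) OverridePartition(partition string) {
        // Unlike UnsetPartition, this replaces the partition unconditionally,
        // so imported resources are rehomed into the importing cluster's view.
        m.Partition = partition
    }
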
cancel() diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 7e8c698c8e..85123e15e4 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -123,4 +123,9 @@ func (a *peeringApply) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDR return err } +func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error { + _, err := a.srv.leaderRaftApply("Catalog.Register", structs.RegisterRequestType, req) + return err +} + var _ peering.Apply = (*peeringApply)(nil) diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index a2dbf53c77..bba4114c8d 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbstatus" + "github.com/hashicorp/consul/types" ) var ( @@ -105,6 +106,7 @@ type Backend interface { // Store provides a read-only interface for querying Peering data. type Store interface { PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error) + PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) AbandonCh() <-chan struct{} @@ -115,6 +117,7 @@ type Apply interface { PeeringWrite(req *pbpeering.PeeringWriteRequest) error PeeringDelete(req *pbpeering.PeeringDeleteRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error + CatalogRegister(req *structs.RegisterRequest) error } // GenerateToken implements the PeeringService RPC method to generate a @@ -405,44 +408,75 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource return grpcstatus.Error(codes.InvalidArgument, fmt.Sprintf("subscription request to unknown resource URL: %s", req.ResourceURL)) } - // TODO(peering): Validate that a peering exists for this peer + _, p, err := s.Backend.Store().PeeringReadByID(nil, req.PeerID) + if err != nil { + s.logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err) + return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID) + } + if p == nil { + return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID) + } + // TODO(peering): If the peering is marked as deleted, send a Terminated message and return // TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID) // For server peers both of these ID values are the same, because we generated a token with a local ID, // and the client peer dials using that same ID. - return s.HandleStream(req.PeerID, req.PeerID, stream) + return s.HandleStream(HandleStreamRequest{ + LocalID: req.PeerID, + RemoteID: req.PeerID, + PeerName: p.Name, + Partition: p.Partition, + Stream: stream, + }) +} + +type HandleStreamRequest struct { + // LocalID is the UUID for the peering in the local Consul datacenter. + LocalID string + + // RemoteID is the UUID for the peering from the perspective of the peer. + RemoteID string + + // PeerName is the name of the peering. + PeerName string + + // Partition is the local partition associated with the peer. 
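
With CatalogRegister added to the Apply interface above, a test double only needs to record requests. A minimal sketch; fakeApply is illustrative and not part of this patch:

    // fakeApply records catalog registrations so tests can assert on what
    // handleUpsert would have applied through Raft.
    type fakeApply struct {
        registered []structs.RegisterRequest
    }

    func (f *fakeApply) PeeringWrite(req *pbpeering.PeeringWriteRequest) error   { return nil }
    func (f *fakeApply) PeeringDelete(req *pbpeering.PeeringDeleteRequest) error { return nil }
    func (f *fakeApply) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error {
        return nil
    }

    func (f *fakeApply) CatalogRegister(req *structs.RegisterRequest) error {
        // Copy the value: handleUpsert reuses a single request across the
        // node, service, and check registrations, mutating it between calls.
        f.registered = append(f.registered, *req)
        return nil
    }
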
+ Partition string + + // Stream is the open stream to the peer cluster. + Stream BidirectionalStream } // The localID provided is the locally-generated identifier for the peering. // The remoteID is an identifier that the remote peer recognizes for the peering. -func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStream) error { - logger := s.logger.Named("stream").With("peer_id", localID) +func (s *Service) HandleStream(req HandleStreamRequest) error { + logger := s.logger.Named("stream").With("peer_id", req.LocalID) logger.Trace("handling stream for peer") - status, err := s.streams.connected(localID) + status, err := s.streams.connected(req.LocalID) if err != nil { return fmt.Errorf("failed to register stream: %v", err) } // TODO(peering) Also need to clear subscriptions associated with the peer - defer s.streams.disconnected(localID) + defer s.streams.disconnected(req.LocalID) - mgr := newSubscriptionManager(stream.Context(), logger, s.Backend) - subCh := mgr.subscribe(stream.Context(), localID) + mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend) + subCh := mgr.subscribe(req.Stream.Context(), req.LocalID) sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ Request: &pbpeering.ReplicationMessage_Request{ ResourceURL: pbpeering.TypeURLService, - PeerID: remoteID, + PeerID: req.RemoteID, }, }, } logTraceSend(logger, sub) - if err := stream.Send(sub); err != nil { + if err := req.Stream.Send(sub); err != nil { if err == io.EOF { logger.Info("stream ended by peer") status.trackReceiveError(err.Error()) @@ -458,7 +492,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr go func() { defer close(recvChan) for { - msg, err := stream.Recv() + msg, err := req.Stream.Recv() if err == io.EOF { logger.Info("stream ended by peer") status.trackReceiveError(err.Error()) @@ -494,13 +528,13 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr } logTraceSend(logger, term) - if err := stream.Send(term); err != nil { + if err := req.Stream.Send(term); err != nil { status.trackSendError(err.Error()) return fmt.Errorf("failed to send to stream: %v", err) } logger.Trace("deleting stream status") - s.streams.deleteStatus(localID) + s.streams.deleteStatus(req.LocalID) return nil @@ -528,7 +562,8 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr } if resp := msg.GetResponse(); resp != nil { - req, err := processResponse(resp) + // TODO(peering): Ensure there's a nonce + reply, err := s.processResponse(req.PeerName, req.Partition, resp) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) status.trackReceiveError(err.Error()) @@ -536,8 +571,8 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr status.trackReceiveSuccess() } - logTraceSend(logger, req) - if err := stream.Send(req); err != nil { + logTraceSend(logger, reply) + if err := req.Stream.Send(reply); err != nil { status.trackSendError(err.Error()) return fmt.Errorf("failed to send to stream: %v", err) } @@ -549,7 +584,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr logger.Info("received peering termination message, cleaning up imported resources") // Once marked as terminated, a separate deferred deletion routine will clean up imported resources. 
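
Every Response handled in this loop is answered with a Request that echoes the nonce (built by makeReply). A client-side sketch of consuming that ack, assuming the generated stream client and that Request carries the Nonce and Error fields exercised in the tests below:

    // awaitAck reads one message and verifies it acknowledges the response
    // sent with the given nonce; a nil Error means the upsert was applied.
    func awaitAck(stream pbpeering.PeeringService_StreamResourcesClient, sentNonce string) error {
        msg, err := stream.Recv()
        if err != nil {
            return err
        }
        req := msg.GetRequest()
        if req == nil {
            return fmt.Errorf("expected an ack request, got %T", msg.Payload)
        }
        if req.Nonce != sentNonce {
            return fmt.Errorf("ack for unexpected nonce %q", req.Nonce)
        }
        if req.Error != nil {
            return fmt.Errorf("peer rejected resource: %s", req.Error.Message)
        }
        return nil
    }
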
- if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: localID}); err != nil { + if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil { return err } return nil @@ -558,7 +593,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr case update := <-subCh: switch { case strings.HasPrefix(update.CorrelationID, subExportedService): - if err := pushServiceResponse(logger, stream, status, update); err != nil { + if err := pushServiceResponse(logger, req.Stream, status, update); err != nil { return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) } @@ -667,7 +702,7 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp return msg } -func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { +func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { var ( err error errCode code.Code @@ -682,7 +717,10 @@ func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.Re switch resp.Operation { case pbpeering.ReplicationMessage_Response_UPSERT: - err = handleUpsert(resp.ResourceURL, resp.Resource) + if resp.Resource == nil { + break + } + err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource) if err != nil { errCode = code.Code_INTERNAL errMsg = err.Error() @@ -710,8 +748,90 @@ func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.Re return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err } -func handleUpsert(resourceURL string, resource *anypb.Any) error { - // TODO(peering): implement +func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error { + csn := &pbservice.IndexedCheckServiceNodes{} + err := ptypes.UnmarshalAny(resource, csn) + if err != nil { + return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + if csn == nil || len(csn.Nodes) == 0 { + return nil + } + + type checkTuple struct { + checkID types.CheckID + serviceID string + nodeID types.NodeID + + acl.EnterpriseMeta + } + + var ( + nodes = make(map[types.NodeID]*structs.Node) + services = make(map[types.NodeID][]*structs.NodeService) + checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck) + ) + + for _, pbinstance := range csn.Nodes { + instance, err := pbservice.CheckServiceNodeToStructs(pbinstance) + if err != nil { + return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + + nodes[instance.Node.ID] = instance.Node + services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service) + + if _, ok := checks[instance.Node.ID]; !ok { + checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck) + } + for _, c := range instance.Checks { + tuple := checkTuple{ + checkID: c.CheckID, + serviceID: c.ServiceID, + nodeID: instance.Node.ID, + EnterpriseMeta: c.EnterpriseMeta, + } + checks[instance.Node.ID][tuple] = c + } + } + + for nodeID, node := range nodes { + // For all nodes, services, and checks we override the peer name and partition to be + // the local partition and local name for the peer. 
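
The registration loop that follows turns each grouped node into a short sequence of Raft applies. Distilled into one helper, same logic as the loop below with the grouping elided:

    // registerNode applies a node's data in three phases: the bare node,
    // then one apply per service instance, then all checks in one batch.
    func registerNode(apply Apply, node *structs.Node, svcs []*structs.NodeService, checks structs.HealthChecks) error {
        req := node.ToRegisterRequest()
        if err := apply.CatalogRegister(&req); err != nil {
            return err
        }
        for _, svc := range svcs {
            req.Service = svc
            if err := apply.CatalogRegister(&req); err != nil {
                return err
            }
        }
        req.Service = nil
        req.Checks = checks
        return apply.CatalogRegister(&req)
    }
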
+ node.PeerName, node.Partition = peerName, partition + + // First register the node + req := node.ToRegisterRequest() + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + + // Then register all services on that node + for _, svc := range services[nodeID] { + svc.PeerName = peerName + svc.OverridePartition(partition) + + req.Service = svc + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + } + req.Service = nil + + // Then register all checks on that node + var chks structs.HealthChecks + for _, c := range checks[nodeID] { + c.PeerName = peerName + c.OverridePartition(partition) + + chks = append(chks, c) + } + + req.Checks = chks + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + } return nil } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index bfb5a8f3c6..728d9380aa 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -11,14 +11,18 @@ import ( "testing" "time" + "github.com/golang/protobuf/ptypes" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" gogrpc "google.golang.org/grpc" + "github.com/hashicorp/consul/agent/consul/state" grpc "github.com/hashicorp/consul/agent/grpc/private" "github.com/hashicorp/consul/agent/grpc/private/resolver" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/acl" @@ -307,6 +311,359 @@ func TestPeeringService_List(t *testing.T) { prototest.AssertDeepEqual(t, expect, resp) } +func Test_StreamHandler_UpsertServices(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + type testCase struct { + name string + msg *pbpeering.ReplicationMessage_Response + input structs.CheckServiceNodes + expect structs.CheckServiceNodes + } + + s := newTestServer(t, nil) + testrpc.WaitForLeader(t, s.Server.RPC, "dc1") + + srv := peering.NewService(testutil.Logger(t), consul.NewPeeringBackend(s.Server, nil)) + + require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{ + Name: "my-peer", + })) + + _, p, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "my-peer"}) + require.NoError(t, err) + + client := peering.NewMockClient(context.Background()) + + errCh := make(chan error, 1) + client.ErrCh = errCh + + go func() { + // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). + // This matches gRPC's behavior when an error is returned by a server. 
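
One subtlety in handleUpsert above: a node-level check arrives once per CheckServiceNode, so two instances on the same node report it twice. Keying by checkTuple is what collapses the duplicates. Shown in isolation, assuming checkTuple were hoisted from handleUpsert to package scope:

    // dedupeChecks mirrors the keying in handleUpsert: identical node-level
    // checks reported by multiple service instances collapse to one entry.
    func dedupeChecks(nodeID types.NodeID, in []*structs.HealthCheck) map[checkTuple]*structs.HealthCheck {
        out := make(map[checkTuple]*structs.HealthCheck)
        for _, c := range in {
            out[checkTuple{
                checkID:        c.CheckID,
                serviceID:      c.ServiceID,
                nodeID:         nodeID,
                EnterpriseMeta: c.EnterpriseMeta,
            }] = c
        }
        return out
    }
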
+ err := srv.StreamResources(client.ReplicationStream) + if err != nil { + errCh <- err + } + }() + + sub := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Request_{ + Request: &pbpeering.ReplicationMessage_Request{ + PeerID: p.ID, + ResourceURL: pbpeering.TypeURLService, + }, + }, + } + require.NoError(t, client.Send(sub)) + + // Receive subscription request from peer for our services + _, err = client.Recv() + require.NoError(t, err) + + remoteEntMeta := structs.DefaultEnterpriseMetaInPartition("remote-partition") + localEntMeta := acl.DefaultEnterpriseMeta() + localPeerName := "my-peer" + + // Scrub data we don't need for the assertions below. + scrubCheckServiceNodes := func(instances structs.CheckServiceNodes) { + for _, csn := range instances { + csn.Node.RaftIndex = structs.RaftIndex{} + + csn.Service.TaggedAddresses = nil + csn.Service.Weights = nil + csn.Service.RaftIndex = structs.RaftIndex{} + csn.Service.Proxy = structs.ConnectProxyConfig{} + + for _, c := range csn.Checks { + c.RaftIndex = structs.RaftIndex{} + c.Definition = structs.HealthCheckDefinition{} + } + } + } + + run := func(t *testing.T, tc testCase) { + pbCSN := &pbservice.IndexedCheckServiceNodes{} + for _, csn := range tc.input { + pbCSN.Nodes = append(pbCSN.Nodes, pbservice.NewCheckServiceNodeFromStructs(&csn)) + } + + any, err := ptypes.MarshalAny(pbCSN) + require.NoError(t, err) + tc.msg.Resource = any + + resp := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Response_{ + Response: tc.msg, + }, + } + require.NoError(t, client.Send(resp)) + + msg, err := client.RecvWithTimeout(1 * time.Second) + require.NoError(t, err) + + req := msg.GetRequest() + require.NotNil(t, req) + require.Equal(t, tc.msg.Nonce, req.Nonce) + require.Nil(t, req.Error) + + _, got, err := s.Server.FSM().State().CombinedCheckServiceNodes(nil, structs.NewServiceName("api", nil), localPeerName) + require.NoError(t, err) + + scrubCheckServiceNodes(got) + require.Equal(t, tc.expect, got) + } + + // NOTE: These test cases do not run against independent state stores, they show sequential updates for a given service. + // Every new upsert must replace the data from the previous case. 
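
Condensed from the run() helper above, this is the full shape of an UPSERT as a peer cluster would send it; the resource ID value is illustrative:

    // buildUpsert packages service instances the way a peer cluster does:
    // convert structs to protobuf, pack into an Any, and wrap in a Response.
    func buildUpsert(nonce string, instances structs.CheckServiceNodes) (*pbpeering.ReplicationMessage, error) {
        csn := &pbservice.IndexedCheckServiceNodes{}
        for i := range instances {
            csn.Nodes = append(csn.Nodes, pbservice.NewCheckServiceNodeFromStructs(&instances[i]))
        }
        any, err := ptypes.MarshalAny(csn)
        if err != nil {
            return nil, err
        }
        return &pbpeering.ReplicationMessage{
            Payload: &pbpeering.ReplicationMessage_Response_{
                Response: &pbpeering.ReplicationMessage_Response{
                    ResourceURL: pbpeering.TypeURLService,
                    ResourceID:  "api", // illustrative: the exported service name
                    Nonce:       nonce,
                    Operation:   pbpeering.ReplicationMessage_Response_UPSERT,
                    Resource:    any,
                },
            },
        }, nil
    }
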
+ tt := []testCase{ + { + name: "upsert an instance on a node", + msg: &pbpeering.ReplicationMessage_Response{ + ResourceURL: pbpeering.TypeURLService, + ResourceID: "api", + Nonce: "1", + Operation: pbpeering.ReplicationMessage_Response_UPSERT, + }, + input: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + ID: "112e2243-ab62-4e8a-9317-63306972183c", + Node: "node-1", + Address: "10.0.0.1", + Datacenter: "dc1", + Partition: remoteEntMeta.PartitionOrEmpty(), + }, + Service: &structs.NodeService{ + Kind: "", + ID: "api-1", + Service: "api", + Port: 8080, + EnterpriseMeta: *remoteEntMeta, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-1-check", + Node: "node-1", + Status: api.HealthPassing, + EnterpriseMeta: *remoteEntMeta, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + ServiceName: "api", + Node: "node-1", + Status: api.HealthCritical, + EnterpriseMeta: *remoteEntMeta, + }, + }, + }, + }, + expect: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + ID: "112e2243-ab62-4e8a-9317-63306972183c", + Node: "node-1", + Address: "10.0.0.1", + Datacenter: "dc1", + Partition: localEntMeta.PartitionOrEmpty(), + PeerName: localPeerName, + }, + Service: &structs.NodeService{ + Kind: "", + ID: "api-1", + Service: "api", + Port: 8080, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-1-check", + Node: "node-1", + Status: api.HealthPassing, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + ServiceName: "api", + Node: "node-1", + Status: api.HealthCritical, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + }, + }, + }, + }, + { + name: "upsert two instances on the same node", + msg: &pbpeering.ReplicationMessage_Response{ + ResourceURL: pbpeering.TypeURLService, + ResourceID: "api", + Nonce: "2", + Operation: pbpeering.ReplicationMessage_Response_UPSERT, + }, + input: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + ID: "112e2243-ab62-4e8a-9317-63306972183c", + Node: "node-1", + Address: "10.0.0.1", + Datacenter: "dc1", + Partition: remoteEntMeta.PartitionOrEmpty(), + }, + Service: &structs.NodeService{ + Kind: "", + ID: "api-1", + Service: "api", + Port: 8080, + EnterpriseMeta: *remoteEntMeta, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-1-check", + Node: "node-1", + Status: api.HealthPassing, + EnterpriseMeta: *remoteEntMeta, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + ServiceName: "api", + Node: "node-1", + Status: api.HealthCritical, + EnterpriseMeta: *remoteEntMeta, + }, + }, + }, + { + Node: &structs.Node{ + ID: "112e2243-ab62-4e8a-9317-63306972183c", + Node: "node-1", + Address: "10.0.0.1", + Datacenter: "dc1", + Partition: remoteEntMeta.PartitionOrEmpty(), + }, + Service: &structs.NodeService{ + Kind: "", + ID: "api-2", + Service: "api", + Port: 9090, + EnterpriseMeta: *remoteEntMeta, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-1-check", + Node: "node-1", + Status: api.HealthPassing, + EnterpriseMeta: *remoteEntMeta, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + ServiceName: "api", + Node: "node-1", + Status: api.HealthWarning, + EnterpriseMeta: *remoteEntMeta, + }, + }, + }, + }, + expect: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + ID: "112e2243-ab62-4e8a-9317-63306972183c", + Node: "node-1", + Address: "10.0.0.1", + Datacenter: "dc1", + Partition: localEntMeta.PartitionOrEmpty(), + PeerName: localPeerName, + }, + Service: 
&structs.NodeService{ + Kind: "", + ID: "api-1", + Service: "api", + Port: 8080, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-1-check", + Node: "node-1", + Status: api.HealthPassing, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + ServiceName: "api", + Node: "node-1", + Status: api.HealthCritical, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + }, + }, + { + Node: &structs.Node{ + ID: "112e2243-ab62-4e8a-9317-63306972183c", + Node: "node-1", + Address: "10.0.0.1", + Datacenter: "dc1", + Partition: localEntMeta.PartitionOrEmpty(), + PeerName: localPeerName, + }, + Service: &structs.NodeService{ + Kind: "", + ID: "api-2", + Service: "api", + Port: 9090, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-1-check", + Node: "node-1", + Status: api.HealthPassing, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + ServiceName: "api", + Node: "node-1", + Status: api.HealthWarning, + EnterpriseMeta: *localEntMeta, + PeerName: localPeerName, + }, + }, + }, + }, + }, + } + for _, tc := range tt { + runStep(t, tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} + // newTestServer is copied from partition/service_test.go, with the addition of certs/cas. // TODO(peering): these are endpoint tests and should live in the agent/consul // package. Instead, these can be written around a mock client (see testing.go) diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 019ffc8ff8..ff577b6c3d 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -32,16 +32,23 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) { } run := func(t *testing.T, tc testCase) { - srv := NewService(testutil.Logger(t), nil) - client := newMockClient(context.Background()) + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + + srv := NewService(testutil.Logger(t), &testStreamBackend{ + store: store, + pub: publisher, + }) + + client := NewMockClient(context.Background()) errCh := make(chan error, 1) - client.errCh = errCh + client.ErrCh = errCh go func() { - // Pass errors from server handler into errCh so that they can be seen by the client on Recv(). + // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). // This matches gRPC's behavior when an error is returned by a server. 
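
The "unknown peer" case added above pins down the new validation in StreamResources: subscribing with a PeerID that has no stored peering is rejected up front rather than accepted silently. From a client's point of view, sketched against the mock client:

    // With no PeeringWrite for this ID, the server tears the stream down
    // with codes.InvalidArgument before any replication happens.
    sub := &pbpeering.ReplicationMessage{
        Payload: &pbpeering.ReplicationMessage_Request_{
            Request: &pbpeering.ReplicationMessage_Request{
                PeerID:      "63b60245-c475-426b-b314-4588d210859d", // not in the store
                ResourceURL: pbpeering.TypeURLService,
            },
        },
    }
    _ = client.Send(sub)
    _, err := client.Recv()
    // err: rpc error: code = InvalidArgument desc = initial subscription
    //      for unknown PeerID: 63b60245-c475-426b-b314-4588d210859d
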
- err := srv.StreamResources(client.replicationStream) + err := srv.StreamResources(client.ReplicationStream) if err != nil { errCh <- err } @@ -103,6 +110,18 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) { }, wantErr: status.Error(codes.InvalidArgument, "subscription request to unknown resource URL: nomad.Job"), }, + { + name: "unknown peer", + input: &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Request_{ + Request: &pbpeering.ReplicationMessage_Request{ + PeerID: "63b60245-c475-426b-b314-4588d210859d", + ResourceURL: pbpeering.TypeURLService, + }, + }, + }, + wantErr: status.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: 63b60245-c475-426b-b314-4588d210859d"), + }, } for _, tc := range tt { @@ -127,21 +146,30 @@ func TestStreamResources_Server_Terminate(t *testing.T) { } srv.streams.timeNow = it.Now - client := newMockClient(context.Background()) + client := NewMockClient(context.Background()) errCh := make(chan error, 1) - client.errCh = errCh + client.ErrCh = errCh go func() { - // Pass errors from server handler into errCh so that they can be seen by the client on Recv(). + // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). // This matches gRPC's behavior when an error is returned by a server. - if err := srv.StreamResources(client.replicationStream); err != nil { + if err := srv.StreamResources(client.ReplicationStream); err != nil { errCh <- err } }() + peering := pbpeering.Peering{ + Name: "my-peer", + } + require.NoError(t, store.PeeringWrite(0, &peering)) + + _, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"}) + require.NoError(t, err) + // Receive a subscription from a peer - peerID := "63b60245-c475-426b-b314-4588d210859d" + peerID := p.ID + sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ Request: &pbpeering.ReplicationMessage_Request{ @@ -150,7 +178,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) { }, }, } - err := client.Send(sub) + err = client.Send(sub) require.NoError(t, err) testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { @@ -209,14 +237,23 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { } srv.streams.timeNow = it.Now - client := newMockClient(context.Background()) + client := NewMockClient(context.Background()) errCh := make(chan error, 1) go func() { - errCh <- srv.StreamResources(client.replicationStream) + errCh <- srv.StreamResources(client.ReplicationStream) }() - peerID := "63b60245-c475-426b-b314-4588d210859d" + peering := pbpeering.Peering{ + Name: "my-peer", + } + require.NoError(t, store.PeeringWrite(0, &peering)) + + _, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"}) + require.NoError(t, err) + + peerID := p.ID + sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ Request: &pbpeering.ReplicationMessage_Request{ @@ -225,7 +262,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }, }, } - err := client.Send(sub) + err = client.Send(sub) require.NoError(t, err) testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { @@ -483,15 +520,15 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { pub: publisher, }) - client := newMockClient(context.Background()) + client := NewMockClient(context.Background()) errCh := make(chan error, 1) - client.errCh = errCh + client.ErrCh = errCh go func() { - // Pass errors from server handler into errCh so that they can be seen by 
the client on Recv(). + // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). // This matches gRPC's behavior when an error is returned by a server. - if err := srv.StreamResources(client.replicationStream); err != nil { + if err := srv.StreamResources(client.ReplicationStream); err != nil { errCh <- err } }() @@ -683,7 +720,7 @@ func (b *testStreamBackend) Apply() Apply { return nil } -func Test_processResponse(t *testing.T) { +func Test_processResponse_Validation(t *testing.T) { type testCase struct { name string in *pbpeering.ReplicationMessage_Response @@ -691,8 +728,15 @@ func Test_processResponse(t *testing.T) { wantErr bool } + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + srv := NewService(testutil.Logger(t), &testStreamBackend{ + store: store, + pub: publisher, + }) + run := func(t *testing.T, tc testCase) { - reply, err := processResponse(tc.in) + reply, err := srv.processResponse("", "", tc.in) if tc.wantErr { require.Error(t, err) } else { diff --git a/agent/rpc/peering/subscription_view.go b/agent/rpc/peering/subscription_view.go index d6b48e923b..a6f351b4f5 100644 --- a/agent/rpc/peering/subscription_view.go +++ b/agent/rpc/peering/subscription_view.go @@ -47,6 +47,8 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo { // NewMaterializer implements submatview.Request func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) { reqFn := func(index uint64) *pbsubscribe.SubscribeRequest { + // TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name. + // Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name. r := &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: e.req.ServiceName, diff --git a/agent/rpc/peering/testing.go b/agent/rpc/peering/testing.go index 25b1f184b7..fb6f7548f0 100644 --- a/agent/rpc/peering/testing.go +++ b/agent/rpc/peering/testing.go @@ -75,52 +75,52 @@ func TestPeeringToken(peerID string) structs.PeeringToken { } } -type mockClient struct { - mu sync.Mutex - errCh chan error +type MockClient struct { + mu sync.Mutex - replicationStream *mockStream + ErrCh chan error + ReplicationStream *MockStream } -func (c *mockClient) Send(r *pbpeering.ReplicationMessage) error { - c.replicationStream.recvCh <- r +func (c *MockClient) Send(r *pbpeering.ReplicationMessage) error { + c.ReplicationStream.recvCh <- r return nil } -func (c *mockClient) Recv() (*pbpeering.ReplicationMessage, error) { +func (c *MockClient) Recv() (*pbpeering.ReplicationMessage, error) { select { - case err := <-c.errCh: + case err := <-c.ErrCh: return nil, err - case r := <-c.replicationStream.sendCh: + case r := <-c.ReplicationStream.sendCh: return r, nil case <-time.After(10 * time.Millisecond): return nil, io.EOF } } -func (c *mockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) { +func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) { select { - case err := <-c.errCh: + case err := <-c.ErrCh: return nil, err - case r := <-c.replicationStream.sendCh: + case r := <-c.ReplicationStream.sendCh: return r, nil case <-time.After(dur): return nil, io.EOF } } -func (c *mockClient) Close() { - close(c.replicationStream.recvCh) +func (c *MockClient) Close() { + close(c.ReplicationStream.recvCh) } -func newMockClient(ctx context.Context) *mockClient { - return &mockClient{ - 
replicationStream: newTestReplicationStream(ctx), +func NewMockClient(ctx context.Context) *MockClient { + return &MockClient{ + ReplicationStream: newTestReplicationStream(ctx), } } -// mockStream mocks peering.PeeringService_StreamResourcesServer -type mockStream struct { +// MockStream mocks peering.PeeringService_StreamResourcesServer +type MockStream struct { sendCh chan *pbpeering.ReplicationMessage recvCh chan *pbpeering.ReplicationMessage @@ -128,10 +128,10 @@ type mockStream struct { mu sync.Mutex } -var _ pbpeering.PeeringService_StreamResourcesServer = (*mockStream)(nil) +var _ pbpeering.PeeringService_StreamResourcesServer = (*MockStream)(nil) -func newTestReplicationStream(ctx context.Context) *mockStream { - return &mockStream{ +func newTestReplicationStream(ctx context.Context) *MockStream { + return &MockStream{ sendCh: make(chan *pbpeering.ReplicationMessage, 1), recvCh: make(chan *pbpeering.ReplicationMessage, 1), ctx: ctx, @@ -139,13 +139,13 @@ func newTestReplicationStream(ctx context.Context) *mockStream { } // Send implements pbpeering.PeeringService_StreamResourcesServer -func (s *mockStream) Send(r *pbpeering.ReplicationMessage) error { +func (s *MockStream) Send(r *pbpeering.ReplicationMessage) error { s.sendCh <- r return nil } // Recv implements pbpeering.PeeringService_StreamResourcesServer -func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) { +func (s *MockStream) Recv() (*pbpeering.ReplicationMessage, error) { r := <-s.recvCh if r == nil { return nil, io.EOF @@ -154,32 +154,32 @@ func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) { } // Context implements grpc.ServerStream and grpc.ClientStream -func (s *mockStream) Context() context.Context { +func (s *MockStream) Context() context.Context { return s.ctx } // SendMsg implements grpc.ServerStream and grpc.ClientStream -func (s *mockStream) SendMsg(m interface{}) error { +func (s *MockStream) SendMsg(m interface{}) error { return nil } // RecvMsg implements grpc.ServerStream and grpc.ClientStream -func (s *mockStream) RecvMsg(m interface{}) error { +func (s *MockStream) RecvMsg(m interface{}) error { return nil } // SetHeader implements grpc.ServerStream -func (s *mockStream) SetHeader(metadata.MD) error { +func (s *MockStream) SetHeader(metadata.MD) error { return nil } // SendHeader implements grpc.ServerStream -func (s *mockStream) SendHeader(metadata.MD) error { +func (s *MockStream) SendHeader(metadata.MD) error { return nil } // SetTrailer implements grpc.ServerStream -func (s *mockStream) SetTrailer(metadata.MD) {} +func (s *MockStream) SetTrailer(metadata.MD) {} type incrementalTime struct { base time.Time diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e2849e5665..35d2f57a4a 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -855,6 +855,20 @@ func (n *Node) BestAddress(wan bool) string { return n.Address } +func (n *Node) ToRegisterRequest() RegisterRequest { + return RegisterRequest{ + ID: n.ID, + Node: n.Node, + Datacenter: n.Datacenter, + Address: n.Address, + TaggedAddresses: n.TaggedAddresses, + NodeMeta: n.Meta, + RaftIndex: n.RaftIndex, + EnterpriseMeta: *n.GetEnterpriseMeta(), + PeerName: n.PeerName, + } +} + type Nodes []*Node // IsSame return whether nodes are similar without taking into account
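
Note that ToRegisterRequest maps only node-level fields; Service and Checks stay zero so each caller (snapshot persistence, peering upsert) attaches its own payload. A usage sketch with illustrative values:

    node := &structs.Node{
        ID:         types.NodeID("112e2243-ab62-4e8a-9317-63306972183c"),
        Node:       "node-1",
        Datacenter: "dc1",
        Address:    "10.0.0.1",
        PeerName:   "my-peer",
    }
    req := node.ToRegisterRequest()
    // req.Service and req.Checks are nil here: the snapshot path registers
    // the bare node, while handleUpsert fills them in on subsequent applies.
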