diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 56f217ee65..0af04f4ece 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -169,5 +169,10 @@ func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error { return err } +func (a *peeringApply) CatalogDeregister(req *structs.DeregisterRequest) error { + _, err := a.srv.leaderRaftApply("Catalog.Deregister", structs.DeregisterRequestType, req) + return err +} + var _ peering.Apply = (*peeringApply)(nil) var _ peering.LeaderAddress = (*leaderAddr)(nil) diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go index c8b9726647..aef8b45255 100644 --- a/agent/rpc/peering/replication.go +++ b/agent/rpc/peering/replication.go @@ -7,6 +7,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/hashicorp/consul/types" "github.com/hashicorp/go-hclog" "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/protobuf/types/known/anypb" @@ -209,7 +210,7 @@ func (s *Service) handleUpsert( return fmt.Errorf("failed to unmarshal resource: %w", err) } - return s.handleUpsertService(peerName, partition, sn, csn) + return s.handleUpdateService(peerName, partition, sn, csn) case pbpeering.TypeURLRoots: roots := &pbpeering.PeeringTrustBundle{} @@ -224,24 +225,29 @@ func (s *Service) handleUpsert( } } -func (s *Service) handleUpsertService( +// handleUpdateService handles both deletion and upsert events for a service. +// On an UPSERT event: +// - All nodes, services, checks in the input pbNodes are re-applied through Raft. +// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted. +// +// On a DELETE event: +// - A reconciliation against nil or empty input pbNodes leads to deleting all stored catalog resources +// associated with the service name. +func (s *Service) handleUpdateService( peerName string, partition string, sn structs.ServiceName, - csn *pbservice.IndexedCheckServiceNodes, + pbNodes *pbservice.IndexedCheckServiceNodes, ) error { - if csn == nil || len(csn.Nodes) == 0 { - return s.handleDeleteService(peerName, partition, sn) + // Capture instances in the state store for reconciliation later. + _, storedInstances, err := s.Backend.Store().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName) + if err != nil { + return fmt.Errorf("failed to read imported services: %w", err) } - // Convert exported data into structs format. - structsNodes := make([]structs.CheckServiceNode, 0, len(csn.Nodes)) - for _, pb := range csn.Nodes { - instance, err := pbservice.CheckServiceNodeToStructs(pb) - if err != nil { - return fmt.Errorf("failed to convert instance: %w", err) - } - structsNodes = append(structsNodes, *instance) + structsNodes, err := pbNodes.CheckServiceNodesToStruct() + if err != nil { + return fmt.Errorf("failed to convert protobuf instances to structs: %w", err) } // Normalize the data into a convenient form for operation. @@ -277,8 +283,145 @@ func (s *Service) handleUpsertService( } } - // TODO(peering): cleanup and deregister existing data that is now missing safely somehow + // + // Now that the data received has been stored in the state store, the rest of this + // function is responsible for cleaning up data in the catalog that wasn't in the snapshot. + // + // nodeCheckTuple uniquely identifies a node check in the catalog. + // The partition is not needed because we are only operating on one partition's catalog. + type nodeCheckTuple struct { + checkID types.CheckID + node string + } + + var ( + // unusedNodes tracks node names that were not present in the latest response. + // Missing nodes are not assumed to be deleted because there may be other service names + // registered on them. + // Inside we also track a map of node checks associated with the node. + unusedNodes = make(map[string]struct{}) + + // deletedNodeChecks tracks node checks that were not present in the latest response. + // A single node check will be attached to all service instances of a node, so this + // deduplication prevents issuing multiple deregistrations for a single check. + deletedNodeChecks = make(map[nodeCheckTuple]struct{}) + ) + for _, csn := range storedInstances { + if _, ok := snap.Nodes[csn.Node.ID]; !ok { + unusedNodes[string(csn.Node.ID)] = struct{}{} + + // Since the node is not in the snapshot we can know the associated service + // instance is not in the snapshot either, since a service instance can't + // exist without a node. + // This will also delete all service checks. + err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: csn.Node.Node, + ServiceID: csn.Service.ID, + EnterpriseMeta: csn.Service.EnterpriseMeta, + PeerName: peerName, + }) + if err != nil { + return fmt.Errorf("failed to deregister service %q: %w", csn.Service.CompoundServiceID(), err) + } + + // We can't know if a node check was deleted from the exporting cluster + // (but not the node itself) if the node wasn't in the snapshot, + // so we do not loop over checks here. + // If the unusedNode gets deleted below that will also delete node checks. + continue + } + + // Delete the service instance if not in the snapshot. + sid := csn.Service.CompoundServiceID() + if _, ok := snap.Nodes[csn.Node.ID].Services[sid]; !ok { + err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: csn.Node.Node, + ServiceID: csn.Service.ID, + EnterpriseMeta: csn.Service.EnterpriseMeta, + PeerName: peerName, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/service_id:%s", + csn.Service.PartitionOrDefault(), peerName, csn.Node.Node, csn.Service.NamespaceOrDefault(), csn.Service.ID) + return fmt.Errorf("failed to deregister service %q: %w", ident, err) + } + + // When a service is deleted all associated checks also get deleted as a side effect. + continue + } + + // Reconcile checks. + for _, chk := range csn.Checks { + if _, ok := snap.Nodes[csn.Node.ID].Services[sid].Checks[chk.CheckID]; !ok { + // Checks without a ServiceID are node checks. + // If the node exists but the check does not then the check was deleted. + if chk.ServiceID == "" { + // Deduplicate node checks to avoid deregistering a check multiple times. + tuple := nodeCheckTuple{ + checkID: chk.CheckID, + node: chk.Node, + } + deletedNodeChecks[tuple] = struct{}{} + continue + } + + // If the check isn't a node check then it's a service check. + // Service checks that were not present can be deleted immediately because + // checks for a given service ID will only be attached to a single CheckServiceNode. + err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: chk.Node, + CheckID: chk.CheckID, + EnterpriseMeta: chk.EnterpriseMeta, + PeerName: peerName, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/check_id:%s", + chk.PartitionOrDefault(), peerName, chk.Node, chk.NamespaceOrDefault(), chk.CheckID) + return fmt.Errorf("failed to deregister check %q: %w", ident, err) + } + } + } + } + + // Delete all deduplicated node checks. + for chk := range deletedNodeChecks { + nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) + err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: chk.node, + CheckID: chk.checkID, + EnterpriseMeta: *nodeMeta, + PeerName: peerName, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", nodeMeta.PartitionOrDefault(), peerName, chk.node, chk.checkID) + return fmt.Errorf("failed to deregister node check %q: %w", ident, err) + } + } + + // Delete any nodes that do not have any other services registered on them. + for node := range unusedNodes { + nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) + _, ns, err := s.Backend.Store().NodeServices(nil, node, nodeMeta, peerName) + if err != nil { + return fmt.Errorf("failed to query services on node: %w", err) + } + if ns != nil && len(ns.Services) >= 1 { + // At least one service is still registered on this node, so we keep it. + continue + } + + // All services on the node were deleted, so the node is also cleaned up. + err = s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: node, + PeerName: peerName, + EnterpriseMeta: *nodeMeta, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node) + return fmt.Errorf("failed to deregister node %q: %w", ident, err) + } + } return nil } @@ -307,25 +450,13 @@ func (s *Service) handleDelete( case pbpeering.TypeURLService: sn := structs.ServiceNameFromString(resourceID) sn.OverridePartition(partition) - return s.handleDeleteService(peerName, partition, sn) + return s.handleUpdateService(peerName, partition, sn, nil) default: return fmt.Errorf("unexpected resourceURL: %s", resourceURL) } } -func (s *Service) handleDeleteService( - peerName string, - partition string, - sn structs.ServiceName, -) error { - // Deregister: ServiceID == DeleteService ANd checks - // Deregister: ServiceID(empty) CheckID(empty) == DeleteNode - - // TODO(peering): implement - return nil -} - func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage { var rpcErr *pbstatus.Status if errCode != code.Code_OK || errMsg != "" { diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index acbd2e6f9c..c57710c548 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -130,6 +130,8 @@ type Store interface { PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) + CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) + NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) AbandonCh() <-chan struct{} @@ -142,6 +144,7 @@ type Apply interface { PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error CatalogRegister(req *structs.RegisterRequest) error + CatalogDeregister(req *structs.DeregisterRequest) error } // GenerateToken implements the PeeringService RPC method to generate a diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 27fc3055e1..dc30fa6868 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -18,16 +18,19 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbstatus" "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/types" ) func TestStreamResources_Server_Follower(t *testing.T) { @@ -949,6 +952,7 @@ func makeClient( type testStreamBackend struct { pub state.EventPublisher store *state.Store + applier *testApplier leader func() bool leaderAddress *leaderAddress } @@ -1019,6 +1023,49 @@ func (b *testStreamBackend) EnterpriseCheckNamespaces(_ string) error { } func (b *testStreamBackend) Apply() Apply { + return b.applier +} + +type testApplier struct { + store *state.Store +} + +func (a *testApplier) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { + panic("not implemented") +} + +func (a *testApplier) PeeringDelete(req *pbpeering.PeeringDeleteRequest) error { + panic("not implemented") +} + +func (a *testApplier) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error { + panic("not implemented") +} + +func (a *testApplier) PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error { + panic("not implemented") +} + +// CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister. +func (a *testApplier) CatalogRegister(req *structs.RegisterRequest) error { + return a.store.EnsureRegistration(1, req) +} + +// CatalogDeregister mocks catalog de-registrations through Raft by copying the logic of FSM.applyDeregister. +func (a *testApplier) CatalogDeregister(req *structs.DeregisterRequest) error { + if req.ServiceID != "" { + if err := a.store.DeleteService(1, req.Node, req.ServiceID, &req.EnterpriseMeta, req.PeerName); err != nil { + return err + } + } else if req.CheckID != "" { + if err := a.store.DeleteCheck(1, req.Node, req.CheckID, &req.EnterpriseMeta, req.PeerName); err != nil { + return err + } + } else { + if err := a.store.DeleteNode(1, req.Node, &req.EnterpriseMeta, req.PeerName); err != nil { + return err + } + } return nil } @@ -1273,3 +1320,854 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test checkFns[i](t, out[i]) } } + +func TestHandleUpdateService(t *testing.T) { + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, + &testStreamBackend{ + store: store, + applier: &testApplier{store: store}, + pub: publisher, + leader: func() bool { + return false + }, + }, + ) + + type testCase struct { + name string + seed []*structs.RegisterRequest + input *pbservice.IndexedCheckServiceNodes + expect map[string]structs.CheckServiceNodes + } + + peerName := "billing" + remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap")) + + // "api" service is imported from the billing-ap partition, corresponding to the billing peer. + // Locally it is stored to the default partition. + defaultMeta := *acl.DefaultEnterpriseMeta() + apiSN := structs.NewServiceName("api", &defaultMeta) + + run := func(t *testing.T, tc testCase) { + // Seed the local catalog with some data to reconcile against. + for _, reg := range tc.seed { + require.NoError(t, srv.Backend.Apply().CatalogRegister(reg)) + } + + // Simulate an update arriving for billing/api. + require.NoError(t, srv.handleUpdateService(peerName, acl.DefaultPartitionName, apiSN, tc.input)) + + for svc, expect := range tc.expect { + t.Run(svc, func(t *testing.T) { + _, got, err := srv.Backend.Store().CheckServiceNodes(nil, svc, &defaultMeta, peerName) + require.NoError(t, err) + requireEqualInstances(t, expect, got) + }) + } + } + + tt := []testCase{ + { + name: "upsert two service instances to the same node", + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + + // The remote billing-ap partition is overwritten for all resources with the local default. + Partition: defaultMeta.PartitionOrEmpty(), + + // The name of the peer "billing" is attached as well. + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "upsert two service instances to different nodes", + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &pbservice.Node{ + ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", + Node: "node-bar", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-bar-check", + Node: "node-bar", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-bar", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + "api": { + { + Node: &structs.Node{ + ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", + Node: "node-bar", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-bar-check", + Node: "node-bar", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-bar", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + + // The remote billing-ap partition is overwritten for all resources with the local default. + Partition: defaultMeta.PartitionOrEmpty(), + + // The name of the peer "billing" is attached as well. + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "receiving a nil input leads to deleting data in the catalog", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("c0f97de9-4e1b-4e80-a1c6-cd8725835ab2"), + Node: "node-bar", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-bar", + ServiceID: "api-2", + CheckID: types.CheckID("api-2-check"), + PeerName: peerName, + }, + { + Node: "node-bar", + CheckID: types.CheckID("node-bar-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: nil, + expect: map[string]structs.CheckServiceNodes{ + "api": {}, + }, + }, + { + name: "deleting one service name from a node does not delete other service names", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "redis-2", + CheckID: types.CheckID("redis-2-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + // Nil input is for the "api" service. + input: nil, + expect: map[string]structs.CheckServiceNodes{ + "api": {}, + // Existing redis service was not affected by deletion. + "redis": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "redis-2-check", + ServiceID: "redis-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "service checks are cleaned up when not present in a response", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + // Service check was deleted + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + // Service check should be gone + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{}, + }, + }, + }, + }, + { + name: "node checks are cleaned up when not present in a response", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "redis-2", + CheckID: types.CheckID("redis-2-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + // Node check was deleted + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + // Node check should be gone + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + "redis": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "redis-2-check", + ServiceID: "redis-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "replacing a service instance on a node cleans up the old instance", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "redis-2", + CheckID: types.CheckID("redis-2-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + // New service ID and checks for the api service. + Service: &pbservice.NodeService{ + ID: "new-api-v2", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + Node: "node-foo", + ServiceID: "new-api-v2", + CheckID: "new-api-v2-check", + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: "node-foo-check", + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "new-api-v2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + Node: "node-foo", + CheckID: "node-foo-check", + PeerName: peerName, + }, + { + CheckID: "new-api-v2-check", + ServiceID: "new-api-v2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + "redis": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + Node: "node-foo", + CheckID: "node-foo-check", + PeerName: peerName, + }, + { + CheckID: "redis-2-check", + ServiceID: "redis-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) { + t.Helper() + + require.Equal(t, len(expect), len(got), "got differing number of instances") + + for i := range expect { + // Node equality + require.Equal(t, expect[i].Node.ID, got[i].Node.ID, "node mismatch") + require.Equal(t, expect[i].Node.Partition, got[i].Node.Partition, "partition mismatch") + require.Equal(t, expect[i].Node.PeerName, got[i].Node.PeerName, "peer name mismatch") + + // Service equality + require.Equal(t, expect[i].Service.ID, got[i].Service.ID, "service id mismatch") + require.Equal(t, expect[i].Service.PeerName, got[i].Service.PeerName, "peer name mismatch") + require.Equal(t, expect[i].Service.PartitionOrDefault(), got[i].Service.PartitionOrDefault(), "partition mismatch") + + // Check equality + require.Equal(t, len(expect[i].Checks), len(got[i].Checks), "got differing number of check") + + for j := range expect[i].Checks { + require.Equal(t, expect[i].Checks[j].CheckID, got[i].Checks[j].CheckID, "check id mismatch") + require.Equal(t, expect[i].Checks[j].PeerName, got[i].Checks[j].PeerName, "peer name mismatch") + require.Equal(t, expect[i].Checks[j].PartitionOrDefault(), got[i].Checks[j].PartitionOrDefault(), "partition mismatch") + } + } + +} diff --git a/proto/pbservice/convert.go b/proto/pbservice/convert.go index d5233dd990..02895adf96 100644 --- a/proto/pbservice/convert.go +++ b/proto/pbservice/convert.go @@ -1,6 +1,8 @@ package pbservice import ( + "fmt" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/types" @@ -42,6 +44,23 @@ func NewMapHeadersFromStructs(t map[string][]string) map[string]*HeaderValue { return s } +// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent. +func (s *IndexedCheckServiceNodes) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) { + if s == nil { + return nil, nil + } + + resp := make([]structs.CheckServiceNode, 0, len(s.Nodes)) + for _, pb := range s.Nodes { + instance, err := CheckServiceNodeToStructs(pb) + if err != nil { + return resp, fmt.Errorf("failed to convert instance: %w", err) + } + resp = append(resp, *instance) + } + return resp, nil +} + // TODO: use mog once it supports pointers and slices func CheckServiceNodeToStructs(s *CheckServiceNode) (*structs.CheckServiceNode, error) { if s == nil {