From 71b254522e85d70d5e4acd19641b7717ba19745b Mon Sep 17 00:00:00 2001
From: Freddy
Date: Mon, 13 Jun 2022 11:52:28 -0600
Subject: [PATCH] Clean up imported nodes/services/checks as needed (#13367)

Previously, imported data was never deleted. As nodes/services/checks
were registered and deregistered, resources deleted from the exporting
cluster would accumulate in the importing cluster.

This commit updates replication so that whenever an update is received
for a service name, what is present in the catalog is reconciled against
what was received. The new handleUpdateService method handles both
updates and deletions.
---
 agent/consul/peering_backend.go  |   5 +
 agent/rpc/peering/replication.go | 185 ++++++-
 agent/rpc/peering/service.go     |   3 +
 agent/rpc/peering/stream_test.go | 898 +++++++++++++++++++++++++++++++
 proto/pbservice/convert.go       |  19 +
 5 files changed, 1083 insertions(+), 27 deletions(-)

diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go
index 56f217ee65..0af04f4ece 100644
--- a/agent/consul/peering_backend.go
+++ b/agent/consul/peering_backend.go
@@ -169,5 +169,10 @@ func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error {
 	return err
 }
 
+func (a *peeringApply) CatalogDeregister(req *structs.DeregisterRequest) error {
+	_, err := a.srv.leaderRaftApply("Catalog.Deregister", structs.DeregisterRequestType, req)
+	return err
+}
+
 var _ peering.Apply = (*peeringApply)(nil)
 var _ peering.LeaderAddress = (*leaderAddr)(nil)
diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go
index c8b9726647..aef8b45255 100644
--- a/agent/rpc/peering/replication.go
+++ b/agent/rpc/peering/replication.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	"github.com/hashicorp/consul/types"
 	"github.com/hashicorp/go-hclog"
 	"google.golang.org/genproto/googleapis/rpc/code"
 	"google.golang.org/protobuf/types/known/anypb"
@@ -209,7 +210,7 @@ func (s *Service) handleUpsert(
 			return fmt.Errorf("failed to unmarshal resource: %w", err)
 		}
 
-		return s.handleUpsertService(peerName, partition, sn, csn)
+		return s.handleUpdateService(peerName, partition, sn, csn)
 
 	case pbpeering.TypeURLRoots:
 		roots := &pbpeering.PeeringTrustBundle{}
@@ -224,24 +225,29 @@ func (s *Service) handleUpsert(
 	}
 }
 
-func (s *Service) handleUpsertService(
+// handleUpdateService handles both deletion and upsert events for a service.
+//
+// On an UPSERT event:
+// - All nodes, services, checks in the input pbNodes are re-applied through Raft.
+// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
+//
+// On a DELETE event:
+// - A reconciliation against nil or empty input pbNodes leads to deleting all stored catalog resources
+//   associated with the service name.
+func (s *Service) handleUpdateService(
 	peerName string,
 	partition string,
 	sn structs.ServiceName,
-	csn *pbservice.IndexedCheckServiceNodes,
+	pbNodes *pbservice.IndexedCheckServiceNodes,
 ) error {
-	if csn == nil || len(csn.Nodes) == 0 {
-		return s.handleDeleteService(peerName, partition, sn)
+	// Capture instances in the state store for reconciliation later.
+	_, storedInstances, err := s.Backend.Store().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName)
+	if err != nil {
+		return fmt.Errorf("failed to read imported services: %w", err)
 	}
 
-	// Convert exported data into structs format.
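Stepping back from the hunk for a moment: the new handleUpdateService boils down to "upsert what arrived, delete what is stored but missing". A minimal, self-contained sketch of that diffing step, using stand-in types rather than Consul's (`instance`, `key`, and `reconcile` are hypothetical names, not Consul APIs):

```go
package main

import "fmt"

// instance is a stand-in for a stored CheckServiceNode.
type instance struct {
	Node, ServiceID string
}

// key imitates keying by node plus compound service ID.
func key(i instance) string { return i.Node + "/" + i.ServiceID }

// reconcile returns stored instances that are absent from the incoming
// snapshot and therefore need deregistering. A nil snapshot (a DELETE
// event) marks every stored instance as stale.
func reconcile(stored, snapshot []instance) []instance {
	seen := make(map[string]struct{}, len(snapshot))
	for _, i := range snapshot {
		seen[key(i)] = struct{}{}
	}
	var stale []instance
	for _, i := range stored {
		if _, ok := seen[key(i)]; !ok {
			stale = append(stale, i)
		}
	}
	return stale
}

func main() {
	stored := []instance{{Node: "node-foo", ServiceID: "api-1"}, {Node: "node-foo", ServiceID: "api-2"}}
	snapshot := []instance{{Node: "node-foo", ServiceID: "api-1"}}
	fmt.Println(reconcile(stored, snapshot)) // [{node-foo api-2}]
}
```

The real implementation below performs the same set difference, keyed by node ID and compound service ID against the normalized snapshot.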
-	structsNodes := make([]structs.CheckServiceNode, 0, len(csn.Nodes))
-	for _, pb := range csn.Nodes {
-		instance, err := pbservice.CheckServiceNodeToStructs(pb)
-		if err != nil {
-			return fmt.Errorf("failed to convert instance: %w", err)
-		}
-		structsNodes = append(structsNodes, *instance)
+	structsNodes, err := pbNodes.CheckServiceNodesToStruct()
+	if err != nil {
+		return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
 	}
 
 	// Normalize the data into a convenient form for operation.
@@ -277,8 +283,145 @@ func (s *Service) handleUpsertService(
 		}
 	}
 
-	// TODO(peering): cleanup and deregister existing data that is now missing safely somehow
+	//
+	// Now that the data received has been stored in the state store, the rest of this
+	// function is responsible for cleaning up data in the catalog that wasn't in the snapshot.
+	//
+	// nodeCheckTuple uniquely identifies a node check in the catalog.
+	// The partition is not needed because we are only operating on one partition's catalog.
+	type nodeCheckTuple struct {
+		checkID types.CheckID
+		node    string
+	}
+
+	var (
+		// unusedNodes tracks node names that were not present in the latest response.
+		// Missing nodes are not assumed to be deleted because there may be other service names
+		// registered on them.
+		// Node checks on these nodes are cleaned up when the node itself is deregistered below.
+		unusedNodes = make(map[string]struct{})
+
+		// deletedNodeChecks tracks node checks that were not present in the latest response.
+		// A single node check will be attached to all service instances of a node, so this
+		// deduplication prevents issuing multiple deregistrations for a single check.
+		deletedNodeChecks = make(map[nodeCheckTuple]struct{})
+	)
+	for _, csn := range storedInstances {
+		if _, ok := snap.Nodes[csn.Node.ID]; !ok {
+			unusedNodes[string(csn.Node.ID)] = struct{}{}
+
+			// Since the node is not in the snapshot, the associated service instance
+			// cannot be in the snapshot either, because a service instance can't
+			// exist without a node.
+			// This will also delete all service checks.
+			err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
+				Node:           csn.Node.Node,
+				ServiceID:      csn.Service.ID,
+				EnterpriseMeta: csn.Service.EnterpriseMeta,
+				PeerName:       peerName,
+			})
+			if err != nil {
+				return fmt.Errorf("failed to deregister service %q: %w", csn.Service.CompoundServiceID(), err)
+			}
+
+			// We can't know whether a node check was deleted from the exporting cluster
+			// (but not the node itself) if the node wasn't in the snapshot,
+			// so we do not loop over checks here.
+			// If the unused node gets deleted below, that will also delete its node checks.
+			continue
+		}
+
+		// Delete the service instance if not in the snapshot.
+		sid := csn.Service.CompoundServiceID()
+		if _, ok := snap.Nodes[csn.Node.ID].Services[sid]; !ok {
+			err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{
+				Node:           csn.Node.Node,
+				ServiceID:      csn.Service.ID,
+				EnterpriseMeta: csn.Service.EnterpriseMeta,
+				PeerName:       peerName,
+			})
+			if err != nil {
+				ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/service_id:%s",
+					csn.Service.PartitionOrDefault(), peerName, csn.Node.Node, csn.Service.NamespaceOrDefault(), csn.Service.ID)
+				return fmt.Errorf("failed to deregister service %q: %w", ident, err)
+			}
+
+			// When a service is deleted, all associated checks also get deleted as a side effect.
+			continue
+		}
+
+		// Reconcile checks.
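The loop that continues below feeds deletedNodeChecks. As background on why that set exists: a node-level check is attached to every service instance on its node, so a stale node check is encountered once per instance while walking stored data. A standalone sketch of the deduplication, with a plain string in place of types.CheckID:

```go
package main

import "fmt"

// nodeCheckTuple mirrors the tuple defined above, simplified to strings.
type nodeCheckTuple struct {
	checkID string
	node    string
}

func main() {
	// Both api-1 and api-2 live on node-foo, so the same stale node check
	// shows up once per service instance.
	stale := []nodeCheckTuple{
		{checkID: "node-foo-check", node: "node-foo"}, // seen via api-1
		{checkID: "node-foo-check", node: "node-foo"}, // seen via api-2
	}

	deleted := make(map[nodeCheckTuple]struct{})
	for _, chk := range stale {
		deleted[chk] = struct{}{}
	}

	// Only one deregistration is issued per unique node check.
	for chk := range deleted {
		fmt.Printf("deregister %s on %s\n", chk.checkID, chk.node)
	}
}
```

handleUpdateService does the same thing with types.CheckID keys, deferring the actual deregistrations until after the whole loop.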
+ for _, chk := range csn.Checks { + if _, ok := snap.Nodes[csn.Node.ID].Services[sid].Checks[chk.CheckID]; !ok { + // Checks without a ServiceID are node checks. + // If the node exists but the check does not then the check was deleted. + if chk.ServiceID == "" { + // Deduplicate node checks to avoid deregistering a check multiple times. + tuple := nodeCheckTuple{ + checkID: chk.CheckID, + node: chk.Node, + } + deletedNodeChecks[tuple] = struct{}{} + continue + } + + // If the check isn't a node check then it's a service check. + // Service checks that were not present can be deleted immediately because + // checks for a given service ID will only be attached to a single CheckServiceNode. + err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: chk.Node, + CheckID: chk.CheckID, + EnterpriseMeta: chk.EnterpriseMeta, + PeerName: peerName, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/check_id:%s", + chk.PartitionOrDefault(), peerName, chk.Node, chk.NamespaceOrDefault(), chk.CheckID) + return fmt.Errorf("failed to deregister check %q: %w", ident, err) + } + } + } + } + + // Delete all deduplicated node checks. + for chk := range deletedNodeChecks { + nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) + err := s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: chk.node, + CheckID: chk.checkID, + EnterpriseMeta: *nodeMeta, + PeerName: peerName, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", nodeMeta.PartitionOrDefault(), peerName, chk.node, chk.checkID) + return fmt.Errorf("failed to deregister node check %q: %w", ident, err) + } + } + + // Delete any nodes that do not have any other services registered on them. + for node := range unusedNodes { + nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) + _, ns, err := s.Backend.Store().NodeServices(nil, node, nodeMeta, peerName) + if err != nil { + return fmt.Errorf("failed to query services on node: %w", err) + } + if ns != nil && len(ns.Services) >= 1 { + // At least one service is still registered on this node, so we keep it. + continue + } + + // All services on the node were deleted, so the node is also cleaned up. 
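Before the final deregistration below, a compact sketch of this keep-or-delete decision under stand-in data; the `remaining` map is a hypothetical placeholder for the Store().NodeServices lookup used above:

```go
package main

import "fmt"

// remaining stands in for the state store: node name -> service IDs
// still registered after the service-instance deletions above.
var remaining = map[string][]string{
	"node-foo": {"redis-2"}, // another service name still lives here
	"node-bar": {},          // nothing left on this node
}

func main() {
	for _, node := range []string{"node-foo", "node-bar"} {
		if len(remaining[node]) >= 1 {
			fmt.Printf("keep %s: other services are still registered\n", node)
			continue
		}
		fmt.Printf("deregister node %s\n", node)
	}
}
```

The guard mirrors the len(ns.Services) >= 1 check above: a node imported from a peer is only removed once no service of any name remains on it.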
+ err = s.Backend.Apply().CatalogDeregister(&structs.DeregisterRequest{ + Node: node, + PeerName: peerName, + EnterpriseMeta: *nodeMeta, + }) + if err != nil { + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node) + return fmt.Errorf("failed to deregister node %q: %w", ident, err) + } + } return nil } @@ -307,25 +450,13 @@ func (s *Service) handleDelete( case pbpeering.TypeURLService: sn := structs.ServiceNameFromString(resourceID) sn.OverridePartition(partition) - return s.handleDeleteService(peerName, partition, sn) + return s.handleUpdateService(peerName, partition, sn, nil) default: return fmt.Errorf("unexpected resourceURL: %s", resourceURL) } } -func (s *Service) handleDeleteService( - peerName string, - partition string, - sn structs.ServiceName, -) error { - // Deregister: ServiceID == DeleteService ANd checks - // Deregister: ServiceID(empty) CheckID(empty) == DeleteNode - - // TODO(peering): implement - return nil -} - func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage { var rpcErr *pbstatus.Status if errCode != code.Code_OK || errMsg != "" { diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index acbd2e6f9c..c57710c548 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -130,6 +130,8 @@ type Store interface { PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) + CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) + NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) AbandonCh() <-chan struct{} @@ -142,6 +144,7 @@ type Apply interface { PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error CatalogRegister(req *structs.RegisterRequest) error + CatalogDeregister(req *structs.DeregisterRequest) error } // GenerateToken implements the PeeringService RPC method to generate a diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 27fc3055e1..dc30fa6868 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -18,16 +18,19 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbstatus" "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/types" ) func 
TestStreamResources_Server_Follower(t *testing.T) { @@ -949,6 +952,7 @@ func makeClient( type testStreamBackend struct { pub state.EventPublisher store *state.Store + applier *testApplier leader func() bool leaderAddress *leaderAddress } @@ -1019,6 +1023,49 @@ func (b *testStreamBackend) EnterpriseCheckNamespaces(_ string) error { } func (b *testStreamBackend) Apply() Apply { + return b.applier +} + +type testApplier struct { + store *state.Store +} + +func (a *testApplier) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { + panic("not implemented") +} + +func (a *testApplier) PeeringDelete(req *pbpeering.PeeringDeleteRequest) error { + panic("not implemented") +} + +func (a *testApplier) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error { + panic("not implemented") +} + +func (a *testApplier) PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error { + panic("not implemented") +} + +// CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister. +func (a *testApplier) CatalogRegister(req *structs.RegisterRequest) error { + return a.store.EnsureRegistration(1, req) +} + +// CatalogDeregister mocks catalog de-registrations through Raft by copying the logic of FSM.applyDeregister. +func (a *testApplier) CatalogDeregister(req *structs.DeregisterRequest) error { + if req.ServiceID != "" { + if err := a.store.DeleteService(1, req.Node, req.ServiceID, &req.EnterpriseMeta, req.PeerName); err != nil { + return err + } + } else if req.CheckID != "" { + if err := a.store.DeleteCheck(1, req.Node, req.CheckID, &req.EnterpriseMeta, req.PeerName); err != nil { + return err + } + } else { + if err := a.store.DeleteNode(1, req.Node, &req.EnterpriseMeta, req.PeerName); err != nil { + return err + } + } return nil } @@ -1273,3 +1320,854 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test checkFns[i](t, out[i]) } } + +func TestHandleUpdateService(t *testing.T) { + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, + &testStreamBackend{ + store: store, + applier: &testApplier{store: store}, + pub: publisher, + leader: func() bool { + return false + }, + }, + ) + + type testCase struct { + name string + seed []*structs.RegisterRequest + input *pbservice.IndexedCheckServiceNodes + expect map[string]structs.CheckServiceNodes + } + + peerName := "billing" + remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap")) + + // "api" service is imported from the billing-ap partition, corresponding to the billing peer. + // Locally it is stored to the default partition. + defaultMeta := *acl.DefaultEnterpriseMeta() + apiSN := structs.NewServiceName("api", &defaultMeta) + + run := func(t *testing.T, tc testCase) { + // Seed the local catalog with some data to reconcile against. + for _, reg := range tc.seed { + require.NoError(t, srv.Backend.Apply().CatalogRegister(reg)) + } + + // Simulate an update arriving for billing/api. 
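+		// A nil tc.input exercises the DELETE path: handleUpdateService then
+		// reconciles against an empty snapshot and deregisters everything the
+		// catalog stores for the service name.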
+ require.NoError(t, srv.handleUpdateService(peerName, acl.DefaultPartitionName, apiSN, tc.input)) + + for svc, expect := range tc.expect { + t.Run(svc, func(t *testing.T) { + _, got, err := srv.Backend.Store().CheckServiceNodes(nil, svc, &defaultMeta, peerName) + require.NoError(t, err) + requireEqualInstances(t, expect, got) + }) + } + } + + tt := []testCase{ + { + name: "upsert two service instances to the same node", + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + + // The remote billing-ap partition is overwritten for all resources with the local default. + Partition: defaultMeta.PartitionOrEmpty(), + + // The name of the peer "billing" is attached as well. 
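+						// Together, the local partition and the peer name namespace
+						// imported data so it cannot collide with local registrations.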
+ PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "upsert two service instances to different nodes", + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &pbservice.Node{ + ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", + Node: "node-bar", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "node-bar-check", + Node: "node-bar", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-bar", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + "api": { + { + Node: &structs.Node{ + ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", + Node: "node-bar", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-bar-check", + Node: "node-bar", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-2-check", + ServiceID: "api-2", + Node: "node-bar", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + + // The remote billing-ap partition is overwritten for all resources with the local default. + Partition: defaultMeta.PartitionOrEmpty(), + + // The name of the peer "billing" is attached as well. 
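+						// Note the expected order: instances come back from the state
+						// store ordered by node, so node-bar precedes this node-foo
+						// instance even though the input listed node-foo first.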
+ PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "receiving a nil input leads to deleting data in the catalog", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("c0f97de9-4e1b-4e80-a1c6-cd8725835ab2"), + Node: "node-bar", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-bar", + ServiceID: "api-2", + CheckID: types.CheckID("api-2-check"), + PeerName: peerName, + }, + { + Node: "node-bar", + CheckID: types.CheckID("node-bar-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: nil, + expect: map[string]structs.CheckServiceNodes{ + "api": {}, + }, + }, + { + name: "deleting one service name from a node does not delete other service names", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "redis-2", + CheckID: types.CheckID("redis-2-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + // Nil input is for the "api" service. + input: nil, + expect: map[string]structs.CheckServiceNodes{ + "api": {}, + // Existing redis service was not affected by deletion. 
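+				// The shared node-foo also survives: NodeServices still reports
+				// redis-2 on it, so the node-deletion pass keeps the node.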
+ "redis": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "node-foo-check", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + { + CheckID: "redis-2-check", + ServiceID: "redis-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "service checks are cleaned up when not present in a response", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + // Service check was deleted + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + // Service check should be gone + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{}, + }, + }, + }, + }, + { + name: "node checks are cleaned up when not present in a response", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "redis-2", + CheckID: types.CheckID("redis-2-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + Service: &pbservice.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + 
Checks: []*pbservice.HealthCheck{ + // Node check was deleted + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + // Node check should be gone + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "api-1-check", + ServiceID: "api-1", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + "redis": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + CheckID: "redis-2-check", + ServiceID: "redis-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + { + name: "replacing a service instance on a node cleans up the old instance", + seed: []*structs.RegisterRequest{ + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "redis-2", + CheckID: types.CheckID("redis-2-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + { + ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), + Node: "node-foo", + PeerName: peerName, + Service: &structs.NodeService{ + ID: "api-1", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: structs.HealthChecks{ + { + Node: "node-foo", + ServiceID: "api-1", + CheckID: types.CheckID("api-1-check"), + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: types.CheckID("node-foo-check"), + PeerName: peerName, + }, + }, + }, + }, + input: &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + { + Node: &pbservice.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: remoteMeta.Partition, + PeerName: peerName, + }, + // New service ID and checks for the api service. 
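+					// api-1 is absent from this snapshot, so reconciliation is
+					// expected to deregister it and its service check while the
+					// node and the redis service are left alone.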
+ Service: &pbservice.NodeService{ + ID: "new-api-v2", + Service: "api", + EnterpriseMeta: remoteMeta, + PeerName: peerName, + }, + Checks: []*pbservice.HealthCheck{ + { + Node: "node-foo", + ServiceID: "new-api-v2", + CheckID: "new-api-v2-check", + PeerName: peerName, + }, + { + Node: "node-foo", + CheckID: "node-foo-check", + PeerName: peerName, + }, + }, + }, + }, + }, + expect: map[string]structs.CheckServiceNodes{ + "api": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "new-api-v2", + Service: "api", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + Node: "node-foo", + CheckID: "node-foo-check", + PeerName: peerName, + }, + { + CheckID: "new-api-v2-check", + ServiceID: "new-api-v2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + "redis": { + { + Node: &structs.Node{ + ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", + Node: "node-foo", + Partition: defaultMeta.PartitionOrEmpty(), + PeerName: peerName, + }, + Service: &structs.NodeService{ + ID: "redis-2", + Service: "redis", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + Checks: []*structs.HealthCheck{ + { + Node: "node-foo", + CheckID: "node-foo-check", + PeerName: peerName, + }, + { + CheckID: "redis-2-check", + ServiceID: "redis-2", + Node: "node-foo", + EnterpriseMeta: defaultMeta, + PeerName: peerName, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) { + t.Helper() + + require.Equal(t, len(expect), len(got), "got differing number of instances") + + for i := range expect { + // Node equality + require.Equal(t, expect[i].Node.ID, got[i].Node.ID, "node mismatch") + require.Equal(t, expect[i].Node.Partition, got[i].Node.Partition, "partition mismatch") + require.Equal(t, expect[i].Node.PeerName, got[i].Node.PeerName, "peer name mismatch") + + // Service equality + require.Equal(t, expect[i].Service.ID, got[i].Service.ID, "service id mismatch") + require.Equal(t, expect[i].Service.PeerName, got[i].Service.PeerName, "peer name mismatch") + require.Equal(t, expect[i].Service.PartitionOrDefault(), got[i].Service.PartitionOrDefault(), "partition mismatch") + + // Check equality + require.Equal(t, len(expect[i].Checks), len(got[i].Checks), "got differing number of check") + + for j := range expect[i].Checks { + require.Equal(t, expect[i].Checks[j].CheckID, got[i].Checks[j].CheckID, "check id mismatch") + require.Equal(t, expect[i].Checks[j].PeerName, got[i].Checks[j].PeerName, "peer name mismatch") + require.Equal(t, expect[i].Checks[j].PartitionOrDefault(), got[i].Checks[j].PartitionOrDefault(), "partition mismatch") + } + } + +} diff --git a/proto/pbservice/convert.go b/proto/pbservice/convert.go index d5233dd990..02895adf96 100644 --- a/proto/pbservice/convert.go +++ b/proto/pbservice/convert.go @@ -1,6 +1,8 @@ package pbservice import ( + "fmt" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/types" @@ -42,6 +44,23 @@ func NewMapHeadersFromStructs(t map[string][]string) map[string]*HeaderValue { return s } +// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent. 
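+// A nil receiver yields (nil, nil), which lets the DELETE path pass a nil
+// snapshot and reconcile the service name down to nothing.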
+func (s *IndexedCheckServiceNodes) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) { + if s == nil { + return nil, nil + } + + resp := make([]structs.CheckServiceNode, 0, len(s.Nodes)) + for _, pb := range s.Nodes { + instance, err := CheckServiceNodeToStructs(pb) + if err != nil { + return resp, fmt.Errorf("failed to convert instance: %w", err) + } + resp = append(resp, *instance) + } + return resp, nil +} + // TODO: use mog once it supports pointers and slices func CheckServiceNodeToStructs(s *CheckServiceNode) (*structs.CheckServiceNode, error) { if s == nil {
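To close the loop, a usage sketch for the helper added above: a nil IndexedCheckServiceNodes, which is what handleDelete passes for a service deletion, converts to an empty result instead of panicking. The import path mirrors this patch; the main wrapper is illustrative only.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul/proto/pbservice"
)

func main() {
	// handleDelete passes nil for a deleted service; the helper tolerates it.
	var pbNodes *pbservice.IndexedCheckServiceNodes

	structsNodes, err := pbNodes.CheckServiceNodesToStruct()
	if err != nil {
		panic(err)
	}

	// Zero instances: reconciliation will delete everything stored locally.
	fmt.Println(len(structsNodes))
}
```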