From 50ef6a697ec4a81a364c442f035a9a2b7c2a676a Mon Sep 17 00:00:00 2001 From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com> Date: Mon, 8 May 2023 13:13:25 -0500 Subject: [PATCH] Fix issue with peer stream node cleanup. (#17235) Fix issue with peer stream node cleanup. This commit encompasses a few problems that are closely related due to their proximity in the code. 1. The peerstream utilizes node IDs in several locations to determine which nodes / services / checks should be cleaned up or created. While VM deployments with agents will likely always have a node ID, agentless uses synthetic nodes and does not populate the field. This means that for consul-k8s deployments, all services were likely bundled together into the same synthetic node in some code paths (but not all), resulting in strange behavior. The Node.Node field should be used instead as a unique identifier, as it should always be populated. 2. The peerstream cleanup process for unused nodes uses an incorrect query for node deregistration. This query is NOT namespace aware and results in the node (and corresponding services) being deregistered prematurely whenever it has zero default-namespace services and 1+ non-default-namespace services registered on it. This issue is tricky to find due to the incorrect logic mentioned in #1, combined with the fact that the affected services must be co-located on the same node as the currently deregistering service for this to be encountered. 3. The stream tracker did not understand differences between services in different namespaces and could therefore report incorrect numbers. It was updated to utilize the full service name to avoid conflicts and return proper results. --- .changelog/17235.txt | 3 + .../services/peerstream/replication.go | 39 ++- .../services/peerstream/server.go | 2 +- .../services/peerstream/stream_test.go | 320 ++++++++++-------- .../services/peerstream/stream_tracker.go | 5 +- 5 files changed, 201 insertions(+), 168 deletions(-) create mode 100644 .changelog/17235.txt diff --git a/.changelog/17235.txt b/.changelog/17235.txt new file mode 100644 index 0000000000..3356b715ef --- /dev/null +++ b/.changelog/17235.txt @@ -0,0 +1,3 @@ +```release-note:bug +peering: Fix issue where peer streams could incorrectly deregister services in various scenarios. +``` diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 8f1491dadb..29d8db9dc6 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -13,6 +13,7 @@ import ( newproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" @@ -341,7 +342,7 @@ func (s *Server) handleUpdateService( for _, nodeSnap := range snap.Nodes { // First register the node - skip the unchanged ones changed := true - if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok { + if storedNode, ok := storedNodesMap[nodeSnap.Node.Node]; ok { if storedNode.IsSame(nodeSnap.Node) { changed = false } @@ -357,7 +358,7 @@ func (s *Server) handleUpdateService( // Then register all services on that node - skip the unchanged ones for _, svcSnap := range nodeSnap.Services { changed = true - if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok { + if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.Node, svcSnap.Service.ID)]; ok { if storedSvcInst.IsSame(svcSnap.Service) { changed = false } @@ -377,7 +378,7 @@ func (s *Server) handleUpdateService( for _, svcSnap := range nodeSnap.Services { for _, c := range svcSnap.Checks { changed := true - if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok { + if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.Node, svcSnap.Service.ID, c.CheckID)]; ok { if chk.IsSame(c) { changed = false } @@ -515,8 +516,10 @@ func (s *Server) handleUpdateService( // Delete any nodes that do not have any other services registered on them. for node := range unusedNodes { - nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()) - _, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName) + // The wildcard is used here so that all services, regardless of namespace are returned + // by the following query. Without this, the node might accidentally be cleaned up early. + wildcardNSMeta := acl.NewEnterpriseMetaWithPartition(sn.PartitionOrDefault(), acl.WildcardName) + _, ns, err := s.GetStore().NodeServiceList(nil, node, &wildcardNSMeta, peerName) if err != nil { return fmt.Errorf("failed to query services on node: %w", err) } @@ -529,10 +532,10 @@ func (s *Server) handleUpdateService( err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{ Node: node, PeerName: peerName, - EnterpriseMeta: *nodeMeta, + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault()), }) if err != nil { - ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node) + ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", sn.PartitionOrDefault(), peerName, node) return fmt.Errorf("failed to deregister node %q: %w", ident, err) } } @@ -635,31 +638,35 @@ type nodeCheckIdentity struct { checkID string } -func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity { +func makeNodeSvcInstID(node string, serviceID string) nodeSvcInstIdentity { return nodeSvcInstIdentity{ - nodeID: string(nodeID), + nodeID: node, serviceID: serviceID, } } -func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity { +func makeNodeCheckID(node string, serviceID string, checkID types.CheckID) nodeCheckIdentity { return nodeCheckIdentity{ serviceID: serviceID, checkID: string(checkID), - nodeID: string(nodeID), + nodeID: node, } } -func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) { - nodesMap := map[types.NodeID]*structs.Node{} +func buildStoredMap(storedInstances structs.CheckServiceNodes) ( + map[string]*structs.Node, + map[nodeSvcInstIdentity]*structs.NodeService, + map[nodeCheckIdentity]*structs.HealthCheck, +) { + nodesMap := map[string]*structs.Node{} svcInstMap := map[nodeSvcInstIdentity]*structs.NodeService{} checksMap := map[nodeCheckIdentity]*structs.HealthCheck{} for _, csn := range storedInstances { - nodesMap[csn.Node.ID] = csn.Node - svcInstMap[makeNodeSvcInstID(csn.Node.ID, csn.Service.ID)] = csn.Service + nodesMap[csn.Node.Node] = csn.Node + svcInstMap[makeNodeSvcInstID(csn.Node.Node, csn.Service.ID)] = csn.Service for _, chk := range csn.Checks { - checksMap[makeNodeCheckID(csn.Node.ID, csn.Service.ID, chk.CheckID)] = chk + checksMap[makeNodeCheckID(csn.Node.Node, csn.Service.ID, chk.CheckID)] = chk } } return nodesMap, svcInstMap, checksMap diff --git a/agent/grpc-external/services/peerstream/server.go b/agent/grpc-external/services/peerstream/server.go index e0dd4d759d..58e436bd1f 100644 --- a/agent/grpc-external/services/peerstream/server.go +++ b/agent/grpc-external/services/peerstream/server.go @@ -122,7 +122,7 @@ type StateStore interface { ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) - NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) + NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 140ae947dc..0a7d6812ab 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1594,7 +1594,11 @@ func Test_ExportedServicesCount(t *testing.T) { mst, err := srv.Tracker.Connected(peerID) require.NoError(t, err) - services := []string{"web", "api", "mongo"} + services := []string{ + structs.NewServiceName("web", nil).String(), + structs.NewServiceName("api", nil).String(), + structs.NewServiceName("mongo", nil).String(), + } update := cache.UpdateEvent{ CorrelationID: subExportedServiceList, Result: &pbpeerstream.ExportedServiceList{ @@ -1938,36 +1942,30 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } } -func Test_processResponse_ExportedServiceUpdates(t *testing.T) { - srv, store := newTestServer(t, func(c *Config) { - backend := c.Backend.(*testStreamBackend) - backend.leader = func() bool { - return false - } - }) - - type testCase struct { - name string - seed []*structs.RegisterRequest - input *pbpeerstream.ExportedService - expect map[string]structs.CheckServiceNodes - exportedServices []string - } - - peerName := "billing" - peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" - remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap")) - - // "api" service is imported from the billing-ap partition, corresponding to the billing peer. - // Locally it is stored to the default partition. - defaultMeta := *acl.DefaultEnterpriseMeta() - apiSN := structs.NewServiceName("api", &defaultMeta) +type PeeringProcessResponse_testCase struct { + name string + seed []*structs.RegisterRequest + inputServiceName structs.ServiceName + input *pbpeerstream.ExportedService + expect map[structs.ServiceName]structs.CheckServiceNodes + exportedServices []string +} +func processResponse_ExportedServiceUpdates( + t *testing.T, + srv *testServer, + store *state.Store, + localEntMeta acl.EnterpriseMeta, + peerName string, + tests []PeeringProcessResponse_testCase, +) { // create a peering in the state store + peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" require.NoError(t, store.PeeringWrite(31, &pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ - ID: peerID, - Name: peerName, + ID: peerID, + Name: peerName, + Partition: localEntMeta.PartitionOrDefault(), }, })) @@ -1975,7 +1973,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { mst, err := srv.Tracker.Connected(peerID) require.NoError(t, err) - run := func(t *testing.T, tc testCase) { + run := func(t *testing.T, tc PeeringProcessResponse_testCase) { // Seed the local catalog with some data to reconcile against. // and increment the tracker's imported services count var serviceNames []structs.ServiceName @@ -1989,14 +1987,14 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { in := &pbpeerstream.ReplicationMessage_Response{ ResourceURL: pbpeerstream.TypeURLExportedService, - ResourceID: apiSN.String(), + ResourceID: tc.inputServiceName.String(), Nonce: "1", Operation: pbpeerstream.Operation_OPERATION_UPSERT, Resource: makeAnyPB(t, tc.input), } // Simulate an update arriving for billing/api. - _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in) + _, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, in) require.NoError(t, err) if len(tc.exportedServices) > 0 { @@ -2009,63 +2007,81 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { } // Simulate an update arriving for billing/api. - _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, resp) + _, err = srv.processResponse(peerName, localEntMeta.PartitionOrDefault(), mst, resp) require.NoError(t, err) // Test the count and contents separately to ensure the count code path is hit. require.Equal(t, mst.GetImportedServicesCount(), len(tc.exportedServices)) require.ElementsMatch(t, mst.ImportedServices, tc.exportedServices) } - _, allServices, err := srv.GetStore().ServiceList(nil, &defaultMeta, peerName) + wildcardNS := acl.NewEnterpriseMetaWithPartition(localEntMeta.PartitionOrDefault(), acl.WildcardName) + _, allServices, err := srv.GetStore().ServiceList(nil, &wildcardNS, peerName) require.NoError(t, err) // This ensures that only services specified under tc.expect are stored. It includes // all exported services plus their sidecar proxies. for _, svc := range allServices { - _, ok := tc.expect[svc.Name] + _, ok := tc.expect[svc] require.True(t, ok) } for svc, expect := range tc.expect { - t.Run(svc, func(t *testing.T) { - _, got, err := srv.GetStore().CheckServiceNodes(nil, svc, &defaultMeta, peerName) + t.Run(svc.String(), func(t *testing.T) { + _, got, err := srv.GetStore().CheckServiceNodes(nil, svc.Name, &svc.EnterpriseMeta, peerName) require.NoError(t, err) requireEqualInstances(t, expect, got) }) } } - tt := []testCase{ + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func Test_processResponse_ExportedServiceUpdates(t *testing.T) { + peerName := "billing" + localEntMeta := *acl.DefaultEnterpriseMeta() + + remoteMeta := *structs.DefaultEnterpriseMetaInPartition("billing-ap") + pbRemoteMeta := pbcommon.NewEnterpriseMetaFromStructs(remoteMeta) + + apiLocalSN := structs.NewServiceName("api", &localEntMeta) + redisLocalSN := structs.NewServiceName("redis", &localEntMeta) + tests := []PeeringProcessResponse_testCase{ { name: "upsert two service instances to the same node", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, @@ -2074,42 +2090,42 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", // The remote billing-ap partition is overwritten for all resources with the local default. - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), // The name of the peer "billing" is attached as well. PeerName: peerName, @@ -2117,21 +2133,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2140,27 +2156,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2170,7 +2186,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "deleting a service with an empty exported service event", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2179,7 +2195,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2197,41 +2213,43 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - input: &pbpeerstream.ExportedService{}, - expect: map[string]structs.CheckServiceNodes{ - "api": {}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), + input: &pbpeerstream.ExportedService{}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): {}, }, }, { name: "upsert two service instances to different nodes", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, @@ -2240,60 +2258,60 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: &pbservice.Node{ ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", Node: "node-bar", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ { CheckID: "node-bar-check", Node: "node-bar", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-bar", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "c0f97de9-4e1b-4e80-a1c6-cd8725835ab2", Node: "node-bar", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-bar-check", Node: "node-bar", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-2-check", ServiceID: "api-2", Node: "node-bar", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2304,7 +2322,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Node: "node-foo", // The remote billing-ap partition is overwritten for all resources with the local default. - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), // The name of the peer "billing" is attached as well. PeerName: peerName, @@ -2312,21 +2330,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2336,7 +2354,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "deleting one service name from a node does not delete other service names", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2345,7 +2363,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2369,7 +2387,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2387,37 +2405,38 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), // Nil input is for the "api" service. input: &pbpeerstream.ExportedService{}, - expect: map[string]structs.CheckServiceNodes{ - "api": {}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): {}, // Existing redis service was not affected by deletion. - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2435,7 +2454,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2459,7 +2478,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2-sidecar-proxy", Service: "redis-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2483,7 +2502,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2507,7 +2526,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1-sidecar-proxy", Service: "api-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2526,68 +2545,69 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), // Nil input is for the "api" service. input: &pbpeerstream.ExportedService{}, - exportedServices: []string{"redis"}, - expect: map[string]structs.CheckServiceNodes{ + exportedServices: []string{redisLocalSN.String()}, + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Existing redis service was not affected by deletion. - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis-sidecar-proxy": { + structs.NewServiceName("redis-sidecar-proxy", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2-sidecar-proxy", Service: "redis-sidecar-proxy", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ { CheckID: "node-foo-check", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, { CheckID: "redis-2-sidecar-proxy-check", ServiceID: "redis-2-sidecar-proxy", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2597,7 +2617,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "service checks are cleaned up when not present in a response", - exportedServices: []string{"api"}, + exportedServices: []string{apiLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2606,7 +2626,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2624,19 +2644,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2645,20 +2666,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - expect: map[string]structs.CheckServiceNodes{ + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Service check should be gone - "api": { + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{}, @@ -2668,7 +2689,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "node checks are cleaned up when not present in a response", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2677,7 +2698,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2701,7 +2722,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2719,19 +2740,20 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, Service: &pbservice.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2740,27 +2762,27 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, }, }, }, }, - expect: map[string]structs.CheckServiceNodes{ + expect: map[structs.ServiceName]structs.CheckServiceNodes{ // Node check should be gone - "api": { + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2768,24 +2790,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "api-1-check", ServiceID: "api-1", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2793,7 +2815,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2803,7 +2825,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, { name: "replacing a service instance on a node cleans up the old instance", - exportedServices: []string{"api", "redis"}, + exportedServices: []string{apiLocalSN.String(), redisLocalSN.String()}, seed: []*structs.RegisterRequest{ { ID: types.NodeID("af913374-68ea-41e5-82e8-6ffd3dffc461"), @@ -2812,7 +2834,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2836,7 +2858,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { Service: &structs.NodeService{ ID: "api-1", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: structs.HealthChecks{ @@ -2854,20 +2876,21 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, + inputServiceName: structs.NewServiceName("api", &remoteMeta), input: &pbpeerstream.ExportedService{ Nodes: []*pbservice.CheckServiceNode{ { Node: &pbservice.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: remoteMeta.Partition, + Partition: pbRemoteMeta.Partition, PeerName: peerName, }, // New service ID and checks for the api service. Service: &pbservice.NodeService{ ID: "new-api-v2", Service: "api", - EnterpriseMeta: remoteMeta, + EnterpriseMeta: pbRemoteMeta, PeerName: peerName, }, Checks: []*pbservice.HealthCheck{ @@ -2886,19 +2909,19 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, }, - expect: map[string]structs.CheckServiceNodes{ - "api": { + expect: map[structs.ServiceName]structs.CheckServiceNodes{ + structs.NewServiceName("api", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "new-api-v2", Service: "api", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2911,24 +2934,24 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "new-api-v2-check", ServiceID: "new-api-v2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, }, }, - "redis": { + structs.NewServiceName("redis", &localEntMeta): { { Node: &structs.Node{ ID: "af913374-68ea-41e5-82e8-6ffd3dffc461", Node: "node-foo", - Partition: defaultMeta.PartitionOrEmpty(), + Partition: localEntMeta.PartitionOrEmpty(), PeerName: peerName, }, Service: &structs.NodeService{ ID: "redis-2", Service: "redis", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, Checks: []*structs.HealthCheck{ @@ -2941,7 +2964,7 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { CheckID: "redis-2-check", ServiceID: "redis-2", Node: "node-foo", - EnterpriseMeta: defaultMeta, + EnterpriseMeta: localEntMeta, PeerName: peerName, }, }, @@ -2950,12 +2973,13 @@ func Test_processResponse_ExportedServiceUpdates(t *testing.T) { }, }, } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - run(t, tc) - }) - } + srv, store := newTestServer(t, func(c *Config) { + backend := c.Backend.(*testStreamBackend) + backend.leader = func() bool { + return false + } + }) + processResponse_ExportedServiceUpdates(t, srv, store, localEntMeta, peerName, tests) } // TestLogTraceProto tests that all PB trace log helpers redact the diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index a93457b925..abb5a003a3 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -372,9 +372,8 @@ func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName) defer s.mu.Unlock() s.ImportedServices = make([]string, len(serviceNames)) - for i, sn := range serviceNames { - s.ImportedServices[i] = sn.Name + s.ImportedServices[i] = sn.String() } } @@ -392,7 +391,7 @@ func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName) s.ExportedServices = make([]string, len(serviceNames)) for i, sn := range serviceNames { - s.ExportedServices[i] = sn.Name + s.ExportedServices[i] = sn.String() } }