From 5a03536040d08087e0d25ef75fd9f3e525c12ce7 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Thu, 19 May 2022 14:21:29 -0500 Subject: [PATCH 1/2] prefactor some functions out of the monolithic file --- agent/rpc/peering/replication.go | 247 +++++++++++++++++++++++++++++++ agent/rpc/peering/service.go | 233 ----------------------------- 2 files changed, 247 insertions(+), 233 deletions(-) create mode 100644 agent/rpc/peering/replication.go diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go new file mode 100644 index 0000000000..e820d959fe --- /dev/null +++ b/agent/rpc/peering/replication.go @@ -0,0 +1,247 @@ +package peering + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/golang/protobuf/ptypes" + "github.com/hashicorp/go-hclog" + "google.golang.org/genproto/googleapis/rpc/code" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/proto/pbstatus" + "github.com/hashicorp/consul/types" +) + +// pushService response handles sending exported service instance updates to the peer cluster. +// Each cache.UpdateEvent will contain all instances for a service name. +// If there are no instances in the event, we consider that to be a de-registration. +func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error { + csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) + if !ok { + logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result)) + + // Skip this update to avoid locking up peering due to a bad service update. + return nil + } + serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) + + // If no nodes are present then it's due to one of: + // 1. The service is newly registered or exported and yielded a transient empty update. + // 2. All instances of the service were de-registered. + // 3. The service was un-exported. + // + // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. + // Case #1 is a no-op for the importing peer. + if len(csn.Nodes) == 0 { + resp := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Response_{ + Response: &pbpeering.ReplicationMessage_Response{ + ResourceURL: pbpeering.TypeURLService, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: serviceName, + Operation: pbpeering.ReplicationMessage_Response_DELETE, + }, + }, + } + logTraceSend(logger, resp) + if err := stream.Send(resp); err != nil { + status.trackSendError(err.Error()) + return fmt.Errorf("failed to send to stream: %v", err) + } + return nil + } + + // If there are nodes in the response, we push them as an UPSERT operation. + any, err := ptypes.MarshalAny(csn) + if err != nil { + // Log the error and skip this response to avoid locking up peering due to a bad update event. 
+ logger.Error("failed to marshal service endpoints", "error", err) + return nil + } + resp := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Response_{ + Response: &pbpeering.ReplicationMessage_Response{ + ResourceURL: pbpeering.TypeURLService, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: serviceName, + Operation: pbpeering.ReplicationMessage_Response_UPSERT, + Resource: any, + }, + }, + } + logTraceSend(logger, resp) + if err := stream.Send(resp); err != nil { + status.trackSendError(err.Error()) + return fmt.Errorf("failed to send to stream: %v", err) + } + return nil +} + +func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { + var ( + err error + errCode code.Code + errMsg string + ) + + if resp.ResourceURL != pbpeering.TypeURLService { + errCode = code.Code_INVALID_ARGUMENT + err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) + return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err + } + + switch resp.Operation { + case pbpeering.ReplicationMessage_Response_UPSERT: + if resp.Resource == nil { + break + } + err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource) + if err != nil { + errCode = code.Code_INTERNAL + errMsg = err.Error() + } + + case pbpeering.ReplicationMessage_Response_DELETE: + err = handleDelete(resp.ResourceURL, resp.ResourceID) + if err != nil { + errCode = code.Code_INTERNAL + errMsg = err.Error() + } + + default: + errCode = code.Code_INVALID_ARGUMENT + + op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)] + if op == "" { + op = strconv.FormatInt(int64(resp.Operation), 10) + } + errMsg = fmt.Sprintf("unsupported operation: %q", op) + + err = errors.New(errMsg) + } + + return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err +} + +func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error { + csn := &pbservice.IndexedCheckServiceNodes{} + err := ptypes.UnmarshalAny(resource, csn) + if err != nil { + return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + if csn == nil || len(csn.Nodes) == 0 { + return nil + } + + type checkTuple struct { + checkID types.CheckID + serviceID string + nodeID types.NodeID + + acl.EnterpriseMeta + } + + var ( + nodes = make(map[types.NodeID]*structs.Node) + services = make(map[types.NodeID][]*structs.NodeService) + checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck) + ) + + for _, pbinstance := range csn.Nodes { + instance, err := pbservice.CheckServiceNodeToStructs(pbinstance) + if err != nil { + return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + + nodes[instance.Node.ID] = instance.Node + services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service) + + if _, ok := checks[instance.Node.ID]; !ok { + checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck) + } + for _, c := range instance.Checks { + tuple := checkTuple{ + checkID: c.CheckID, + serviceID: c.ServiceID, + nodeID: instance.Node.ID, + EnterpriseMeta: c.EnterpriseMeta, + } + checks[instance.Node.ID][tuple] = c + } + } + + for nodeID, node := range nodes { + // For all nodes, services, and checks we override the peer name and partition to be + 
// the local partition and local name for the peer. + node.PeerName, node.Partition = peerName, partition + + // First register the node + req := node.ToRegisterRequest() + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + + // Then register all services on that node + for _, svc := range services[nodeID] { + svc.PeerName = peerName + svc.OverridePartition(partition) + + req.Service = svc + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + } + req.Service = nil + + // Then register all checks on that node + var chks structs.HealthChecks + for _, c := range checks[nodeID] { + c.PeerName = peerName + c.OverridePartition(partition) + + chks = append(chks, c) + } + + req.Checks = chks + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + } + return nil +} + +func handleDelete(resourceURL string, resourceID string) error { + // TODO(peering): implement + return nil +} + +func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage { + var rpcErr *pbstatus.Status + if errCode != code.Code_OK || errMsg != "" { + rpcErr = &pbstatus.Status{ + Code: int32(errCode), + Message: errMsg, + } + } + + msg := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Request_{ + Request: &pbpeering.ReplicationMessage_Request{ + ResourceURL: resourceURL, + Nonce: nonce, + Error: rpcErr, + }, + }, + } + return msg +} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 9f15960ce3..a3955bb30c 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -5,32 +5,25 @@ import ( "errors" "fmt" "io" - "strconv" "strings" "time" "github.com/armon/go-metrics" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" - "github.com/hashicorp/consul/proto/pbservice" - "github.com/hashicorp/consul/proto/pbstatus" - "github.com/hashicorp/consul/types" ) var ( @@ -651,73 +644,6 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { } } -// pushService response handles sending exported service instance updates to the peer cluster. -// Each cache.UpdateEvent will contain all instances for a service name. -// If there are no instances in the event, we consider that to be a de-registration. 
-func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error { - csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - if !ok { - logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result)) - - // Skip this update to avoid locking up peering due to a bad service update. - return nil - } - serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) - - // If no nodes are present then it's due to one of: - // 1. The service is newly registered or exported and yielded a transient empty update. - // 2. All instances of the service were de-registered. - // 3. The service was un-exported. - // - // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. - // Case #1 is a no-op for the importing peer. - if len(csn.Nodes) == 0 { - resp := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Response_{ - Response: &pbpeering.ReplicationMessage_Response{ - ResourceURL: pbpeering.TypeURLService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeering.ReplicationMessage_Response_DELETE, - }, - }, - } - logTraceSend(logger, resp) - if err := stream.Send(resp); err != nil { - status.trackSendError(err.Error()) - return fmt.Errorf("failed to send to stream: %v", err) - } - return nil - } - - // If there are nodes in the response, we push them as an UPSERT operation. - any, err := ptypes.MarshalAny(csn) - if err != nil { - // Log the error and skip this response to avoid locking up peering due to a bad update event. - logger.Error("failed to marshal service endpoints", "error", err) - return nil - } - resp := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Response_{ - Response: &pbpeering.ReplicationMessage_Response{ - ResourceURL: pbpeering.TypeURLService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeering.ReplicationMessage_Response_UPSERT, - Resource: any, - }, - }, - } - logTraceSend(logger, resp) - if err := stream.Send(resp); err != nil { - status.trackSendError(err.Error()) - return fmt.Errorf("failed to send to stream: %v", err) - } - return nil -} - func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) { return s.streams.streamStatus(peer) } @@ -727,165 +653,6 @@ func (s *Service) ConnectedStreams() map[string]chan struct{} { return s.streams.connectedStreams() } -func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage { - var rpcErr *pbstatus.Status - if errCode != code.Code_OK || errMsg != "" { - rpcErr = &pbstatus.Status{ - Code: int32(errCode), - Message: errMsg, - } - } - - msg := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Request_{ - Request: &pbpeering.ReplicationMessage_Request{ - ResourceURL: resourceURL, - Nonce: nonce, - Error: rpcErr, - }, - }, - } - return msg -} - -func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { - var ( - err error - errCode code.Code - errMsg string - ) - - if resp.ResourceURL != pbpeering.TypeURLService { - errCode = code.Code_INVALID_ARGUMENT - err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) - return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err - 
} - - switch resp.Operation { - case pbpeering.ReplicationMessage_Response_UPSERT: - if resp.Resource == nil { - break - } - err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource) - if err != nil { - errCode = code.Code_INTERNAL - errMsg = err.Error() - } - - case pbpeering.ReplicationMessage_Response_DELETE: - err = handleDelete(resp.ResourceURL, resp.ResourceID) - if err != nil { - errCode = code.Code_INTERNAL - errMsg = err.Error() - } - - default: - errCode = code.Code_INVALID_ARGUMENT - - op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)] - if op == "" { - op = strconv.FormatInt(int64(resp.Operation), 10) - } - errMsg = fmt.Sprintf("unsupported operation: %q", op) - - err = errors.New(errMsg) - } - - return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err -} - -func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error { - csn := &pbservice.IndexedCheckServiceNodes{} - err := ptypes.UnmarshalAny(resource, csn) - if err != nil { - return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - if csn == nil || len(csn.Nodes) == 0 { - return nil - } - - type checkTuple struct { - checkID types.CheckID - serviceID string - nodeID types.NodeID - - acl.EnterpriseMeta - } - - var ( - nodes = make(map[types.NodeID]*structs.Node) - services = make(map[types.NodeID][]*structs.NodeService) - checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck) - ) - - for _, pbinstance := range csn.Nodes { - instance, err := pbservice.CheckServiceNodeToStructs(pbinstance) - if err != nil { - return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - - nodes[instance.Node.ID] = instance.Node - services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service) - - if _, ok := checks[instance.Node.ID]; !ok { - checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck) - } - for _, c := range instance.Checks { - tuple := checkTuple{ - checkID: c.CheckID, - serviceID: c.ServiceID, - nodeID: instance.Node.ID, - EnterpriseMeta: c.EnterpriseMeta, - } - checks[instance.Node.ID][tuple] = c - } - } - - for nodeID, node := range nodes { - // For all nodes, services, and checks we override the peer name and partition to be - // the local partition and local name for the peer. 
- node.PeerName, node.Partition = peerName, partition - - // First register the node - req := node.ToRegisterRequest() - if err := s.Backend.Apply().CatalogRegister(&req); err != nil { - return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - - // Then register all services on that node - for _, svc := range services[nodeID] { - svc.PeerName = peerName - svc.OverridePartition(partition) - - req.Service = svc - if err := s.Backend.Apply().CatalogRegister(&req); err != nil { - return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - } - req.Service = nil - - // Then register all checks on that node - var chks structs.HealthChecks - for _, c := range checks[nodeID] { - c.PeerName = peerName - c.OverridePartition(partition) - - chks = append(chks, c) - } - - req.Checks = chks - if err := s.Backend.Apply().CatalogRegister(&req); err != nil { - return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - } - return nil -} - -func handleDelete(resourceURL string, resourceID string) error { - // TODO(peering): implement - return nil -} - func logTraceRecv(logger hclog.Logger, pb proto.Message) { logTraceProto(logger, pb, true) } From 3e4a5228821433d5bd927ffea4cc72b490edec49 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Thu, 19 May 2022 14:21:44 -0500 Subject: [PATCH 2/2] peering: replicate discovery chains information to importing peers Treat each exported service as a "discovery chain" and replicate one synthetic CheckServiceNode for each chain and remote mesh gateway. The health will be a flattened generated check of the checks for that mesh gateway node. --- agent/consul/state/config_entry.go | 31 + agent/consul/state/peering.go | 80 ++- agent/consul/state/peering_test.go | 202 ++++-- agent/rpc/peering/service.go | 8 +- agent/rpc/peering/service_test.go | 9 +- agent/rpc/peering/subscription_blocking.go | 108 +++ agent/rpc/peering/subscription_manager.go | 420 ++++++++++-- .../rpc/peering/subscription_manager_test.go | 644 +++++++++++------- agent/rpc/peering/subscription_state.go | 165 +++++ agent/rpc/peering/subscription_state_test.go | 200 ++++++ agent/rpc/peering/subscription_view.go | 18 +- agent/rpc/peering/subscription_view_test.go | 105 +-- agent/structs/peering.go | 27 + lib/maps/maps.go | 12 + lib/maps/maps_test.go | 41 ++ 15 files changed, 1587 insertions(+), 483 deletions(-) create mode 100644 agent/rpc/peering/subscription_blocking.go create mode 100644 agent/rpc/peering/subscription_state.go create mode 100644 agent/rpc/peering/subscription_state_test.go create mode 100644 lib/maps/maps.go create mode 100644 lib/maps/maps_test.go diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index c54f6889c7..4743e32552 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/lib/maps" ) type ConfigEntryLinkIndex struct { @@ -137,6 +138,36 @@ func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string, entMeta *acl return configEntriesByKindTxn(tx, ws, kind, entMeta) } +func listDiscoveryChainNamesTxn(tx ReadTxn, ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) { + // Get the index and watch for updates + idx := 
maxIndexWatchTxn(tx, ws, tableConfigEntries) + + // List all discovery chain top nodes. + seen := make(map[structs.ServiceName]struct{}) + for _, kind := range []string{ + structs.ServiceRouter, + structs.ServiceSplitter, + structs.ServiceResolver, + } { + iter, err := getConfigEntryKindsWithTxn(tx, kind, &entMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + ws.Add(iter.WatchCh()) + + for v := iter.Next(); v != nil; v = iter.Next() { + entry := v.(structs.ConfigEntry) + sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta()) + seen[sn] = struct{}{} + } + } + + results := maps.SliceOfKeys(seen) + structs.ServiceList(results).Sort() + + return idx, results, nil +} + func configEntriesByKindTxn(tx ReadTxn, ws memdb.WatchSet, kind string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) { // Get the index and watch for updates idx := maxIndexWatchTxn(tx, ws, tableConfigEntries) diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 11610ea108..954ad590c1 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/maps" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -140,7 +141,10 @@ func peeringReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering. func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) { tx := s.db.ReadTxn() defer tx.Abort() + return s.peeringListTxn(ws, tx, entMeta) +} +func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) { var ( iter memdb.ResultIterator err error @@ -281,12 +285,16 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error { return tx.Commit() } -// ExportedServicesForPeer returns the list of typical and proxy services exported to a peer. -// TODO(peering): What to do about terminating gateways? Sometimes terminating gateways are the appropriate destination -// to dial for an upstream mesh service. However, that information is handled by observing the terminating gateway's -// config entry, which we wouldn't want to replicate. How would client peers know to route through terminating gateways -// when they're not dialing through a remote mesh gateway? -func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) { +// ExportedServicesForPeer returns the list of typical and proxy services +// exported to a peer. +// +// TODO(peering): What to do about terminating gateways? Sometimes terminating +// gateways are the appropriate destination to dial for an upstream mesh +// service. However, that information is handled by observing the terminating +// gateway's config entry, which we wouldn't want to replicate. How would +// client peers know to route through terminating gateways when they're not +// dialing through a remote mesh gateway? 
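+//
+// The returned ExportedServiceList splits typical service names from
+// discovery chain names so that the subscription manager can watch and
+// replicate each group differently.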
+func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) { tx := s.db.ReadTxn() defer tx.Abort() @@ -295,9 +303,13 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6 return 0, nil, fmt.Errorf("failed to read peering: %w", err) } if peering == nil { - return 0, nil, nil + return 0, &structs.ExportedServiceList{}, nil } + return s.exportedServicesForPeerTxn(ws, tx, peering) +} + +func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) { maxIdx := peering.ModifyIndex entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition) @@ -309,14 +321,28 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6 maxIdx = idx } if raw == nil { - return maxIdx, nil, nil + return maxIdx, &structs.ExportedServiceList{}, nil } + conf, ok := raw.(*structs.ExportedServicesConfigEntry) if !ok { return 0, nil, fmt.Errorf("expected type *structs.ExportedServicesConfigEntry, got %T", raw) } - set := make(map[structs.ServiceName]struct{}) + var ( + normalSet = make(map[structs.ServiceName]struct{}) + discoSet = make(map[structs.ServiceName]struct{}) + ) + + // TODO(peering): filter the disco chain portion of the results to only be + // things reachable over the mesh to avoid replicating some clutter. + // + // At least one of the following should be true for a name for it to + // replicate: + // + // - are a discovery chain by definition (service-router, service-splitter, service-resolver) + // - have an explicit sidecar kind=connect-proxy + // - use connect native mode for _, svc := range conf.Services { svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace) @@ -325,7 +351,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6 for _, consumer := range svc.Consumers { name := structs.NewServiceName(svc.Name, &svcMeta) - if _, ok := set[name]; ok { + if _, ok := normalSet[name]; ok { // Service was covered by a wildcard that was already accounted for continue } @@ -335,43 +361,47 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6 sawPeer = true if svc.Name != structs.WildcardSpecifier { - set[name] = struct{}{} + normalSet[name] = struct{}{} } } // If the target peer is a consumer, and all services in the namespace are exported, query those service names. if sawPeer && svc.Name == structs.WildcardSpecifier { - var typicalServices []*KindServiceName - idx, typicalServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta) + idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta) if err != nil { - return 0, nil, fmt.Errorf("failed to get service names: %w", err) + return 0, nil, fmt.Errorf("failed to get typical service names: %w", err) } if idx > maxIdx { maxIdx = idx } for _, s := range typicalServices { - set[s.Service] = struct{}{} + normalSet[s.Service] = struct{}{} } - var proxyServices []*KindServiceName - idx, proxyServices, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectProxy, svcMeta) + // list all config entries of kind service-resolver, service-router, service-splitter? 
+ idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, svcMeta) if err != nil { - return 0, nil, fmt.Errorf("failed to get service names: %w", err) + return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err) } if idx > maxIdx { maxIdx = idx } - for _, s := range proxyServices { - set[s.Service] = struct{}{} + for _, sn := range discoChains { + discoSet[sn] = struct{}{} } } } - var resp []structs.ServiceName - for svc := range set { - resp = append(resp, svc) - } - return maxIdx, resp, nil + normal := maps.SliceOfKeys(normalSet) + disco := maps.SliceOfKeys(discoSet) + + structs.ServiceList(normal).Sort() + structs.ServiceList(disco).Sort() + + return maxIdx, &structs.ExportedServiceList{ + Services: normal, + DiscoChains: disco, + }, nil } // PeeringsForService returns the list of peerings that are associated with the service name provided in the query. diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index dd974c8e40..9a93111b55 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -630,25 +630,38 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { var lastIdx uint64 lastIdx++ - err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ Name: "my-peering", - }) - require.NoError(t, err) + })) - q := Query{Value: "my-peering"} - _, p, err := s.PeeringRead(nil, q) + _, p, err := s.PeeringRead(nil, Query{ + Value: "my-peering", + }) require.NoError(t, err) require.NotNil(t, p) id := p.ID + defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition() + ws := memdb.NewWatchSet() + ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) { + t.Helper() + require.NoError(t, entry.Normalize()) + require.NoError(t, entry.Validate()) + + lastIdx++ + require.NoError(t, s.EnsureConfigEntry(lastIdx, entry)) + } + testutil.RunStep(t, "no exported services", func(t *testing.T) { - idx, exported, err := s.ExportedServicesForPeer(ws, id) + expect := &structs.ExportedServiceList{} + + idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) require.Equal(t, lastIdx, idx) - require.Empty(t, exported) + require.Equal(t, expect, got) }) testutil.RunStep(t, "config entry with exact service names", func(t *testing.T) { @@ -658,58 +671,57 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { { Name: "mysql", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, + {PeerName: "my-peering"}, }, }, { Name: "redis", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, + {PeerName: "my-peering"}, }, }, { Name: "mongo", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-other-peering", - }, + {PeerName: "my-other-peering"}, }, }, }, } - lastIdx++ - err = s.EnsureConfigEntry(lastIdx, entry) - require.NoError(t, err) + ensureConfigEntry(t, entry) require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - expect := []structs.ServiceName{ - { - Name: "mysql", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - { - Name: "redis", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + expect := &structs.ExportedServiceList{ + Services: []structs.ServiceName{ + { + Name: "mysql", + EnterpriseMeta: *defaultEntMeta, + }, + { + Name: "redis", + EnterpriseMeta: *defaultEntMeta, + }, }, } + idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) require.Equal(t, lastIdx, idx) - require.ElementsMatch(t, expect, got) + 
require.Equal(t, expect, got) }) testutil.RunStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) { lastIdx++ - require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{ + Node: "foo", Address: "127.0.0.1", + })) lastIdx++ - require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "billing", Service: "billing", Port: 5000})) + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ + ID: "billing", Service: "billing", Port: 5000, + })) entry := &structs.ExportedServicesConfigEntry{ Name: "default", @@ -717,24 +729,22 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { { Name: "*", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, + {PeerName: "my-peering"}, }, }, }, } - lastIdx++ - err = s.EnsureConfigEntry(lastIdx, entry) - require.NoError(t, err) + ensureConfigEntry(t, entry) require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - expect := []structs.ServiceName{ - { - Name: "billing", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + expect := &structs.ExportedServiceList{ + Services: []structs.ServiceName{ + { + Name: "billing", + EnterpriseMeta: *defaultEntMeta, + }, }, } idx, got, err := s.ExportedServicesForPeer(ws, id) @@ -745,69 +755,127 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { testutil.RunStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) { lastIdx++ - require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "payments", Service: "payments", Port: 5000})) + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ + ID: "payments", Service: "payments", Port: 5000, + })) + // The proxy will be ignored. lastIdx++ - proxy := structs.NodeService{ + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, ID: "payments-proxy", Service: "payments-proxy", Port: 5000, - } - require.NoError(t, s.EnsureService(lastIdx, "foo", &proxy)) + })) + + // Ensure everything is L7-capable. 
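+		// (Service-router and service-splitter entries only validate against an
+		// http-compatible protocol, hence the global proxy-defaults entry below.)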
+ ensureConfigEntry(t, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + EnterpriseMeta: *defaultEntMeta, + }) + + ensureConfigEntry(t, &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "router", + EnterpriseMeta: *defaultEntMeta, + }) + + ensureConfigEntry(t, &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "splitter", + EnterpriseMeta: *defaultEntMeta, + Splits: []structs.ServiceSplit{{Weight: 100}}, + }) + + ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "resolver", + EnterpriseMeta: *defaultEntMeta, + }) require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - expect := []structs.ServiceName{ - { - Name: "billing", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + expect := &structs.ExportedServiceList{ + Services: []structs.ServiceName{ + { + Name: "billing", + EnterpriseMeta: *defaultEntMeta, + }, + { + Name: "payments", + EnterpriseMeta: *defaultEntMeta, + }, + // NOTE: no payments-proxy here }, - { - Name: "payments", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - { - Name: "payments-proxy", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + DiscoChains: []structs.ServiceName{ + { + Name: "resolver", + EnterpriseMeta: *defaultEntMeta, + }, + { + Name: "router", + EnterpriseMeta: *defaultEntMeta, + }, + { + Name: "splitter", + EnterpriseMeta: *defaultEntMeta, + }, }, } idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) require.Equal(t, lastIdx, idx) - require.ElementsMatch(t, expect, got) + require.Equal(t, expect, got) }) testutil.RunStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) { lastIdx++ require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, "")) + lastIdx++ + require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil)) + require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - expect := []structs.ServiceName{ - { - Name: "payments", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + expect := &structs.ExportedServiceList{ + Services: []structs.ServiceName{ + { + Name: "payments", + EnterpriseMeta: *defaultEntMeta, + }, + // NOTE: no payments-proxy here }, - { - Name: "payments-proxy", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + DiscoChains: []structs.ServiceName{ + { + Name: "resolver", + EnterpriseMeta: *defaultEntMeta, + }, + { + Name: "router", + EnterpriseMeta: *defaultEntMeta, + }, }, } idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) require.Equal(t, lastIdx, idx) - require.ElementsMatch(t, expect, got) + require.Equal(t, expect, got) }) testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) { - require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", structs.DefaultEnterpriseMetaInDefaultPartition())) - idx, exported, err := s.ExportedServicesForPeer(ws, id) + expect := &structs.ExportedServiceList{} + + require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta)) + idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) require.Equal(t, lastIdx, idx) - require.Empty(t, exported) + require.Equal(t, expect, got) }) } diff --git a/agent/rpc/peering/service.go 
b/agent/rpc/peering/service.go index a3955bb30c..f6a0532a55 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -101,8 +101,9 @@ type Store interface { PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) - ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error) + ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) + ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) AbandonCh() <-chan struct{} } @@ -503,7 +504,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { defer s.streams.disconnected(req.LocalID) mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend) - subCh := mgr.subscribe(req.Stream.Context(), req.LocalID) + subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition) sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ @@ -635,7 +636,8 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { if err := pushServiceResponse(logger, req.Stream, status, update); err != nil { return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) } - + case strings.HasPrefix(update.CorrelationID, subMeshGateway): + //TODO(Peering): figure out how to sync this separately default: logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID) continue diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 3d86440c96..c661be9259 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -1005,19 +1005,12 @@ func Test_StreamHandler_UpsertServices(t *testing.T) { }, } for _, tc := range tt { - runStep(t, tc.name, func(t *testing.T) { + testutil.RunStep(t, tc.name, func(t *testing.T) { run(t, tc) }) } } -func runStep(t *testing.T, name string, fn func(t *testing.T)) { - t.Helper() - if !t.Run(name, fn) { - t.FailNow() - } -} - // newTestServer is copied from partition/service_test.go, with the addition of certs/cas. // TODO(peering): these are endpoint tests and should live in the agent/consul // package. Instead, these can be written around a mock client (see testing.go) diff --git a/agent/rpc/peering/subscription_blocking.go b/agent/rpc/peering/subscription_blocking.go new file mode 100644 index 0000000000..0a8a07d9b4 --- /dev/null +++ b/agent/rpc/peering/subscription_blocking.go @@ -0,0 +1,108 @@ +package peering + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/consul/proto/pbservice" +) + +// This file contains direct state store functions that need additional +// management to have them emit events. Ideally these would go through +// streaming machinery instead to be cheaper. 
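+//
+// The shape of each helper below is the same: run a query against the state
+// store with a memdb.WatchSet, push the result onto the update channel, then
+// block until the watch set fires or the context is cancelled, and repeat
+// with backoff.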
+ +func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) { + // syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend + // match the list of services exported to the peer. + m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) { + // Get exported services for peer id + _, list, err := store.ExportedServicesForPeer(ws, peerID) + if err != nil { + return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err) + } + + return list, nil + }, subExportedServiceList, state.updateCh) +} + +// TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions +func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) { + m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) { + // Fetch our current list of all mesh gateways. + entMeta := structs.DefaultEnterpriseMetaInPartition(partition) + idx, nodes, err := store.ServiceDump(ws, structs.ServiceKindMeshGateway, true, entMeta, structs.DefaultPeerKeyword) + if err != nil { + return nil, fmt.Errorf("failed to watch mesh gateways services for partition %q: %w", partition, err) + } + if idx == 0 { + idx = 1 + } + + // convert back to a protobuf flavor + result := &pbservice.IndexedCheckServiceNodes{ + Index: idx, + Nodes: make([]*pbservice.CheckServiceNode, len(nodes)), + } + for i, csn := range nodes { + result.Nodes[i] = pbservice.NewCheckServiceNodeFromStructs(&csn) + } + + return result, nil + }, subMeshGateway+partition, state.updateCh) +} + +func (m *subscriptionManager) syncViaBlockingQuery( + ctx context.Context, + queryType string, + queryFn func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error), + correlationID string, + updateCh chan<- cache.UpdateEvent, +) { + waiter := &retry.Waiter{ + MinFailures: 1, + Factor: 500 * time.Millisecond, + MaxWait: 60 * time.Second, + Jitter: retry.NewJitter(100), + } + + logger := m.logger + if queryType != "" { + logger = m.logger.With("queryType", queryType) + } + + store := m.backend.Store() + + for { + ws := memdb.NewWatchSet() + ws.Add(store.AbandonCh()) + ws.Add(ctx.Done()) + + if result, err := queryFn(ctx, store, ws); err != nil { + logger.Error("failed to sync from query", "error", err) + } else { + // Block for any changes to the state store. 
+ updateCh <- cache.UpdateEvent{ + CorrelationID: correlationID, + Result: result, + } + ws.WatchCtx(ctx) + } + + if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logger.Error("failed to wait before re-trying sync", "error", err) + } + + select { + case <-ctx.Done(): + return + default: + } + } +} diff --git a/agent/rpc/peering/subscription_manager.go b/agent/rpc/peering/subscription_manager.go index 1f59fa4a4f..d198635fb9 100644 --- a/agent/rpc/peering/subscription_manager.go +++ b/agent/rpc/peering/subscription_manager.go @@ -2,17 +2,18 @@ package peering import ( "context" - "errors" "fmt" - "time" + "strings" + "github.com/golang/protobuf/proto" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/submatview" - "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" ) @@ -31,9 +32,6 @@ type subscriptionManager struct { logger hclog.Logger viewStore MaterializedViewStore backend SubscriptionBackend - - // watchedServices is a map of exported services to a cancel function for their subscription notifier. - watchedServices map[structs.ServiceName]context.CancelFunc } // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering. @@ -43,61 +41,187 @@ func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend Su go store.Run(ctx) return &subscriptionManager{ - logger: logger, - viewStore: store, - backend: backend, - watchedServices: make(map[structs.ServiceName]context.CancelFunc), + logger: logger, + viewStore: store, + backend: backend, } } // subscribe returns a channel that will contain updates to exported service instances for a given peer. -func (m *subscriptionManager) subscribe(ctx context.Context, peerID string) <-chan cache.UpdateEvent { - updateCh := make(chan cache.UpdateEvent, 1) - go m.syncSubscriptions(ctx, peerID, updateCh) +func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition string) <-chan cache.UpdateEvent { + var ( + updateCh = make(chan cache.UpdateEvent, 1) + publicUpdateCh = make(chan cache.UpdateEvent, 1) + ) - return updateCh + state := newSubscriptionState(partition) + state.publicUpdateCh = publicUpdateCh + state.updateCh = updateCh + + // Wrap our bare state store queries in goroutines that emit events. + go m.notifyExportedServicesForPeerID(ctx, state, peerID) + go m.notifyMeshGatewaysForPartition(ctx, state, state.partition) + + // This goroutine is the only one allowed to manipulate protected + // subscriptionManager fields. 
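+	// All per-peer bookkeeping (watched services, discovery chains, and the
+	// latest mesh gateway snapshot) lives in the subscriptionState it owns.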
+ go m.handleEvents(ctx, state, updateCh) + + return publicUpdateCh } -func (m *subscriptionManager) syncSubscriptions(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) { - waiter := &retry.Waiter{ - MinFailures: 1, - Factor: 500 * time.Millisecond, - MaxWait: 60 * time.Second, - Jitter: retry.NewJitter(100), - } - +func (m *subscriptionManager) handleEvents(ctx context.Context, state *subscriptionState, updateCh <-chan cache.UpdateEvent) { for { - if err := m.syncSubscriptionsAndBlock(ctx, peerID, updateCh); err != nil { - m.logger.Error("failed to sync subscriptions", "error", err) - } - - if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { - m.logger.Error("failed to wait before re-trying sync", "error", err) - } + // TODO(peering): exponential backoff select { case <-ctx.Done(): return - default: + case update := <-updateCh: + if err := m.handleEvent(ctx, state, update); err != nil { + m.logger.Error("Failed to handle update from watch", + "id", update.CorrelationID, "error", err, + ) + continue + } } } } -// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend -// match the list of services exported to the peer. -func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) error { - store := m.backend.Store() - - ws := memdb.NewWatchSet() - ws.Add(store.AbandonCh()) - ws.Add(ctx.Done()) - - // Get exported services for peer id - _, services, err := store.ExportedServicesForPeer(ws, peerID) - if err != nil { - return fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err) +func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscriptionState, u cache.UpdateEvent) error { + if u.Err != nil { + return fmt.Errorf("received error event: %w", u.Err) } + // TODO(peering): on initial stream setup, transmit the list of exported + // services for use in differential DELETE/UPSERT. Akin to streaming's snapshot start/end. + switch { + case u.CorrelationID == subExportedServiceList: + // Everything starts with the exported service list coming from + // our state store watchset loop. + evt, ok := u.Result.(*structs.ExportedServiceList) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + + state.exportList = evt + + pending := &pendingPayload{} + m.syncNormalServices(ctx, state, pending, evt.Services) + m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains()) + state.sendPendingEvents(ctx, m.logger, pending) + + // cleanup event versions too + state.cleanupEventVersions(m.logger) + + case strings.HasPrefix(u.CorrelationID, subExportedService): + csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + + // TODO(peering): is it safe to edit these protobufs in place? + + // Clear this raft index before exporting. + csn.Index = 0 + + // Ensure that connect things are scrubbed so we don't mix-and-match + // with the synthetic entries that point to mesh gateways. 
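+		// Connect proxies and native-mode flags are dropped here; the importing
+		// peer reaches these services through the synthetic "-sidecar-proxy"
+		// entries generated further down instead.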
+ filterConnectReferences(csn) + + // Flatten health checks + for _, instance := range csn.Nodes { + instance.Checks = flattenChecks( + instance.Node.Node, + instance.Service.ID, + instance.Service.Service, + instance.Service.EnterpriseMeta, + instance.Checks, + ) + } + + id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService) + + // Just ferry this one directly along to the destination. + pending := &pendingPayload{} + if err := pending.Add(id, u.CorrelationID, csn); err != nil { + return err + } + state.sendPendingEvents(ctx, m.logger, pending) + + case strings.HasPrefix(u.CorrelationID, subMeshGateway): + csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes) + if !ok { + return fmt.Errorf("invalid type for response: %T", u.Result) + } + + partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway) + + if !acl.EqualPartitions(partition, state.partition) { + return nil // ignore event + } + + // Clear this raft index before exporting. + csn.Index = 0 + + state.meshGateway = csn + + pending := &pendingPayload{} + + // Directly replicate information about our mesh gateways to the consuming side. + // TODO(peering): should we scrub anything before replicating this? + if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil { + return err + } + + if state.exportList != nil { + // Trigger public events for all synthetic discovery chain replies. + for chainName := range state.connectServices { + m.emitEventForDiscoveryChain(ctx, state, pending, chainName) + } + } + + // TODO(peering): should we ship this down verbatim to the consumer? + state.sendPendingEvents(ctx, m.logger, pending) + + default: + return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID) + } + return nil +} + +func filterConnectReferences(orig *pbservice.IndexedCheckServiceNodes) { + newNodes := make([]*pbservice.CheckServiceNode, 0, len(orig.Nodes)) + for i := range orig.Nodes { + csn := orig.Nodes[i] + + if csn.Service.Kind != string(structs.ServiceKindTypical) { + continue // skip non-typical services + } + + if strings.HasSuffix(csn.Service.Service, syntheticProxyNameSuffix) { + // Skip things that might LOOK like a proxy so we don't get a + // collision with the ones we generate. + continue + } + + // Remove connect things like native mode. + if csn.Service.Connect != nil || csn.Service.Proxy != nil { + csn = proto.Clone(csn).(*pbservice.CheckServiceNode) + csn.Service.Connect = nil + csn.Service.Proxy = nil + } + + newNodes = append(newNodes, csn) + } + orig.Nodes = newNodes +} + +func (m *subscriptionManager) syncNormalServices( + ctx context.Context, + state *subscriptionState, + pending *pendingPayload, + services []structs.ServiceName, +) { // seen contains the set of exported service names and is used to reconcile the list of watched services. seen := make(map[structs.ServiceName]struct{}) @@ -105,45 +229,223 @@ func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, pee for _, svc := range services { seen[svc] = struct{}{} - if _, ok := m.watchedServices[svc]; ok { + if _, ok := state.watchedServices[svc]; ok { // Exported service is already being watched, nothing to do. 
continue } notifyCtx, cancel := context.WithCancel(ctx) - m.watchedServices[svc] = cancel - - if err := m.Notify(notifyCtx, svc, updateCh); err != nil { + if err := m.NotifyStandardService(notifyCtx, svc, state.updateCh); err != nil { + cancel() m.logger.Error("failed to subscribe to service", "service", svc.String()) continue } + + state.watchedServices[svc] = cancel } // For every subscription without an exported service, call the associated cancel fn. - for svc, cancel := range m.watchedServices { + for svc, cancel := range state.watchedServices { if _, ok := seen[svc]; !ok { cancel() + delete(state.watchedServices, svc) + // Send an empty event to the stream handler to trigger sending a DELETE message. // Cancelling the subscription context above is necessary, but does not yield a useful signal on its own. - updateCh <- cache.UpdateEvent{ - CorrelationID: subExportedService + svc.String(), - Result: &pbservice.IndexedCheckServiceNodes{}, + err := pending.Add( + servicePayloadIDPrefix+svc.String(), + subExportedService+svc.String(), + &pbservice.IndexedCheckServiceNodes{}, + ) + if err != nil { + m.logger.Error("failed to send event for service", "service", svc.String(), "error", err) + continue } } } +} - // Block for any changes to the state store. - ws.WatchCtx(ctx) - return nil +func (m *subscriptionManager) syncDiscoveryChains( + ctx context.Context, + state *subscriptionState, + pending *pendingPayload, + chainsByName map[structs.ServiceName]struct{}, +) { + // if it was newly added, then try to emit an UPDATE event + for chainName := range chainsByName { + if _, ok := state.connectServices[chainName]; ok { + continue + } + + state.connectServices[chainName] = struct{}{} + + m.emitEventForDiscoveryChain(ctx, state, pending, chainName) + } + + // if it was dropped, try to emit an DELETE event + for chainName := range state.connectServices { + if _, ok := chainsByName[chainName]; ok { + continue + } + + delete(state.connectServices, chainName) + + if state.meshGateway != nil { + // Only need to clean this up if we know we may have ever sent it in the first place. 
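+			// Sending an empty instance list under the synthetic proxy name makes
+			// the stream handler emit a DELETE for it on the wire.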
+ proxyName := generateProxyNameForDiscoveryChain(chainName) + err := pending.Add( + discoveryChainPayloadIDPrefix+chainName.String(), + subExportedService+proxyName.String(), + &pbservice.IndexedCheckServiceNodes{}, + ) + if err != nil { + m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err) + continue + } + } + } +} + +func (m *subscriptionManager) emitEventForDiscoveryChain( + ctx context.Context, + state *subscriptionState, + pending *pendingPayload, + chainName structs.ServiceName, +) { + if _, ok := state.connectServices[chainName]; !ok { + return // not found + } + + if state.exportList == nil || state.meshGateway == nil { + return // skip because we don't have the data to do it yet + } + + // Emit event with fake data + proxyName := generateProxyNameForDiscoveryChain(chainName) + + err := pending.Add( + discoveryChainPayloadIDPrefix+chainName.String(), + subExportedService+proxyName.String(), + createDiscoChainHealth( + chainName, + state.meshGateway, + ), + ) + if err != nil { + m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err) + } +} + +func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckServiceNodes) *pbservice.IndexedCheckServiceNodes { + fakeProxyName := sn.Name + syntheticProxyNameSuffix + + newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes)) + for i := range pb.Nodes { + gwNode := pb.Nodes[i].Node + gwService := pb.Nodes[i].Service + gwChecks := pb.Nodes[i].Checks + + pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta) + + fakeProxyID := fakeProxyName + if gwService.ID != "" { + // This is only going to be relevant if multiple mesh gateways are + // on the same exporting node. 
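+			// The suffix keeps the synthetic service IDs distinct when more than
+			// one gateway instance is replicated, e.g. "api-sidecar-proxy-instance-0"
+			// and "api-sidecar-proxy-instance-1" (illustrative names).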
+ fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i) + } + + csn := &pbservice.CheckServiceNode{ + Node: gwNode, + Service: &pbservice.NodeService{ + Kind: string(structs.ServiceKindConnectProxy), + Service: fakeProxyName, + ID: fakeProxyID, + EnterpriseMeta: pbEntMeta, + PeerName: structs.DefaultPeerKeyword, + Proxy: &pbservice.ConnectProxyConfig{ + DestinationServiceName: sn.Name, + DestinationServiceID: sn.Name, + }, + // direct + Address: gwService.Address, + TaggedAddresses: gwService.TaggedAddresses, + Port: gwService.Port, + SocketPath: gwService.SocketPath, + Weights: gwService.Weights, + }, + Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks), + } + newNodes = append(newNodes, csn) + } + + return &pbservice.IndexedCheckServiceNodes{ + Index: 0, + Nodes: newNodes, + } +} + +func flattenChecks( + nodeName string, + serviceID string, + serviceName string, + entMeta *pbcommon.EnterpriseMeta, + checks []*pbservice.HealthCheck, +) []*pbservice.HealthCheck { + if len(checks) == 0 { + return nil + } + + healthStatus := api.HealthPassing + for _, chk := range checks { + if chk.Status != api.HealthPassing { + healthStatus = chk.Status + } + } + + if serviceID == "" { + serviceID = serviceName + } + + return []*pbservice.HealthCheck{ + { + CheckID: serviceID + ":overall-check", + Name: "overall-check", + Status: healthStatus, + Node: nodeName, + ServiceID: serviceID, + ServiceName: serviceName, + EnterpriseMeta: entMeta, + PeerName: structs.DefaultPeerKeyword, + }, + } } const ( - subExportedService = "exported-service:" + subExportedServiceList = "exported-service-list" + subExportedService = "exported-service:" + subMeshGateway = "mesh-gateway:" ) -// Notify the given channel when there are updates to the requested service. -func (m *subscriptionManager) Notify(ctx context.Context, svc structs.ServiceName, updateCh chan<- cache.UpdateEvent) error { - sr := newExportedServiceRequest(m.logger, svc, m.backend) +// NotifyStandardService will notify the given channel when there are updates +// to the requested service of the same name in the catalog. +func (m *subscriptionManager) NotifyStandardService( + ctx context.Context, + svc structs.ServiceName, + updateCh chan<- cache.UpdateEvent, +) error { + sr := newExportedStandardServiceRequest(m.logger, svc, m.backend) return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh) } + +// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we +// replicate to route traffic to an exported discovery chain through the mesh +// gateways. +// +// This name was chosen to match existing "sidecar service" generation logic +// and similar logic in the Service Identity synthetic ACL policies. 
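+//
+// For example, an exported discovery chain named "web" is replicated to the
+// consuming peer as the synthetic proxy service "web-sidecar-proxy".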
+const syntheticProxyNameSuffix = "-sidecar-proxy" + +func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName { + return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta) +} diff --git a/agent/rpc/peering/subscription_manager_test.go b/agent/rpc/peering/subscription_manager_test.go index 6204cb161a..b1530fc70c 100644 --- a/agent/rpc/peering/subscription_manager_test.go +++ b/agent/rpc/peering/subscription_manager_test.go @@ -2,81 +2,77 @@ package peering import ( "context" + "sort" "testing" "time" - "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" ) -type testSubscriptionBackend struct { - state.EventPublisher - store *state.Store -} - -func (b *testSubscriptionBackend) Store() Store { - return b.store -} - func TestSubscriptionManager_RegisterDeregister(t *testing.T) { - publisher := stream.NewEventPublisher(10 * time.Second) - store := newStateStore(t, publisher) + backend := newTestSubscriptionBackend(t) + // initialCatalogIdx := backend.lastIdx - backend := testSubscriptionBackend{ - EventPublisher: publisher, - store: store, - } - - ctx := context.Background() - mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Create a peering - var lastIdx uint64 = 1 - err := store.PeeringWrite(lastIdx, &pbpeering.Peering{ - Name: "my-peering", + _, id := backend.ensurePeering(t, "my-peering") + partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() + + mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend) + subCh := mgr.subscribe(ctx, id, partition) + + var ( + gatewayCorrID = subMeshGateway + partition + + mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String() + + mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() + ) + + // Expect just the empty mesh gateway event to replicate. 
+ expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 0) }) - require.NoError(t, err) - _, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"}) - require.NoError(t, err) - require.NotNil(t, p) - - id := p.ID - - subCh := mgr.subscribe(ctx, id) - - entry := &structs.ExportedServicesConfigEntry{ - Name: "default", - Services: []structs.ExportedService{ - { - Name: "mysql", - Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", + testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) { + backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "mysql", + Consumers: []structs.ServiceConsumer{ + {PeerName: "my-peering"}, + }, + }, + { + Name: "mongo", + Consumers: []structs.ServiceConsumer{ + {PeerName: "my-other-peering"}, }, }, }, - { - Name: "mongo", - Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-other-peering", - }, - }, + }) + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 0) }, - }, - } - lastIdx++ - err = store.EnsureConfigEntry(lastIdx, entry) - require.NoError(t, err) + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID, 0) + }, + ) + }) mysql1 := &structs.CheckServiceNode{ Node: &structs.Node{Node: "foo", Address: "10.0.0.1"}, @@ -87,34 +83,40 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { } testutil.RunStep(t, "registering exported service instance yields update", func(t *testing.T) { + backend.ensureNode(t, mysql1.Node) + backend.ensureService(t, "foo", mysql1.Service) - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mysql1.Node)) + // We get one update for the service + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "foo", mysql1.Service)) + require.Len(t, res.Nodes, 1) + node := res.Nodes[0] + require.NotNil(t, node.Node) + require.Equal(t, "foo", node.Node.Node) + require.NotNil(t, node.Service) + require.Equal(t, "mysql-1", node.Service.ID) + require.Len(t, node.Checks, 0) + }) - lastIdx++ - require.NoError(t, store.EnsureCheck(lastIdx, mysql1.Checks[0])) + backend.ensureCheck(t, mysql1.Checks[0]) - // Receive in a retry loop so that eventually we converge onto the expected CheckServiceNode. 
- retry.Run(t, func(r *retry.R) { - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - require.True(r, ok) - require.Equal(r, uint64(5), nodes.Index) + // and one for the check + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) - require.Len(r, nodes.Nodes, 1) - require.Equal(r, "foo", nodes.Nodes[0].Node.Node) - require.Equal(r, "mysql-1", nodes.Nodes[0].Service.ID) - - require.Len(r, nodes.Nodes[0].Checks, 1) - require.Equal(r, "mysql-check", nodes.Nodes[0].Checks[0].CheckID) - - default: - r.Fatalf("invalid update") - } + require.Len(t, res.Nodes, 1) + node := res.Nodes[0] + require.NotNil(t, node.Node) + require.Equal(t, "foo", node.Node.Node) + require.NotNil(t, node.Service) + require.Equal(t, "mysql-1", node.Service.ID) + require.Len(t, node.Checks, 1) + require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) }) }) @@ -127,237 +129,409 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { } testutil.RunStep(t, "additional instances are returned when registered", func(t *testing.T) { - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mysql2.Node)) + backend.ensureNode(t, mysql2.Node) + backend.ensureService(t, "bar", mysql2.Service) - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "bar", mysql2.Service)) + // We get one update for the service + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) - lastIdx++ - require.NoError(t, store.EnsureCheck(lastIdx, mysql2.Checks[0])) + require.Len(t, res.Nodes, 2) + { + node := res.Nodes[0] + require.NotNil(t, node.Node) + require.Equal(t, "bar", node.Node.Node) + require.NotNil(t, node.Service) + require.Equal(t, "mysql-2", node.Service.ID) + require.Len(t, node.Checks, 0) + } + { + node := res.Nodes[1] + require.NotNil(t, node.Node) + require.Equal(t, "foo", node.Node.Node) + require.NotNil(t, node.Service) + require.Len(t, node.Checks, 1) + require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) + } + }) - retry.Run(t, func(r *retry.R) { - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - require.True(r, ok) - require.Equal(r, uint64(8), nodes.Index) + backend.ensureCheck(t, mysql2.Checks[0]) - require.Len(r, nodes.Nodes, 2) - require.Equal(r, "bar", nodes.Nodes[0].Node.Node) - require.Equal(r, "mysql-2", nodes.Nodes[0].Service.ID) + // and one for the check + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) - require.Len(r, nodes.Nodes[0].Checks, 1) - require.Equal(r, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID) - - require.Equal(r, "foo", nodes.Nodes[1].Node.Node) - require.Equal(r, "mysql-1", nodes.Nodes[1].Service.ID) - - require.Len(r, nodes.Nodes[1].Checks, 1) - require.Equal(r, "mysql-check", nodes.Nodes[1].Checks[0].CheckID) - - default: - r.Fatalf("invalid update") + require.Len(t, res.Nodes, 2) + { + node := res.Nodes[0] + require.NotNil(t, node.Node) + require.Equal(t, "bar", node.Node.Node) + require.NotNil(t, node.Service) + require.Equal(t, "mysql-2", node.Service.ID) + 
require.Len(t, node.Checks, 1) + require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID) + } + { + node := res.Nodes[1] + require.NotNil(t, node.Node) + require.Equal(t, "foo", node.Node.Node) + require.NotNil(t, node.Service) + require.Len(t, node.Checks, 1) + require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) } }) }) + mongo := &structs.CheckServiceNode{ + Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, + Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000}, + Checks: structs.HealthChecks{ + &structs.HealthCheck{CheckID: "mongo-check", ServiceID: "mongo", Node: "zip"}, + }, + } + testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) { - mongo := &structs.CheckServiceNode{ - Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, - Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000}, - Checks: structs.HealthChecks{ - &structs.HealthCheck{CheckID: "mongo-check", ServiceID: "mongo", Node: "zip"}, - }, - } + backend.ensureNode(t, mongo.Node) + backend.ensureService(t, "zip", mongo.Service) + backend.ensureCheck(t, mongo.Checks[0]) - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mongo.Node)) - - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service)) - - lastIdx++ - require.NoError(t, store.EnsureCheck(lastIdx, mongo.Checks[0])) - - // Receive from subCh times out. The retry in the last step already consumed all the mysql events. - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - - if ok && len(nodes.Nodes) > 0 && nodes.Nodes[0].Node.Node == "zip" { - t.Fatalf("received update for mongo node zip") - } - - case <-time.After(100 * time.Millisecond): - // Expect this to fire - } + // Receive from subCh times out. 
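+ // expectEvents with no check functions asserts that nothing arrives on
+ // subCh within a short window.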
+ expectEvents(t, subCh) }) testutil.RunStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) { - lastIdx++ - require.NoError(t, store.DeleteService(lastIdx, "foo", mysql1.Service.ID, nil, "")) + backend.deleteService(t, "foo", mysql1.Service.ID) - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - require.True(t, ok) - require.Equal(t, uint64(12), nodes.Index) + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) - require.Len(t, nodes.Nodes, 1) - require.Equal(t, "bar", nodes.Nodes[0].Node.Node) - require.Equal(t, "mysql-2", nodes.Nodes[0].Service.ID) + require.Len(t, res.Nodes, 1) - require.Len(t, nodes.Nodes[0].Checks, 1) - require.Equal(t, "mysql-2-check", nodes.Nodes[0].Checks[0].CheckID) - - case <-time.After(100 * time.Millisecond): - t.Fatalf("timed out waiting for update") - } + node := res.Nodes[0] + require.NotNil(t, node.Node) + require.Equal(t, "bar", node.Node.Node) + require.NotNil(t, node.Service) + require.Equal(t, "mysql-2", node.Service.ID) + require.Len(t, node.Checks, 1) + require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID) + }) }) testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) { - lastIdx++ - require.NoError(t, store.DeleteService(lastIdx, "bar", mysql2.Service.ID, nil, "")) + backend.deleteService(t, "bar", mysql2.Service.ID) - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - require.True(t, ok) - require.Equal(t, uint64(13), nodes.Index) - require.Len(t, nodes.Nodes, 0) + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) - case <-time.After(100 * time.Millisecond): - t.Fatalf("timed out waiting for update") - } + require.Len(t, res.Nodes, 0) + }) }) } func TestSubscriptionManager_InitialSnapshot(t *testing.T) { - publisher := stream.NewEventPublisher(10 * time.Second) - store := newStateStore(t, publisher) + backend := newTestSubscriptionBackend(t) + // initialCatalogIdx := backend.lastIdx - backend := testSubscriptionBackend{ - EventPublisher: publisher, - store: store, - } - - ctx := context.Background() - mgr := newSubscriptionManager(ctx, hclog.New(nil), &backend) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Create a peering - var lastIdx uint64 = 1 - err := store.PeeringWrite(lastIdx, &pbpeering.Peering{ - Name: "my-peering", - }) - require.NoError(t, err) + _, id := backend.ensurePeering(t, "my-peering") + partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() - _, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"}) - require.NoError(t, err) - require.NotNil(t, p) - - id := p.ID - - subCh := mgr.subscribe(ctx, id) + mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend) + subCh := mgr.subscribe(ctx, id, partition) // Register two services that are not yet exported mysql := &structs.CheckServiceNode{ Node: &structs.Node{Node: "foo", Address: "10.0.0.1"}, Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000}, } - - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mysql.Node)) - - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "foo", 
mysql.Service)) + backend.ensureNode(t, mysql.Node) + backend.ensureService(t, "foo", mysql.Service) mongo := &structs.CheckServiceNode{ Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000}, } + backend.ensureNode(t, mongo.Node) + backend.ensureService(t, "zip", mongo.Service) - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mongo.Node)) + var ( + gatewayCorrID = subMeshGateway + partition - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service)) + mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String() + mongoCorrID = subExportedService + structs.NewServiceName("mongo", nil).String() + chainCorrID = subExportedService + structs.NewServiceName("chain", nil).String() - // No updates should be received, because neither service is exported. - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) + mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() + mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String() + chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String() + ) - if ok && len(nodes.Nodes) > 0 { - t.Fatalf("received unexpected update") - } - - case <-time.After(100 * time.Millisecond): - // Expect this to fire - } + // Expect just the empty mesh gateway event to replicate. + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 0) + }) + // At this point in time we'll have a mesh-gateway notification with no + // content stored and handled. testutil.RunStep(t, "exporting the two services yields an update for both", func(t *testing.T) { - entry := &structs.ExportedServicesConfigEntry{ + backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{ Name: "default", Services: []structs.ExportedService{ { Name: "mysql", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, + {PeerName: "my-peering"}, }, }, { Name: "mongo", Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, + {PeerName: "my-peering"}, + }, + }, + { + Name: "chain", + Consumers: []structs.ServiceConsumer{ + {PeerName: "my-peering"}, }, }, }, - } - lastIdx++ - err = store.EnsureConfigEntry(lastIdx, entry) - require.NoError(t, err) - - var ( - sawMySQL bool - sawMongo bool - ) - - retry.Run(t, func(r *retry.R) { - select { - case update := <-subCh: - nodes, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - require.True(r, ok) - require.Len(r, nodes.Nodes, 1) - - switch nodes.Nodes[0].Service.Service { - case "mongo": - sawMongo = true - case "mysql": - sawMySQL = true - } - if !sawMySQL || !sawMongo { - r.Fatalf("missing an update") - } - default: - r.Fatalf("invalid update") - } }) + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainProxyCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoProxyCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, 
mysqlProxyCorrID, 0) + }, + ) + }) + + testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) { + gateway := &structs.CheckServiceNode{ + Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, + Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, + // TODO: checks + } + backend.ensureNode(t, gateway.Node) + backend.ensureService(t, "mgw", gateway.Service) + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainProxyCorrID, 1, "chain-sidecar-proxy", string(structs.ServiceKindConnectProxy)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoProxyCorrID, 1, "mongo-sidecar-proxy", string(structs.ServiceKindConnectProxy)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID, 1, "mysql-sidecar-proxy", string(structs.ServiceKindConnectProxy)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 1, "gateway", string(structs.ServiceKindMeshGateway)) + }, + ) }) } +type testSubscriptionBackend struct { + state.EventPublisher + store *state.Store + + lastIdx uint64 +} + +func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend { + publisher := stream.NewEventPublisher(10 * time.Second) + store := newStateStore(t, publisher) + + backend := &testSubscriptionBackend{ + EventPublisher: publisher, + store: store, + } + + // Create some placeholder data to ensure raft index > 0 + // + // TODO(peering): is there some extremely subtle max-index table reading bug in play? + placeholder := &structs.CheckServiceNode{ + Node: &structs.Node{Node: "placeholder", Address: "10.0.0.1"}, + Service: &structs.NodeService{ID: "placeholder-1", Service: "placeholder", Port: 5000}, + } + backend.ensureNode(t, placeholder.Node) + backend.ensureService(t, "placeholder", placeholder.Service) + + return backend +} + +func (b *testSubscriptionBackend) Store() Store { + return b.store +} + +func (b *testSubscriptionBackend) ensurePeering(t *testing.T, name string) (uint64, string) { + b.lastIdx++ + return b.lastIdx, setupTestPeering(t, b.store, name, b.lastIdx) +} + +func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs.ConfigEntry) uint64 { + require.NoError(t, entry.Normalize()) + require.NoError(t, entry.Validate()) + + b.lastIdx++ + require.NoError(t, b.store.EnsureConfigEntry(b.lastIdx, entry)) + return b.lastIdx +} + +func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 { + b.lastIdx++ + require.NoError(t, b.store.EnsureNode(b.lastIdx, node)) + return b.lastIdx +} + +func (b *testSubscriptionBackend) ensureService(t *testing.T, node string, svc *structs.NodeService) uint64 { + b.lastIdx++ + require.NoError(t, b.store.EnsureService(b.lastIdx, node, svc)) + return b.lastIdx +} + +func (b *testSubscriptionBackend) ensureCheck(t *testing.T, hc *structs.HealthCheck) uint64 { + b.lastIdx++ + require.NoError(t, b.store.EnsureCheck(b.lastIdx, hc)) + return b.lastIdx +} + +func (b *testSubscriptionBackend) deleteService(t *testing.T, nodeName, serviceID string) uint64 { + b.lastIdx++ + require.NoError(t, b.store.DeleteService(b.lastIdx, nodeName, serviceID, nil, "")) + return b.lastIdx +} + +func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string { + err := store.PeeringWrite(index, &pbpeering.Peering{ + Name: name, + }) + require.NoError(t, err) + + _, p, err := store.PeeringRead(nil, 
state.Query{Value: name}) + require.NoError(t, err) + require.NotNil(t, p) + + return p.ID +} + func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) require.NoError(t, err) store := state.NewStateStoreWithEventPublisher(gc, publisher) require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot)) require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot)) - go publisher.Run(context.Background()) + go publisher.Run(ctx) return store } + +func expectEvents( + t *testing.T, + ch <-chan cache.UpdateEvent, + checkFns ...func(t *testing.T, got cache.UpdateEvent), +) { + t.Helper() + + num := len(checkFns) + + var out []cache.UpdateEvent + + if num == 0 { + // No updates should be received. + select { + case <-ch: + t.Fatalf("received unexpected update") + case <-time.After(100 * time.Millisecond): + // Expect this to fire + } + return + } + + const timeout = 10 * time.Second + timeoutCh := time.After(timeout) + + for len(out) < num { + select { + case <-timeoutCh: + t.Fatalf("timed out with %d of %d events after %v", len(out), num, timeout) + case evt := <-ch: + out = append(out, evt) + } + } + + select { + case <-time.After(100 * time.Millisecond): + case evt := <-ch: + t.Fatalf("expected only %d events but got more; prev %+v; next %+v;", num, out, evt) + } + + require.Len(t, out, num) + + sort.SliceStable(out, func(i, j int) bool { + return out[i].CorrelationID < out[j].CorrelationID + }) + + for i := 0; i < num; i++ { + checkFns[i](t, out[i]) + } +} + +func checkEvent( + t *testing.T, + got cache.UpdateEvent, + correlationID string, + expectNodes int, + serviceKindPairs ...string) { + t.Helper() + + require.True(t, len(serviceKindPairs) == 2*expectNodes, "sanity check") + + require.Equal(t, correlationID, got.CorrelationID) + + evt := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), evt.Index) + + if expectNodes == 0 { + require.Len(t, evt.Nodes, 0) + } else { + require.Len(t, evt.Nodes, expectNodes) + + for i := 0; i < expectNodes; i++ { + expectName := serviceKindPairs[i*2] + expectKind := serviceKindPairs[i*2+1] + require.Equal(t, expectName, evt.Nodes[i].Service.Service) + require.Equal(t, expectKind, evt.Nodes[i].Service.Kind) + } + } +} diff --git a/agent/rpc/peering/subscription_state.go b/agent/rpc/peering/subscription_state.go new file mode 100644 index 0000000000..c17fbc4d6f --- /dev/null +++ b/agent/rpc/peering/subscription_state.go @@ -0,0 +1,165 @@ +package peering + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "strings" + + "github.com/golang/protobuf/proto" + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbservice" +) + +// subscriptionState is a collection of working state tied to a peerID subscription. 
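+//
+// It tracks which exported services are currently watched, the most recent
+// mesh gateway payload, and a per-payload version map used to suppress
+// duplicate public events.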
+type subscriptionState struct { + // partition is immutable + partition string + + // plain data + exportList *structs.ExportedServiceList + + watchedServices map[structs.ServiceName]context.CancelFunc + connectServices map[structs.ServiceName]struct{} + + // eventVersions is a duplicate event suppression system keyed by the "id" + // not the "correlationID" + eventVersions map[string]string + + meshGateway *pbservice.IndexedCheckServiceNodes + + // updateCh is an internal implementation detail for the machinery of the + // manager. + updateCh chan<- cache.UpdateEvent + + // publicUpdateCh is the channel the manager uses to pass data back to the + // caller. + publicUpdateCh chan<- cache.UpdateEvent +} + +func newSubscriptionState(partition string) *subscriptionState { + return &subscriptionState{ + partition: partition, + watchedServices: make(map[structs.ServiceName]context.CancelFunc), + connectServices: make(map[structs.ServiceName]struct{}), + eventVersions: make(map[string]string), + } +} + +func (s *subscriptionState) sendPendingEvents( + ctx context.Context, + logger hclog.Logger, + pending *pendingPayload, +) { + for _, pendingEvt := range pending.Events { + cID := pendingEvt.CorrelationID + newVersion := pendingEvt.Version + + oldVersion, ok := s.eventVersions[pendingEvt.ID] + if ok && newVersion == oldVersion { + logger.Trace("skipping send of duplicate public event", "correlationID", cID) + continue + } + + logger.Trace("sending public event", "correlationID", cID) + s.eventVersions[pendingEvt.ID] = newVersion + + evt := cache.UpdateEvent{ + CorrelationID: cID, + Result: pendingEvt.Result, + } + + select { + case s.publicUpdateCh <- evt: + case <-ctx.Done(): + } + } +} + +func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) { + for id := range s.eventVersions { + keep := false + switch { + case id == meshGatewayPayloadID: + keep = true + + case strings.HasPrefix(id, servicePayloadIDPrefix): + name := strings.TrimPrefix(id, servicePayloadIDPrefix) + sn := structs.ServiceNameFromString(name) + + if _, ok := s.watchedServices[sn]; ok { + keep = true + } + + case strings.HasPrefix(id, discoveryChainPayloadIDPrefix): + name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix) + sn := structs.ServiceNameFromString(name) + + if _, ok := s.connectServices[sn]; ok { + keep = true + } + } + + if !keep { + logger.Trace("cleaning up unreferenced event id version", "id", id) + delete(s.eventVersions, id) + } + } +} + +type pendingPayload struct { + Events []pendingEvent +} + +type pendingEvent struct { + ID string + CorrelationID string + Result proto.Message + Version string +} + +const ( + meshGatewayPayloadID = "mesh-gateway" + servicePayloadIDPrefix = "service:" + discoveryChainPayloadIDPrefix = "chain:" +) + +func (p *pendingPayload) Add(id string, correlationID string, raw interface{}) error { + result, ok := raw.(proto.Message) + if !ok { + return fmt.Errorf("invalid type for %q event: %T", correlationID, raw) + } + + version, err := hashProtobuf(result) + if err != nil { + return fmt.Errorf("error hashing %q event: %w", correlationID, err) + } + + p.Events = append(p.Events, pendingEvent{ + ID: id, + CorrelationID: correlationID, + Result: result, + Version: version, + }) + + return nil +} + +func hashProtobuf(res proto.Message) (string, error) { + h := sha256.New() + buffer := proto.NewBuffer(nil) + buffer.SetDeterministic(true) + + err := buffer.Marshal(res) + if err != nil { + return "", err + } + h.Write(buffer.Bytes()) + buffer.Reset() + + return 
hex.EncodeToString(h.Sum(nil)), nil +} diff --git a/agent/rpc/peering/subscription_state_test.go b/agent/rpc/peering/subscription_state_test.go new file mode 100644 index 0000000000..d71fea4256 --- /dev/null +++ b/agent/rpc/peering/subscription_state_test.go @@ -0,0 +1,200 @@ +package peering + +import ( + "context" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/sdk/testutil" +) + +func TestSubscriptionState_Events(t *testing.T) { + logger := hclog.NewNullLogger() + + partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() + + state := newSubscriptionState(partition) + + testutil.RunStep(t, "empty", func(t *testing.T) { + pending := &pendingPayload{} + + ch := make(chan cache.UpdateEvent, 1) + state.publicUpdateCh = ch + go func() { + state.sendPendingEvents(context.Background(), logger, pending) + close(ch) + }() + + got := drainEvents(t, ch) + require.Len(t, got, 0) + }) + + meshNode1 := &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "foo"}, + Service: &pbservice.NodeService{ID: "mgw-1", Service: "mgw", Kind: "mesh-gateway"}, + } + + testutil.RunStep(t, "one", func(t *testing.T) { + pending := &pendingPayload{} + require.NoError(t, pending.Add( + meshGatewayPayloadID, + subMeshGateway+partition, + &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + proto.Clone(meshNode1).(*pbservice.CheckServiceNode), + }, + }, + )) + + ch := make(chan cache.UpdateEvent, 1) + state.publicUpdateCh = ch + go func() { + state.sendPendingEvents(context.Background(), logger, pending) + close(ch) + }() + + got := drainEvents(t, ch) + require.Len(t, got, 1) + + evt := got[0] + require.Equal(t, subMeshGateway+partition, evt.CorrelationID) + require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1) + }) + + testutil.RunStep(t, "a duplicate is omitted", func(t *testing.T) { + pending := &pendingPayload{} + require.NoError(t, pending.Add( + meshGatewayPayloadID, + subMeshGateway+partition, + &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + proto.Clone(meshNode1).(*pbservice.CheckServiceNode), + }, + }, + )) + + ch := make(chan cache.UpdateEvent, 1) + state.publicUpdateCh = ch + go func() { + state.sendPendingEvents(context.Background(), logger, pending) + close(ch) + }() + + got := drainEvents(t, ch) + require.Len(t, got, 0) + }) + + webNode1 := &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "zim"}, + Service: &pbservice.NodeService{ID: "web-1", Service: "web"}, + } + + webSN := structs.NewServiceName("web", nil) + + testutil.RunStep(t, "a duplicate is omitted even if mixed", func(t *testing.T) { + pending := &pendingPayload{} + require.NoError(t, pending.Add( + meshGatewayPayloadID, + subMeshGateway+partition, + &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + proto.Clone(meshNode1).(*pbservice.CheckServiceNode), + }, + }, + )) + require.NoError(t, pending.Add( + servicePayloadIDPrefix+webSN.String(), + subExportedService+webSN.String(), + &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + proto.Clone(webNode1).(*pbservice.CheckServiceNode), + }, + }, + )) + + ch := make(chan cache.UpdateEvent, 1) + state.publicUpdateCh = ch + go func() { + 
state.sendPendingEvents(context.Background(), logger, pending) + close(ch) + }() + + got := drainEvents(t, ch) + require.Len(t, got, 1) + + evt := got[0] + require.Equal(t, subExportedService+webSN.String(), evt.CorrelationID) + require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 1) + }) + + meshNode2 := &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "bar"}, + Service: &pbservice.NodeService{ID: "mgw-2", Service: "mgw", Kind: "mesh-gateway"}, + } + + testutil.RunStep(t, "an update to an existing item is published", func(t *testing.T) { + pending := &pendingPayload{} + require.NoError(t, pending.Add( + meshGatewayPayloadID, + subMeshGateway+partition, + &pbservice.IndexedCheckServiceNodes{ + Nodes: []*pbservice.CheckServiceNode{ + proto.Clone(meshNode1).(*pbservice.CheckServiceNode), + proto.Clone(meshNode2).(*pbservice.CheckServiceNode), + }, + }, + )) + + ch := make(chan cache.UpdateEvent, 1) + state.publicUpdateCh = ch + go func() { + state.sendPendingEvents(context.Background(), logger, pending) + close(ch) + }() + + got := drainEvents(t, ch) + require.Len(t, got, 1) + + evt := got[0] + require.Equal(t, subMeshGateway+partition, evt.CorrelationID) + require.Len(t, evt.Result.(*pbservice.IndexedCheckServiceNodes).Nodes, 2) + }) +} + +func drainEvents(t *testing.T, ch <-chan cache.UpdateEvent) []cache.UpdateEvent { + var out []cache.UpdateEvent + + for { + select { + case evt, ok := <-ch: + if !ok { + return out + } + out = append(out, evt) + case <-time.After(100 * time.Millisecond): + t.Fatalf("channel did not close in time") + } + } +} + +func testNewSubscriptionState(partition string) ( + *subscriptionState, + chan cache.UpdateEvent, +) { + var ( + publicUpdateCh = make(chan cache.UpdateEvent, 1) + ) + + state := newSubscriptionState(partition) + state.publicUpdateCh = publicUpdateCh + + return state, publicUpdateCh +} diff --git a/agent/rpc/peering/subscription_view.go b/agent/rpc/peering/subscription_view.go index a6f351b4f5..f009872a25 100644 --- a/agent/rpc/peering/subscription_view.go +++ b/agent/rpc/peering/subscription_view.go @@ -24,12 +24,10 @@ type exportedServiceRequest struct { sub Subscriber } -func newExportedServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest { +func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest { req := structs.ServiceSpecificRequest{ - // TODO(peering): Need to subscribe to both Connect and not - Connect: false, - ServiceName: svc.Name, + Connect: false, EnterpriseMeta: svc.EnterpriseMeta, } return &exportedServiceRequest{ @@ -46,10 +44,12 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo { // NewMaterializer implements submatview.Request func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) { + if e.req.Connect { + return nil, fmt.Errorf("connect views are not supported") + } reqFn := func(index uint64) *pbsubscribe.SubscribeRequest { - // TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name. - // Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name. - r := &pbsubscribe.SubscribeRequest{ + return &pbsubscribe.SubscribeRequest{ + // Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name. 
Topic: pbsubscribe.Topic_ServiceHealth, Key: e.req.ServiceName, Token: e.req.Token, @@ -58,10 +58,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err Namespace: e.req.EnterpriseMeta.NamespaceOrEmpty(), Partition: e.req.EnterpriseMeta.PartitionOrEmpty(), } - if e.req.Connect { - r.Topic = pbsubscribe.Topic_ServiceHealthConnect - } - return r } deps := submatview.Deps{ View: newExportedServicesView(), diff --git a/agent/rpc/peering/subscription_view_test.go b/agent/rpc/peering/subscription_view_test.go index cbb9d071f3..6165545645 100644 --- a/agent/rpc/peering/subscription_view_test.go +++ b/agent/rpc/peering/subscription_view_test.go @@ -2,6 +2,7 @@ package peering import ( "context" + "fmt" "math/rand" "sort" "sync" @@ -38,87 +39,36 @@ func TestExportedServiceSubscription(t *testing.T) { apiSN := structs.NewServiceName("api", nil) webSN := structs.NewServiceName("web", nil) + newRegisterHealthEvent := func(id, service string) stream.Event { + return stream.Event{ + Topic: pbsubscribe.Topic_ServiceHealth, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + ID: id, + Service: service, + }, + }, + }, + } + } + // List of updates to the state store: // - api: {register api-1, register api-2, register api-3} // - web: {register web-1, deregister web-1, register web-2}1 events := []map[string]stream.Event{ { - apiSN.String(): stream.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Payload: state.EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - ID: "api-1", - Service: "api", - }, - }, - }, - }, - webSN.String(): stream.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Payload: state.EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - ID: "web-1", - Service: "web", - }, - }, - }, - }, + apiSN.String(): newRegisterHealthEvent("api-1", "api"), + webSN.String(): newRegisterHealthEvent("web-1", "web"), }, { - apiSN.String(): stream.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Payload: state.EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - ID: "api-2", - Service: "api", - }, - }, - }, - }, - webSN.String(): stream.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Payload: state.EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Deregister, - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - ID: "web-1", - Service: "web", - }, - }, - }, - }, + apiSN.String(): newRegisterHealthEvent("api-2", "api"), + webSN.String(): newRegisterHealthEvent("web-1", "web"), }, { - apiSN.String(): stream.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Payload: state.EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - ID: "api-3", - Service: "api", - }, - }, - }, - }, - webSN.String(): stream.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Payload: state.EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - ID: "web-2", - Service: "web", - }, - }, - }, - }, + apiSN.String(): newRegisterHealthEvent("api-3", "api"), + webSN.String(): newRegisterHealthEvent("web-2", "web"), }, } @@ -224,9 +174,10 @@ func (s *store) 
simulateUpdates(ctx context.Context, events []map[string]stream. switch payload.Op { case pbsubscribe.CatalogOp_Register: svcState.current[payload.Value.Service.ID] = payload.Value - default: - // If not a registration it must be a deregistration: + case pbsubscribe.CatalogOp_Deregister: delete(svcState.current, payload.Value.Service.ID) + default: + panic(fmt.Sprintf("unable to handle op type %v", payload.Op)) } svcState.idsByIndex[idx] = serviceIDsFromMap(svcState.current) @@ -305,7 +256,11 @@ func (c *consumer) consume(ctx context.Context, service string, countExpected in updateCh := make(chan cache.UpdateEvent, 10) group.Go(func() error { - sr := newExportedServiceRequest(hclog.New(nil), structs.NewServiceName(service, nil), c.publisher) + sr := newExportedStandardServiceRequest( + hclog.New(nil), + structs.NewServiceName(service, nil), + c.publisher, + ) return c.viewStore.Notify(gctx, sr, "", updateCh) }) group.Go(func() error { diff --git a/agent/structs/peering.go b/agent/structs/peering.go index ac7828ad57..afde382694 100644 --- a/agent/structs/peering.go +++ b/agent/structs/peering.go @@ -13,3 +13,30 @@ type PeeredService struct { Name ServiceName PeerName string } + +// NOTE: this is not serialized via msgpack so it can be changed without concern. +type ExportedServiceList struct { + // Services is a list of exported services that apply to both standard + // service discovery and service mesh. + Services []ServiceName + + // DiscoChains is a list of exported service that ONLY apply to service mesh. + DiscoChains []ServiceName +} + +// ListAllDiscoveryChains returns all discovery chains (union of Services and +// DiscoChains). +func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]struct{} { + chainsByName := make(map[ServiceName]struct{}) + if list == nil { + return chainsByName + } + + for _, svc := range list.Services { + chainsByName[svc] = struct{}{} + } + for _, chainName := range list.DiscoChains { + chainsByName[chainName] = struct{}{} + } + return chainsByName +} diff --git a/lib/maps/maps.go b/lib/maps/maps.go new file mode 100644 index 0000000000..f65c06619a --- /dev/null +++ b/lib/maps/maps.go @@ -0,0 +1,12 @@ +package maps + +func SliceOfKeys[K comparable, V any](m map[K]V) []K { + if len(m) == 0 { + return nil + } + res := make([]K, 0, len(m)) + for k := range m { + res = append(res, k) + } + return res +} diff --git a/lib/maps/maps_test.go b/lib/maps/maps_test.go new file mode 100644 index 0000000000..365abb5974 --- /dev/null +++ b/lib/maps/maps_test.go @@ -0,0 +1,41 @@ +package maps + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSliceOfKeys(t *testing.T) { + t.Run("string to int", func(t *testing.T) { + m := make(map[string]int) + require.Equal(t, []string(nil), SliceOfKeys(m)) + m["foo"] = 5 + m["bar"] = 6 + require.ElementsMatch(t, []string{"foo", "bar"}, SliceOfKeys(m)) + }) + + type blah struct { + V string + } + + t.Run("int to struct", func(t *testing.T) { + m := make(map[int]blah) + require.Equal(t, []int(nil), SliceOfKeys(m)) + m[5] = blah{V: "foo"} + m[6] = blah{V: "bar"} + require.ElementsMatch(t, []int{5, 6}, SliceOfKeys(m)) + }) + + type id struct { + Name string + } + + t.Run("struct to struct pointer", func(t *testing.T) { + m := make(map[id]*blah) + require.Equal(t, []id(nil), SliceOfKeys(m)) + m[id{Name: "foo"}] = &blah{V: "oof"} + m[id{Name: "bar"}] = &blah{V: "rab"} + require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m)) + }) +}
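
The new lib/maps helper above is generic over any comparable key type. Below is
a minimal sketch of how it could be paired with the per-service maps kept in
subscriptionState; the map literal and the logging loop are illustrative only
and are not part of this patch.

package main

import (
	"fmt"

	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/lib/maps"
)

func main() {
	// Hypothetical stand-in for subscriptionState.connectServices: only the
	// keys matter, so the values are empty structs.
	connectServices := map[structs.ServiceName]struct{}{
		structs.NewServiceName("mysql", nil): {},
		structs.NewServiceName("mongo", nil): {},
	}

	// SliceOfKeys collapses the map to a slice of its keys, for example to
	// log which exported services currently have discovery chains.
	for _, sn := range maps.SliceOfKeys(connectServices) {
		fmt.Println(sn.String())
	}
}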