mirror of https://github.com/status-im/consul.git
784 lines
23 KiB
Go
784 lines
23 KiB
Go
package peering
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/submatview"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/proto/pbcommon"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
"github.com/hashicorp/consul/proto/pbservice"
|
|
)
|
|
|
|
type MaterializedViewStore interface {
|
|
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
|
|
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
|
|
}
|
|
|
|
type SubscriptionBackend interface {
|
|
Subscriber
|
|
Store() Store
|
|
}
|
|
|
|
// subscriptionManager handlers requests to subscribe to events from an events publisher.
|
|
type subscriptionManager struct {
|
|
logger hclog.Logger
|
|
config Config
|
|
trustDomain string
|
|
viewStore MaterializedViewStore
|
|
backend SubscriptionBackend
|
|
}
|
|
|
|
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
|
|
func newSubscriptionManager(
|
|
ctx context.Context,
|
|
logger hclog.Logger,
|
|
config Config,
|
|
trustDomain string,
|
|
backend SubscriptionBackend,
|
|
) *subscriptionManager {
|
|
logger = logger.Named("subscriptions")
|
|
store := submatview.NewStore(logger.Named("viewstore"))
|
|
go store.Run(ctx)
|
|
|
|
return &subscriptionManager{
|
|
logger: logger,
|
|
config: config,
|
|
trustDomain: trustDomain,
|
|
viewStore: store,
|
|
backend: backend,
|
|
}
|
|
}
|
|
|
|
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
|
|
func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, partition string) <-chan cache.UpdateEvent {
|
|
var (
|
|
updateCh = make(chan cache.UpdateEvent, 1)
|
|
publicUpdateCh = make(chan cache.UpdateEvent, 1)
|
|
)
|
|
|
|
state := newSubscriptionState(peerName, partition)
|
|
state.publicUpdateCh = publicUpdateCh
|
|
state.updateCh = updateCh
|
|
|
|
// Wrap our bare state store queries in goroutines that emit events.
|
|
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
|
|
if !m.config.DisableMeshGatewayMode && m.config.ConnectEnabled {
|
|
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
|
|
}
|
|
|
|
// If connect is enabled, watch for updates to CA roots.
|
|
if m.config.ConnectEnabled {
|
|
go m.notifyRootCAUpdates(ctx, state.updateCh)
|
|
}
|
|
|
|
// This goroutine is the only one allowed to manipulate protected
|
|
// subscriptionManager fields.
|
|
go m.handleEvents(ctx, state, updateCh)
|
|
|
|
return publicUpdateCh
|
|
}
|
|
|
|
func (m *subscriptionManager) handleEvents(ctx context.Context, state *subscriptionState, updateCh <-chan cache.UpdateEvent) {
|
|
for {
|
|
// TODO(peering): exponential backoff
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case update := <-updateCh:
|
|
if err := m.handleEvent(ctx, state, update); err != nil {
|
|
m.logger.Error("Failed to handle update from watch",
|
|
"id", update.CorrelationID, "error", err,
|
|
)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscriptionState, u cache.UpdateEvent) error {
|
|
if u.Err != nil {
|
|
return fmt.Errorf("received error event: %w", u.Err)
|
|
}
|
|
|
|
// TODO(peering): on initial stream setup, transmit the list of exported
|
|
// services for use in differential DELETE/UPSERT. Akin to streaming's snapshot start/end.
|
|
switch {
|
|
case u.CorrelationID == subExportedServiceList:
|
|
// Everything starts with the exported service list coming from
|
|
// our state store watchset loop.
|
|
evt, ok := u.Result.(*structs.ExportedServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
state.exportList = evt
|
|
|
|
pending := &pendingPayload{}
|
|
m.syncNormalServices(ctx, state, pending, evt.Services)
|
|
if m.config.DisableMeshGatewayMode {
|
|
m.syncProxyServices(ctx, state, pending, evt.Services)
|
|
} else {
|
|
if m.config.ConnectEnabled {
|
|
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
|
|
}
|
|
}
|
|
state.sendPendingEvents(ctx, m.logger, pending)
|
|
|
|
// cleanup event versions too
|
|
state.cleanupEventVersions(m.logger)
|
|
|
|
case strings.HasPrefix(u.CorrelationID, subExportedService):
|
|
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
// TODO(peering): is it safe to edit these protobufs in place?
|
|
|
|
// Clear this raft index before exporting.
|
|
csn.Index = 0
|
|
|
|
if !m.config.DisableMeshGatewayMode {
|
|
// Ensure that connect things are scrubbed so we don't mix-and-match
|
|
// with the synthetic entries that point to mesh gateways.
|
|
filterConnectReferences(csn)
|
|
|
|
// Flatten health checks
|
|
for _, instance := range csn.Nodes {
|
|
instance.Checks = flattenChecks(
|
|
instance.Node.Node,
|
|
instance.Service.ID,
|
|
instance.Service.Service,
|
|
instance.Service.EnterpriseMeta,
|
|
instance.Checks,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Scrub raft indexes
|
|
for _, instance := range csn.Nodes {
|
|
instance.Node.RaftIndex = nil
|
|
instance.Service.RaftIndex = nil
|
|
if m.config.DisableMeshGatewayMode {
|
|
for _, chk := range instance.Checks {
|
|
chk.RaftIndex = nil
|
|
}
|
|
}
|
|
// skip checks since we just generated one from scratch
|
|
}
|
|
|
|
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
|
|
|
|
// Just ferry this one directly along to the destination.
|
|
pending := &pendingPayload{}
|
|
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
|
|
return err
|
|
}
|
|
state.sendPendingEvents(ctx, m.logger, pending)
|
|
|
|
case strings.HasPrefix(u.CorrelationID, subExportedProxyService):
|
|
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
if !m.config.DisableMeshGatewayMode {
|
|
return nil // ignore event
|
|
}
|
|
|
|
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, subExportedProxyService))
|
|
spiffeID := connect.SpiffeIDService{
|
|
Host: m.trustDomain,
|
|
Partition: sn.PartitionOrDefault(),
|
|
Namespace: sn.NamespaceOrDefault(),
|
|
Datacenter: m.config.Datacenter,
|
|
Service: sn.Name,
|
|
}
|
|
peerMeta := &pbservice.PeeringServiceMeta{
|
|
SpiffeID: []string{spiffeID.URI().String()},
|
|
}
|
|
|
|
// skip checks since we just generated one from scratch
|
|
// Set peerMeta on all instances and scrub the raft indexes.
|
|
for _, instance := range csn.Nodes {
|
|
instance.Service.Connect.PeerMeta = peerMeta
|
|
|
|
instance.Node.RaftIndex = nil
|
|
instance.Service.RaftIndex = nil
|
|
if m.config.DisableMeshGatewayMode {
|
|
for _, chk := range instance.Checks {
|
|
chk.RaftIndex = nil
|
|
}
|
|
}
|
|
}
|
|
csn.Index = 0
|
|
|
|
id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService)
|
|
|
|
// Just ferry this one directly along to the destination.
|
|
pending := &pendingPayload{}
|
|
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
|
|
return err
|
|
}
|
|
state.sendPendingEvents(ctx, m.logger, pending)
|
|
|
|
case strings.HasPrefix(u.CorrelationID, subMeshGateway):
|
|
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
|
|
|
|
if m.config.DisableMeshGatewayMode || !m.config.ConnectEnabled {
|
|
return nil // ignore event
|
|
}
|
|
|
|
if !acl.EqualPartitions(partition, state.partition) {
|
|
return nil // ignore event
|
|
}
|
|
|
|
// Clear this raft index before exporting.
|
|
csn.Index = 0
|
|
|
|
// Flatten health checks
|
|
for _, instance := range csn.Nodes {
|
|
instance.Checks = flattenChecks(
|
|
instance.Node.Node,
|
|
instance.Service.ID,
|
|
instance.Service.Service,
|
|
instance.Service.EnterpriseMeta,
|
|
instance.Checks,
|
|
)
|
|
}
|
|
|
|
// Scrub raft indexes
|
|
for _, instance := range csn.Nodes {
|
|
instance.Node.RaftIndex = nil
|
|
instance.Service.RaftIndex = nil
|
|
// skip checks since we just generated one from scratch
|
|
|
|
// Remove connect things like native mode.
|
|
if instance.Service.Connect != nil || instance.Service.Proxy != nil {
|
|
instance.Service.Connect = nil
|
|
instance.Service.Proxy = nil
|
|
}
|
|
}
|
|
|
|
state.meshGateway = csn
|
|
|
|
pending := &pendingPayload{}
|
|
|
|
// Directly replicate information about our mesh gateways to the consuming side.
|
|
// TODO(peering): should we scrub anything before replicating this?
|
|
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil {
|
|
return err
|
|
}
|
|
|
|
if state.exportList != nil {
|
|
// Trigger public events for all synthetic discovery chain replies.
|
|
for chainName, protocol := range state.connectServices {
|
|
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
|
|
}
|
|
}
|
|
|
|
// TODO(peering): should we ship this down verbatim to the consumer?
|
|
state.sendPendingEvents(ctx, m.logger, pending)
|
|
|
|
case u.CorrelationID == subCARoot:
|
|
roots, ok := u.Result.(*pbpeering.PeeringTrustBundle)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
pending := &pendingPayload{}
|
|
if err := pending.Add(caRootsPayloadID, u.CorrelationID, roots); err != nil {
|
|
return err
|
|
}
|
|
|
|
state.sendPendingEvents(ctx, m.logger, pending)
|
|
|
|
default:
|
|
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func filterConnectReferences(orig *pbservice.IndexedCheckServiceNodes) {
|
|
newNodes := make([]*pbservice.CheckServiceNode, 0, len(orig.Nodes))
|
|
for i := range orig.Nodes {
|
|
csn := orig.Nodes[i]
|
|
|
|
if csn.Service.Kind != string(structs.ServiceKindTypical) {
|
|
continue // skip non-typical services
|
|
}
|
|
|
|
if strings.HasSuffix(csn.Service.Service, syntheticProxyNameSuffix) {
|
|
// Skip things that might LOOK like a proxy so we don't get a
|
|
// collision with the ones we generate.
|
|
continue
|
|
}
|
|
|
|
// Remove connect things like native mode.
|
|
if csn.Service.Connect != nil || csn.Service.Proxy != nil {
|
|
csn = proto.Clone(csn).(*pbservice.CheckServiceNode)
|
|
csn.Service.Connect = nil
|
|
csn.Service.Proxy = nil
|
|
}
|
|
|
|
newNodes = append(newNodes, csn)
|
|
}
|
|
orig.Nodes = newNodes
|
|
}
|
|
|
|
func (m *subscriptionManager) notifyRootCAUpdates(ctx context.Context, updateCh chan<- cache.UpdateEvent) {
|
|
var idx uint64
|
|
// TODO(peering): retry logic; fail past a threshold
|
|
for {
|
|
var err error
|
|
// Typically, this function will block inside `m.subscribeCARoots` and only return on error.
|
|
// Errors are logged and the watch is retried.
|
|
idx, err = m.subscribeCARoots(ctx, idx, updateCh)
|
|
if errors.Is(err, stream.ErrSubForceClosed) {
|
|
m.logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
|
|
} else if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
|
m.logger.Warn("failed to subscribe to CA roots, will attempt resume", "error", err.Error())
|
|
} else {
|
|
m.logger.Trace(err.Error())
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// subscribeCARoots subscribes to state.EventTopicCARoots for changes to CA roots.
|
|
// Upon receiving an event it will send the payload in updateCh.
|
|
func (m *subscriptionManager) subscribeCARoots(ctx context.Context, idx uint64, updateCh chan<- cache.UpdateEvent) (uint64, error) {
|
|
// following code adapted from connectca/watch_roots.go
|
|
sub, err := m.backend.Subscribe(&stream.SubscribeRequest{
|
|
Topic: state.EventTopicCARoots,
|
|
Subject: stream.SubjectNone,
|
|
Token: "", // using anonymous token for now
|
|
Index: idx,
|
|
})
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to subscribe to CA Roots events: %w", err)
|
|
}
|
|
defer sub.Unsubscribe()
|
|
|
|
for {
|
|
event, err := sub.Next(ctx)
|
|
switch {
|
|
case errors.Is(err, stream.ErrSubForceClosed):
|
|
// If the subscription was closed because the state store was abandoned (e.g.
|
|
// following a snapshot restore) reset idx to ensure we don't skip over the
|
|
// new store's events.
|
|
select {
|
|
case <-m.backend.Store().AbandonCh():
|
|
idx = 0
|
|
default:
|
|
}
|
|
return idx, err
|
|
case errors.Is(err, context.Canceled):
|
|
return 0, err
|
|
case errors.Is(err, context.DeadlineExceeded):
|
|
return 0, err
|
|
case err != nil:
|
|
return idx, fmt.Errorf("failed to read next event: %w", err)
|
|
}
|
|
|
|
// Note: this check isn't strictly necessary because the event publishing
|
|
// machinery will ensure the index increases monotonically, but it can be
|
|
// tricky to faithfully reproduce this in tests (e.g. the EventPublisher
|
|
// garbage collects topic buffers and snapshots aggressively when streams
|
|
// disconnect) so this avoids a bunch of confusing setup code.
|
|
if event.Index <= idx {
|
|
continue
|
|
}
|
|
|
|
idx = event.Index
|
|
|
|
// We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow)
|
|
// because we send a full list of roots on every event, rather than expecting
|
|
// clients to maintain a state-machine in the way they do for service health.
|
|
if event.IsFramingEvent() {
|
|
continue
|
|
}
|
|
|
|
payload, ok := event.Payload.(state.EventPayloadCARoots)
|
|
if !ok {
|
|
return 0, fmt.Errorf("unexpected event payload type: %T", payload)
|
|
}
|
|
|
|
var rootPems []string
|
|
for _, root := range payload.CARoots {
|
|
rootPems = append(rootPems, root.RootCert)
|
|
}
|
|
|
|
updateCh <- cache.UpdateEvent{
|
|
CorrelationID: subCARoot,
|
|
Result: &pbpeering.PeeringTrustBundle{
|
|
TrustDomain: m.trustDomain,
|
|
RootPEMs: rootPems,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// subCARoot is the correlation ID set on CA root update events by
// subscribeCARoots and matched in handleEvent.
const subCARoot = "roots"
|
|
|
|
func (m *subscriptionManager) syncNormalServices(
|
|
ctx context.Context,
|
|
state *subscriptionState,
|
|
pending *pendingPayload,
|
|
services []structs.ServiceName,
|
|
) {
|
|
// seen contains the set of exported service names and is used to reconcile the list of watched services.
|
|
seen := make(map[structs.ServiceName]struct{})
|
|
|
|
// Ensure there is a subscription for each service exported to the peer.
|
|
for _, svc := range services {
|
|
seen[svc] = struct{}{}
|
|
|
|
if _, ok := state.watchedServices[svc]; ok {
|
|
// Exported service is already being watched, nothing to do.
|
|
continue
|
|
}
|
|
|
|
notifyCtx, cancel := context.WithCancel(ctx)
|
|
if err := m.NotifyStandardService(notifyCtx, svc, state.updateCh); err != nil {
|
|
cancel()
|
|
m.logger.Error("failed to subscribe to service", "service", svc.String())
|
|
continue
|
|
}
|
|
|
|
state.watchedServices[svc] = cancel
|
|
}
|
|
|
|
// For every subscription without an exported service, call the associated cancel fn.
|
|
for svc, cancel := range state.watchedServices {
|
|
if _, ok := seen[svc]; !ok {
|
|
cancel()
|
|
|
|
delete(state.watchedServices, svc)
|
|
|
|
// Send an empty event to the stream handler to trigger sending a DELETE message.
|
|
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
|
|
err := pending.Add(
|
|
servicePayloadIDPrefix+svc.String(),
|
|
subExportedService+svc.String(),
|
|
&pbservice.IndexedCheckServiceNodes{},
|
|
)
|
|
if err != nil {
|
|
m.logger.Error("failed to send event for service", "service", svc.String(), "error", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO(peering): remove
|
|
func (m *subscriptionManager) syncProxyServices(
|
|
ctx context.Context,
|
|
state *subscriptionState,
|
|
pending *pendingPayload,
|
|
services []structs.ServiceName,
|
|
) {
|
|
// seen contains the set of exported service names and is used to reconcile the list of watched services.
|
|
seen := make(map[structs.ServiceName]struct{})
|
|
|
|
// Ensure there is a subscription for each service exported to the peer.
|
|
for _, svc := range services {
|
|
seen[svc] = struct{}{}
|
|
|
|
if _, ok := state.watchedProxyServices[svc]; ok {
|
|
// Exported service is already being watched, nothing to do.
|
|
continue
|
|
}
|
|
|
|
notifyCtx, cancel := context.WithCancel(ctx)
|
|
if err := m.NotifyConnectProxyService(notifyCtx, svc, state.updateCh); err != nil {
|
|
cancel()
|
|
m.logger.Error("failed to subscribe to proxy service", "service", svc.String())
|
|
continue
|
|
}
|
|
|
|
state.watchedProxyServices[svc] = cancel
|
|
}
|
|
|
|
// For every subscription without an exported service, call the associated cancel fn.
|
|
for svc, cancel := range state.watchedProxyServices {
|
|
if _, ok := seen[svc]; !ok {
|
|
cancel()
|
|
|
|
delete(state.watchedProxyServices, svc)
|
|
|
|
// Send an empty event to the stream handler to trigger sending a DELETE message.
|
|
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
|
|
err := pending.Add(
|
|
proxyServicePayloadIDPrefix+svc.String(),
|
|
subExportedProxyService+svc.String(),
|
|
&pbservice.IndexedCheckServiceNodes{},
|
|
)
|
|
if err != nil {
|
|
m.logger.Error("failed to send event for proxy service", "service", svc.String(), "error", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *subscriptionManager) syncDiscoveryChains(
|
|
ctx context.Context,
|
|
state *subscriptionState,
|
|
pending *pendingPayload,
|
|
chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable
|
|
) {
|
|
// if it was newly added, then try to emit an UPDATE event
|
|
for chainName, protocol := range chainsByName {
|
|
if oldProtocol, ok := state.connectServices[chainName]; ok && protocol == oldProtocol {
|
|
continue
|
|
}
|
|
|
|
state.connectServices[chainName] = protocol
|
|
|
|
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol)
|
|
}
|
|
|
|
// if it was dropped, try to emit an DELETE event
|
|
for chainName := range state.connectServices {
|
|
if _, ok := chainsByName[chainName]; ok {
|
|
continue
|
|
}
|
|
|
|
delete(state.connectServices, chainName)
|
|
|
|
if state.meshGateway != nil {
|
|
// Only need to clean this up if we know we may have ever sent it in the first place.
|
|
proxyName := generateProxyNameForDiscoveryChain(chainName)
|
|
err := pending.Add(
|
|
discoveryChainPayloadIDPrefix+chainName.String(),
|
|
subExportedService+proxyName.String(),
|
|
&pbservice.IndexedCheckServiceNodes{},
|
|
)
|
|
if err != nil {
|
|
m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *subscriptionManager) emitEventForDiscoveryChain(
|
|
ctx context.Context,
|
|
state *subscriptionState,
|
|
pending *pendingPayload,
|
|
chainName structs.ServiceName,
|
|
protocol string,
|
|
) {
|
|
if _, ok := state.connectServices[chainName]; !ok {
|
|
return // not found
|
|
}
|
|
|
|
if state.exportList == nil || state.meshGateway == nil {
|
|
return // skip because we don't have the data to do it yet
|
|
}
|
|
|
|
// Emit event with fake data
|
|
proxyName := generateProxyNameForDiscoveryChain(chainName)
|
|
|
|
err := pending.Add(
|
|
discoveryChainPayloadIDPrefix+chainName.String(),
|
|
subExportedService+proxyName.String(),
|
|
createDiscoChainHealth(
|
|
state.peerName,
|
|
m.config.Datacenter,
|
|
m.trustDomain,
|
|
chainName,
|
|
protocol,
|
|
state.meshGateway,
|
|
),
|
|
)
|
|
if err != nil {
|
|
m.logger.Error("failed to send event for discovery chain", "service", chainName.String(), "error", err)
|
|
}
|
|
}
|
|
|
|
func createDiscoChainHealth(
|
|
peerName string,
|
|
datacenter, trustDomain string,
|
|
sn structs.ServiceName,
|
|
protocol string,
|
|
pb *pbservice.IndexedCheckServiceNodes,
|
|
) *pbservice.IndexedCheckServiceNodes {
|
|
fakeProxyName := sn.Name + syntheticProxyNameSuffix
|
|
|
|
var peerMeta *pbservice.PeeringServiceMeta
|
|
{
|
|
spiffeID := connect.SpiffeIDService{
|
|
Host: trustDomain,
|
|
Partition: sn.PartitionOrDefault(),
|
|
Namespace: sn.NamespaceOrDefault(),
|
|
Datacenter: datacenter,
|
|
Service: sn.Name,
|
|
}
|
|
|
|
sni := connect.PeeredServiceSNI(
|
|
sn.Name,
|
|
sn.NamespaceOrDefault(),
|
|
sn.PartitionOrDefault(),
|
|
peerName,
|
|
trustDomain,
|
|
)
|
|
|
|
// Create common peer meta.
|
|
//
|
|
// TODO(peering): should this be replicated by service and not by instance?
|
|
peerMeta = &pbservice.PeeringServiceMeta{
|
|
SNI: []string{sni},
|
|
SpiffeID: []string{spiffeID.URI().String()},
|
|
Protocol: protocol,
|
|
}
|
|
}
|
|
|
|
newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes))
|
|
for i := range pb.Nodes {
|
|
gwNode := pb.Nodes[i].Node
|
|
gwService := pb.Nodes[i].Service
|
|
gwChecks := pb.Nodes[i].Checks
|
|
|
|
pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta)
|
|
|
|
fakeProxyID := fakeProxyName
|
|
destServiceID := sn.Name
|
|
if gwService.ID != "" {
|
|
// This is only going to be relevant if multiple mesh gateways are
|
|
// on the same exporting node.
|
|
fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i)
|
|
destServiceID = fmt.Sprintf("%s-instance-%d", sn.Name, i)
|
|
}
|
|
|
|
csn := &pbservice.CheckServiceNode{
|
|
Node: gwNode,
|
|
Service: &pbservice.NodeService{
|
|
Kind: string(structs.ServiceKindConnectProxy),
|
|
Service: fakeProxyName,
|
|
ID: fakeProxyID,
|
|
EnterpriseMeta: pbEntMeta,
|
|
PeerName: structs.DefaultPeerKeyword,
|
|
Proxy: &pbservice.ConnectProxyConfig{
|
|
DestinationServiceName: sn.Name,
|
|
DestinationServiceID: destServiceID,
|
|
},
|
|
// direct
|
|
Address: gwService.Address,
|
|
TaggedAddresses: gwService.TaggedAddresses,
|
|
Port: gwService.Port,
|
|
SocketPath: gwService.SocketPath,
|
|
Weights: gwService.Weights,
|
|
Connect: &pbservice.ServiceConnect{
|
|
PeerMeta: peerMeta,
|
|
},
|
|
},
|
|
Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks),
|
|
}
|
|
newNodes = append(newNodes, csn)
|
|
}
|
|
|
|
return &pbservice.IndexedCheckServiceNodes{
|
|
Index: 0,
|
|
Nodes: newNodes,
|
|
}
|
|
}
|
|
|
|
func flattenChecks(
|
|
nodeName string,
|
|
serviceID string,
|
|
serviceName string,
|
|
entMeta *pbcommon.EnterpriseMeta,
|
|
checks []*pbservice.HealthCheck,
|
|
) []*pbservice.HealthCheck {
|
|
if len(checks) == 0 {
|
|
return nil
|
|
}
|
|
|
|
healthStatus := api.HealthPassing
|
|
for _, chk := range checks {
|
|
if chk.Status != api.HealthPassing {
|
|
healthStatus = chk.Status
|
|
}
|
|
}
|
|
|
|
if serviceID == "" {
|
|
serviceID = serviceName
|
|
}
|
|
|
|
return []*pbservice.HealthCheck{
|
|
{
|
|
CheckID: serviceID + ":overall-check",
|
|
Name: "overall-check",
|
|
Status: healthStatus,
|
|
Node: nodeName,
|
|
ServiceID: serviceID,
|
|
ServiceName: serviceName,
|
|
EnterpriseMeta: entMeta,
|
|
PeerName: structs.DefaultPeerKeyword,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Correlation IDs (or ID prefixes) attached to internal update events and
// dispatched on in handleEvent.
const (
	// subExportedServiceList identifies the exported service list event.
	subExportedServiceList = "exported-service-list"

	// subExportedService prefixes the name of an exported service.
	subExportedService = "exported-service:"

	// subExportedProxyService prefixes the name of a service whose connect
	// proxies are exported.
	subExportedProxyService = "exported-proxy-service:"

	// subMeshGateway prefixes the partition whose mesh gateways are watched.
	subMeshGateway = "mesh-gateway:"
)
|
|
|
|
// NotifyStandardService will notify the given channel when there are updates
|
|
// to the requested service of the same name in the catalog.
|
|
func (m *subscriptionManager) NotifyStandardService(
|
|
ctx context.Context,
|
|
svc structs.ServiceName,
|
|
updateCh chan<- cache.UpdateEvent,
|
|
) error {
|
|
sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
|
|
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
|
|
}
|
|
func (m *subscriptionManager) NotifyConnectProxyService(
|
|
ctx context.Context,
|
|
svc structs.ServiceName,
|
|
updateCh chan<- cache.UpdateEvent,
|
|
) error {
|
|
sr := newExportedConnectProxyServiceRequest(m.logger, svc, m.backend)
|
|
return m.viewStore.Notify(ctx, sr, subExportedProxyService+svc.String(), updateCh)
|
|
}
|
|
|
|
// syntheticProxyNameSuffix is appended to a discovery chain's name to form
// the name of the synthetic proxy we replicate in order to route traffic to
// that exported chain through the mesh gateways.
//
// The value matches the existing "sidecar service" generation logic and the
// similar logic in the Service Identity synthetic ACL policies.
const syntheticProxyNameSuffix = "-sidecar-proxy"
|
|
|
|
func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
|
|
return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta)
|
|
}
|