mirror of https://github.com/status-im/consul.git
Add proxycfg state management for terminating-gateways
This commit is contained in:
parent
c9385129ae
commit
24207226ca
|
@ -134,6 +134,7 @@ func (m *Manager) syncState() {
|
||||||
services := m.State.Services(structs.WildcardEnterpriseMeta())
|
services := m.State.Services(structs.WildcardEnterpriseMeta())
|
||||||
for sid, svc := range services {
|
for sid, svc := range services {
|
||||||
if svc.Kind != structs.ServiceKindConnectProxy &&
|
if svc.Kind != structs.ServiceKindConnectProxy &&
|
||||||
|
svc.Kind != structs.ServiceKindTerminatingGateway &&
|
||||||
svc.Kind != structs.ServiceKindMeshGateway &&
|
svc.Kind != structs.ServiceKindMeshGateway &&
|
||||||
svc.Kind != structs.ServiceKindIngressGateway {
|
svc.Kind != structs.ServiceKindIngressGateway {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -57,11 +57,51 @@ func (c *configSnapshotConnectProxy) IsEmpty() bool {
|
||||||
len(c.PreparedQueryEndpoints) == 0
|
len(c.PreparedQueryEndpoints) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type configSnapshotTerminatingGateway struct {
|
||||||
|
// WatchedServices is a map of service id to a cancel function. This cancel
|
||||||
|
// function is tied to the watch of linked service instances for the given
|
||||||
|
// id. If the linked services watch would indicate the removal of
|
||||||
|
// a service altogether we then cancel watching that service for its endpoints.
|
||||||
|
WatchedServices map[structs.ServiceID]context.CancelFunc
|
||||||
|
|
||||||
|
// WatchedIntentions is a map of service id to a cancel function.
|
||||||
|
// This cancel function is tied to the watch of intentions for linked services.
|
||||||
|
// As with WatchedServices, intention watches will be cancelled when services
|
||||||
|
// are no longer linked to the gateway.
|
||||||
|
WatchedIntentions map[structs.ServiceID]context.CancelFunc
|
||||||
|
|
||||||
|
// WatchedLeaves is a map of ServiceID to a cancel function.
|
||||||
|
// This cancel function is tied to the watch of leaf certs for linked services.
|
||||||
|
// As with WatchedServices, leaf watches will be cancelled when services
|
||||||
|
// are no longer linked to the gateway.
|
||||||
|
WatchedLeaves map[structs.ServiceID]context.CancelFunc
|
||||||
|
|
||||||
|
// ServiceLeaves is a map of ServiceID to a leaf cert.
|
||||||
|
// Terminating gateways will present different certificates depending
|
||||||
|
// on the service that the caller is trying to reach.
|
||||||
|
ServiceLeaves map[structs.ServiceID]*structs.IssuedCert
|
||||||
|
|
||||||
|
// ServiceGroups is a map of service id to the service instances of that
|
||||||
|
// service in the local datacenter.
|
||||||
|
ServiceGroups map[structs.ServiceID]structs.CheckServiceNodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *configSnapshotTerminatingGateway) IsEmpty() bool {
|
||||||
|
if c == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return len(c.ServiceLeaves) == 0 &&
|
||||||
|
len(c.WatchedLeaves) == 0 &&
|
||||||
|
len(c.WatchedIntentions) == 0 &&
|
||||||
|
len(c.ServiceGroups) == 0 &&
|
||||||
|
len(c.WatchedServices) == 0
|
||||||
|
}
|
||||||
|
|
||||||
type configSnapshotMeshGateway struct {
|
type configSnapshotMeshGateway struct {
|
||||||
// WatchedServices is a map of service id to a cancel function. This cancel
|
// WatchedServices is a map of service id to a cancel function. This cancel
|
||||||
// function is tied to the watch of connect enabled services for the given
|
// function is tied to the watch of connect enabled services for the given
|
||||||
// id. If the main datacenter services watch would indicate the removal of
|
// id. If the main datacenter services watch would indicate the removal of
|
||||||
// a service all together we then cancel watching that service for its
|
// a service altogether we then cancel watching that service for its
|
||||||
// connect endpoints.
|
// connect endpoints.
|
||||||
WatchedServices map[structs.ServiceID]context.CancelFunc
|
WatchedServices map[structs.ServiceID]context.CancelFunc
|
||||||
|
|
||||||
|
@ -177,6 +217,9 @@ type ConfigSnapshot struct {
|
||||||
// connect-proxy specific
|
// connect-proxy specific
|
||||||
ConnectProxy configSnapshotConnectProxy
|
ConnectProxy configSnapshotConnectProxy
|
||||||
|
|
||||||
|
// terminating-gateway specific
|
||||||
|
TerminatingGateway configSnapshotTerminatingGateway
|
||||||
|
|
||||||
// mesh-gateway specific
|
// mesh-gateway specific
|
||||||
MeshGateway configSnapshotMeshGateway
|
MeshGateway configSnapshotMeshGateway
|
||||||
|
|
||||||
|
@ -191,6 +234,8 @@ func (s *ConfigSnapshot) Valid() bool {
|
||||||
switch s.Kind {
|
switch s.Kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.Roots != nil && s.ConnectProxy.Leaf != nil
|
return s.Roots != nil && s.ConnectProxy.Leaf != nil
|
||||||
|
case structs.ServiceKindTerminatingGateway:
|
||||||
|
return s.Roots != nil
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
|
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
|
||||||
if len(s.MeshGateway.ConsulServers) == 0 {
|
if len(s.MeshGateway.ConsulServers) == 0 {
|
||||||
|
@ -221,6 +266,10 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
snap.ConnectProxy.WatchedUpstreams = nil
|
snap.ConnectProxy.WatchedUpstreams = nil
|
||||||
snap.ConnectProxy.WatchedGateways = nil
|
snap.ConnectProxy.WatchedGateways = nil
|
||||||
|
case structs.ServiceKindTerminatingGateway:
|
||||||
|
snap.TerminatingGateway.WatchedServices = nil
|
||||||
|
snap.TerminatingGateway.WatchedIntentions = nil
|
||||||
|
snap.TerminatingGateway.WatchedLeaves = nil
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
snap.MeshGateway.WatchedDatacenters = nil
|
snap.MeshGateway.WatchedDatacenters = nil
|
||||||
snap.MeshGateway.WatchedServices = nil
|
snap.MeshGateway.WatchedServices = nil
|
||||||
|
|
|
@ -109,10 +109,11 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
|
||||||
func newState(ns *structs.NodeService, token string) (*state, error) {
|
func newState(ns *structs.NodeService, token string) (*state, error) {
|
||||||
switch ns.Kind {
|
switch ns.Kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
|
case structs.ServiceKindTerminatingGateway:
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("not a connect-proxy, mesh-gateway, or ingress-gateway")
|
return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyCfg, err := copyProxyConfig(ns)
|
proxyCfg, err := copyProxyConfig(ns)
|
||||||
|
@ -184,6 +185,8 @@ func (s *state) initWatches() error {
|
||||||
switch s.kind {
|
switch s.kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.initWatchesConnectProxy()
|
return s.initWatchesConnectProxy()
|
||||||
|
case structs.ServiceKindTerminatingGateway:
|
||||||
|
return s.initWatchesTerminatingGateway()
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.initWatchesMeshGateway()
|
return s.initWatchesMeshGateway()
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
|
@ -359,6 +362,37 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initWatchesTerminatingGateway sets up the initial watches needed based on the terminating-gateway registration
|
||||||
|
func (s *state) initWatchesTerminatingGateway() error {
|
||||||
|
// Watch for root changes
|
||||||
|
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
Source: *s.source,
|
||||||
|
}, rootsWatchID, s.ch)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Named(logging.TerminatingGateway).
|
||||||
|
Error("failed to register watch for root changes", "error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch for the terminating-gateway's linked services
|
||||||
|
err = s.cache.Notify(s.ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
ServiceName: s.service,
|
||||||
|
ServiceKind: structs.ServiceKindTerminatingGateway,
|
||||||
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||||
|
}, gatewayServicesWatchID, s.ch)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Named(logging.TerminatingGateway).
|
||||||
|
Error("failed to register watch for linked services", "error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// initWatchesMeshGateway sets up the watches needed based on the current mesh gateway registration
|
// initWatchesMeshGateway sets up the watches needed based on the current mesh gateway registration
|
||||||
func (s *state) initWatchesMeshGateway() error {
|
func (s *state) initWatchesMeshGateway() error {
|
||||||
// Watch for root changes
|
// Watch for root changes
|
||||||
|
@ -498,7 +532,12 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
|
||||||
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
|
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
|
||||||
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
||||||
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
|
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
|
||||||
|
case structs.ServiceKindTerminatingGateway:
|
||||||
|
snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceID]context.CancelFunc)
|
||||||
|
snap.TerminatingGateway.WatchedLeaves = make(map[structs.ServiceID]context.CancelFunc)
|
||||||
|
snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceID]context.CancelFunc)
|
||||||
|
snap.TerminatingGateway.ServiceLeaves = make(map[structs.ServiceID]*structs.IssuedCert)
|
||||||
|
snap.TerminatingGateway.ServiceGroups = make(map[structs.ServiceID]structs.CheckServiceNodes)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
snap.MeshGateway.WatchedServices = make(map[structs.ServiceID]context.CancelFunc)
|
snap.MeshGateway.WatchedServices = make(map[structs.ServiceID]context.CancelFunc)
|
||||||
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
||||||
|
@ -611,6 +650,8 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
switch s.kind {
|
switch s.kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.handleUpdateConnectProxy(u, snap)
|
return s.handleUpdateConnectProxy(u, snap)
|
||||||
|
case structs.ServiceKindTerminatingGateway:
|
||||||
|
return s.handleUpdateTerminatingGateway(u, snap)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.handleUpdateMeshGateway(u, snap)
|
return s.handleUpdateMeshGateway(u, snap)
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
|
@ -633,7 +674,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
||||||
}
|
}
|
||||||
snap.Roots = roots
|
snap.Roots = roots
|
||||||
case u.CorrelationID == intentionsWatchID:
|
case u.CorrelationID == intentionsWatchID:
|
||||||
// Not in snapshot currently, no op
|
// no-op: Intentions don't get stored in the snapshot, calls to ConnectAuthorize will fetch them from the cache
|
||||||
|
|
||||||
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
|
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
|
||||||
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
||||||
|
@ -842,6 +883,178 @@ func (s *state) resetWatchesFromChain(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
|
if u.Err != nil {
|
||||||
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
|
}
|
||||||
|
logger := s.logger.Named(logging.TerminatingGateway)
|
||||||
|
|
||||||
|
switch u.CorrelationID {
|
||||||
|
case rootsWatchID:
|
||||||
|
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
snap.Roots = roots
|
||||||
|
|
||||||
|
// Update watches based on the current list of services associated with the terminating-gateway
|
||||||
|
case gatewayServicesWatchID:
|
||||||
|
services, ok := u.Result.(*structs.IndexedGatewayServices)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
|
||||||
|
svcMap := make(map[structs.ServiceID]struct{})
|
||||||
|
for _, svc := range services.Services {
|
||||||
|
// Make sure to add every service to this map, we use it to cancel watches below.
|
||||||
|
svcMap[svc.Service] = struct{}{}
|
||||||
|
|
||||||
|
// Watch the health endpoint to discover endpoints for the service
|
||||||
|
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
|
||||||
|
ctx, cancel := context.WithCancel(s.ctx)
|
||||||
|
err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
ServiceName: svc.Service.ID,
|
||||||
|
EnterpriseMeta: svc.Service.EnterpriseMeta,
|
||||||
|
|
||||||
|
// The gateway acts as the service's proxy, so we do NOT want to discover other proxies
|
||||||
|
Connect: false,
|
||||||
|
}, fmt.Sprintf("external-service:%s", svc.Service.String()), s.ch)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to register watch for external-service",
|
||||||
|
"service", svc.Service.String(),
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
snap.TerminatingGateway.WatchedServices[svc.Service] = cancel
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch intentions with this service as their destination
|
||||||
|
// The gateway will enforce intentions for connections to the service
|
||||||
|
if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok {
|
||||||
|
ctx, cancel := context.WithCancel(s.ctx)
|
||||||
|
err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
Match: &structs.IntentionQueryMatch{
|
||||||
|
Type: structs.IntentionMatchDestination,
|
||||||
|
Entries: []structs.IntentionMatchEntry{
|
||||||
|
{
|
||||||
|
Namespace: svc.Service.NamespaceOrDefault(),
|
||||||
|
Name: svc.Service.ID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, fmt.Sprintf("service-intentions:%s", svc.Service.String()), s.ch)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to register watch for service-intentions",
|
||||||
|
"service", svc.Service.String(),
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
snap.TerminatingGateway.WatchedIntentions[svc.Service] = cancel
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch leaf certificate for the service
|
||||||
|
// This cert is used to terminate mTLS connections on the service's behalf
|
||||||
|
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok {
|
||||||
|
ctx, cancel := context.WithCancel(s.ctx)
|
||||||
|
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
Token: s.token,
|
||||||
|
Service: svc.Service.ID,
|
||||||
|
EnterpriseMeta: svc.Service.EnterpriseMeta,
|
||||||
|
}, fmt.Sprintf("service-leaf:%s", svc.Service.String()), s.ch)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to register watch for a service-leaf",
|
||||||
|
"service", svc.Service.String(),
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
snap.TerminatingGateway.WatchedLeaves[svc.Service] = cancel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel service instance watches for services that were not in the update
|
||||||
|
for sid, cancelFn := range snap.TerminatingGateway.WatchedServices {
|
||||||
|
if _, ok := svcMap[sid]; !ok {
|
||||||
|
logger.Debug("canceling watch for service", "service", sid.String())
|
||||||
|
delete(snap.TerminatingGateway.WatchedServices, sid)
|
||||||
|
delete(snap.TerminatingGateway.ServiceGroups, sid)
|
||||||
|
cancelFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel leaf cert watches for services that were not in the update
|
||||||
|
for sid, cancelFn := range snap.TerminatingGateway.WatchedLeaves {
|
||||||
|
if _, ok := svcMap[sid]; !ok {
|
||||||
|
logger.Debug("canceling watch for leaf cert", "service", sid.String())
|
||||||
|
delete(snap.TerminatingGateway.WatchedLeaves, sid)
|
||||||
|
delete(snap.TerminatingGateway.ServiceLeaves, sid)
|
||||||
|
cancelFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel intention watches for services that were not in the update
|
||||||
|
for sid, cancelFn := range snap.TerminatingGateway.WatchedIntentions {
|
||||||
|
if _, ok := svcMap[sid]; !ok {
|
||||||
|
logger.Debug("canceling watch for intention", "service", sid.String())
|
||||||
|
delete(snap.TerminatingGateway.WatchedIntentions, sid)
|
||||||
|
|
||||||
|
// No additional deletions needed, since intentions aren't stored in snapshot
|
||||||
|
|
||||||
|
cancelFn()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
switch {
|
||||||
|
// Store service instances for watched service
|
||||||
|
case strings.HasPrefix(u.CorrelationID, "external-service:"):
|
||||||
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
|
||||||
|
sid := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, "external-service:"))
|
||||||
|
|
||||||
|
if len(resp.Nodes) > 0 {
|
||||||
|
snap.TerminatingGateway.ServiceGroups[sid] = resp.Nodes
|
||||||
|
} else if _, ok := snap.TerminatingGateway.ServiceGroups[sid]; ok {
|
||||||
|
delete(snap.TerminatingGateway.ServiceGroups, sid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store leaf cert for watched service
|
||||||
|
case strings.HasPrefix(u.CorrelationID, "service-leaf:"):
|
||||||
|
leaf, ok := u.Result.(*structs.IssuedCert)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
|
}
|
||||||
|
|
||||||
|
sid := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, "service-leaf:"))
|
||||||
|
snap.TerminatingGateway.ServiceLeaves[sid] = leaf
|
||||||
|
|
||||||
|
case strings.HasPrefix(u.CorrelationID, "service-intentions:"):
|
||||||
|
// no-op: Intentions don't get stored in the snapshot, calls to ConnectAuthorize will fetch them from the cache
|
||||||
|
|
||||||
|
default:
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
|
@ -900,6 +1113,8 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
|
||||||
for sid, cancelFn := range snap.MeshGateway.WatchedServices {
|
for sid, cancelFn := range snap.MeshGateway.WatchedServices {
|
||||||
if _, ok := svcMap[sid]; !ok {
|
if _, ok := svcMap[sid]; !ok {
|
||||||
meshLogger.Debug("canceling watch for service", "service", sid.String())
|
meshLogger.Debug("canceling watch for service", "service", sid.String())
|
||||||
|
// TODO (gateways) Should the sid also be deleted from snap.MeshGateway.ServiceGroups?
|
||||||
|
// Do those endpoints get cleaned up some other way?
|
||||||
delete(snap.MeshGateway.WatchedServices, sid)
|
delete(snap.MeshGateway.WatchedServices, sid)
|
||||||
cancelFn()
|
cancelFn()
|
||||||
}
|
}
|
||||||
|
|
|
@ -597,7 +597,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
require.True(t, snap.Valid(), "gateway with empty service list is vaild")
|
require.True(t, snap.Valid(), "gateway with empty service list is valid")
|
||||||
require.True(t, snap.ConnectProxy.IsEmpty())
|
require.True(t, snap.ConnectProxy.IsEmpty())
|
||||||
require.Equal(t, indexedRoots, snap.Roots)
|
require.Equal(t, indexedRoots, snap.Roots)
|
||||||
require.Empty(t, snap.MeshGateway.WatchedServices)
|
require.Empty(t, snap.MeshGateway.WatchedServices)
|
||||||
|
@ -639,7 +639,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
require.True(t, snap.Valid(), "gateway with service list is vaild")
|
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||||
require.Len(t, snap.MeshGateway.WatchedServices, 1)
|
require.Len(t, snap.MeshGateway.WatchedServices, 1)
|
||||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||||
},
|
},
|
||||||
|
@ -658,7 +658,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
require.True(t, snap.Valid(), "gateway with service list is vaild")
|
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||||
require.Len(t, snap.MeshGateway.WatchedServices, 2)
|
require.Len(t, snap.MeshGateway.WatchedServices, 2)
|
||||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||||
},
|
},
|
||||||
|
@ -798,6 +798,208 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"terminating-gateway-initial": testCase{
|
||||||
|
ns: structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindTerminatingGateway,
|
||||||
|
ID: "terminating-gateway",
|
||||||
|
Service: "terminating-gateway",
|
||||||
|
Address: "10.0.1.1",
|
||||||
|
},
|
||||||
|
sourceDC: "dc1",
|
||||||
|
stages: []verificationStage{
|
||||||
|
verificationStage{
|
||||||
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
|
rootsWatchID: genVerifyRootsWatch("dc1"),
|
||||||
|
gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID,
|
||||||
|
"terminating-gateway", "", "dc1", false),
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
require.False(t, snap.Valid(), "gateway without root is not valid")
|
||||||
|
require.True(t, snap.ConnectProxy.IsEmpty())
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verificationStage{
|
||||||
|
events: []cache.UpdateEvent{
|
||||||
|
rootWatchEvent(),
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
require.True(t, snap.Valid(), "gateway without services is valid")
|
||||||
|
require.True(t, snap.ConnectProxy.IsEmpty())
|
||||||
|
require.Equal(t, indexedRoots, snap.Roots)
|
||||||
|
require.Empty(t, snap.TerminatingGateway.WatchedServices)
|
||||||
|
require.Empty(t, snap.TerminatingGateway.WatchedLeaves)
|
||||||
|
require.Empty(t, snap.TerminatingGateway.ServiceGroups)
|
||||||
|
require.Empty(t, snap.TerminatingGateway.WatchedIntentions)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"terminating-gateway-handle-update": testCase{
|
||||||
|
ns: structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindTerminatingGateway,
|
||||||
|
ID: "terminating-gateway",
|
||||||
|
Service: "terminating-gateway",
|
||||||
|
Address: "10.0.1.1",
|
||||||
|
},
|
||||||
|
sourceDC: "dc1",
|
||||||
|
stages: []verificationStage{
|
||||||
|
verificationStage{
|
||||||
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
|
rootsWatchID: genVerifyRootsWatch("dc1"),
|
||||||
|
gatewayServicesWatchID: genVerifyServiceSpecificRequest(gatewayServicesWatchID,
|
||||||
|
"terminating-gateway", "", "dc1", false),
|
||||||
|
},
|
||||||
|
events: []cache.UpdateEvent{
|
||||||
|
rootWatchEvent(),
|
||||||
|
cache.UpdateEvent{
|
||||||
|
CorrelationID: gatewayServicesWatchID,
|
||||||
|
Result: &structs.IndexedGatewayServices{
|
||||||
|
Services: structs.GatewayServices{
|
||||||
|
{
|
||||||
|
Service: structs.NewServiceID("db", nil),
|
||||||
|
Gateway: structs.NewServiceID("terminating-gateway", nil),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedServices, 1)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verificationStage{
|
||||||
|
events: []cache.UpdateEvent{
|
||||||
|
cache.UpdateEvent{
|
||||||
|
CorrelationID: gatewayServicesWatchID,
|
||||||
|
Result: &structs.IndexedGatewayServices{
|
||||||
|
Services: structs.GatewayServices{
|
||||||
|
{
|
||||||
|
Service: structs.NewServiceID("db", nil),
|
||||||
|
Gateway: structs.NewServiceID("terminating-gateway", nil),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Service: structs.NewServiceID("billing", nil),
|
||||||
|
Gateway: structs.NewServiceID("terminating-gateway", nil),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
db := structs.NewServiceID("db", nil)
|
||||||
|
billing := structs.NewServiceID("billing", nil)
|
||||||
|
|
||||||
|
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedServices, 2)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedServices, db)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedServices, billing)
|
||||||
|
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedIntentions, 2)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedIntentions, db)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedIntentions, billing)
|
||||||
|
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedLeaves, 2)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedLeaves, db)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedLeaves, billing)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verificationStage{
|
||||||
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
|
"external-service:db": genVerifyServiceWatch("db", "", "dc1", false),
|
||||||
|
},
|
||||||
|
events: []cache.UpdateEvent{
|
||||||
|
cache.UpdateEvent{
|
||||||
|
CorrelationID: "external-service:db",
|
||||||
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
|
Nodes: structs.CheckServiceNodes{
|
||||||
|
{
|
||||||
|
Node: &structs.Node{
|
||||||
|
Node: "node1",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
},
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "db",
|
||||||
|
Service: "db",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
require.Len(t, snap.TerminatingGateway.ServiceGroups, 1)
|
||||||
|
require.Equal(t, snap.TerminatingGateway.ServiceGroups[structs.NewServiceID("db", nil)],
|
||||||
|
structs.CheckServiceNodes{
|
||||||
|
{
|
||||||
|
Node: &structs.Node{
|
||||||
|
Node: "node1",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
},
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "db",
|
||||||
|
Service: "db",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verificationStage{
|
||||||
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
|
"service-leaf:db": genVerifyLeafWatch("db", "dc1"),
|
||||||
|
},
|
||||||
|
events: []cache.UpdateEvent{
|
||||||
|
cache.UpdateEvent{
|
||||||
|
CorrelationID: "service-leaf:db",
|
||||||
|
Result: issuedCert,
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
require.Equal(t, snap.TerminatingGateway.ServiceLeaves[structs.NewServiceID("db", nil)], issuedCert)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verificationStage{
|
||||||
|
events: []cache.UpdateEvent{
|
||||||
|
cache.UpdateEvent{
|
||||||
|
CorrelationID: gatewayServicesWatchID,
|
||||||
|
Result: &structs.IndexedGatewayServices{
|
||||||
|
Services: structs.GatewayServices{
|
||||||
|
{
|
||||||
|
Service: structs.NewServiceID("billing", nil),
|
||||||
|
Gateway: structs.NewServiceID("terminating-gateway", nil),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
|
billing := structs.NewServiceID("billing", nil)
|
||||||
|
|
||||||
|
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||||
|
|
||||||
|
// All three watches should have been cancelled for db
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedServices, 1)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedServices, billing)
|
||||||
|
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedIntentions, 1)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedIntentions, billing)
|
||||||
|
|
||||||
|
require.Len(t, snap.TerminatingGateway.WatchedLeaves, 1)
|
||||||
|
require.Contains(t, snap.TerminatingGateway.WatchedLeaves, billing)
|
||||||
|
|
||||||
|
// There was no update event for billing's leaf/endpoints, so length is 0
|
||||||
|
require.Len(t, snap.TerminatingGateway.ServiceGroups, 0)
|
||||||
|
require.Len(t, snap.TerminatingGateway.ServiceLeaves, 0)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
|
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
|
||||||
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
|
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ const (
|
||||||
Session string = "session"
|
Session string = "session"
|
||||||
Sentinel string = "sentinel"
|
Sentinel string = "sentinel"
|
||||||
Snapshot string = "snapshot"
|
Snapshot string = "snapshot"
|
||||||
|
TerminatingGateway string = "terminating_gateway"
|
||||||
TLSUtil string = "tlsutil"
|
TLSUtil string = "tlsutil"
|
||||||
Transaction string = "txn"
|
Transaction string = "txn"
|
||||||
WAN string = "wan"
|
WAN string = "wan"
|
||||||
|
|
Loading…
Reference in New Issue