diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index c16b100551..628b7d4e32 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -9,6 +9,7 @@ import ( // It is meant to be point-in-time coherent and is used to deliver the current // config state to observers who need it to be pushed in (e.g. XDS server). type ConfigSnapshot struct { + Kind structs.ServiceKind ProxyID string Address string Port int @@ -22,7 +23,12 @@ type ConfigSnapshot struct { // Valid returns whether or not the snapshot has all required fields filled yet. func (s *ConfigSnapshot) Valid() bool { - return s.Roots != nil && s.Leaf != nil + switch s.Kind { + case structs.ServiceKindConnectProxy: + return s.Roots != nil && s.Leaf != nil + default: + return false + } } // Clone makes a deep copy of the snapshot we can send to other goroutines diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 76e1b84e86..9f7c7352ee 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -38,6 +38,7 @@ type state struct { ctx context.Context cancel func() + kind structs.ServiceKind proxyID string address string port int @@ -72,6 +73,7 @@ func newState(ns *structs.NodeService, token string) (*state, error) { } return &state{ + kind: ns.Kind, proxyID: ns.ID, address: ns.Address, port: ns.Port, @@ -116,9 +118,19 @@ func (s *state) Close() error { return nil } -// initWatches sets up the watches needed based on current proxy registration -// state. +// initWatches sets up the watches needed for the particular service func (s *state) initWatches() error { + switch s.kind { + case structs.ServiceKindConnectProxy: + return s.initWatchesConnectProxy() + default: + return fmt.Errorf("Unsupported service kind") + } +} + +// initWatchesConnectProxy sets up the watches needed based on current proxy registration +// state. +func (s *state) initWatchesConnectProxy() error { // Watch for root changes err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, @@ -203,12 +215,18 @@ func (s *state) run() { defer close(s.snapCh) snap := ConfigSnapshot{ - ProxyID: s.proxyID, - Address: s.address, - Port: s.port, - Proxy: s.proxyCfg, - UpstreamEndpoints: make(map[string]structs.CheckServiceNodes), + Kind: s.kind, + ProxyID: s.proxyID, + Address: s.address, + Port: s.port, + Proxy: s.proxyCfg, } + + switch s.kind { + case structs.ServiceKindConnectProxy: + snap.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) + } + // This turns out to be really fiddly/painful by just using time.Timer.C // directly in the code below since you can't detect when a timer is stopped // vs waiting in order to know to reset it. So just use a chan to send @@ -282,6 +300,15 @@ func (s *state) run() { } func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error { + switch s.kind { + case structs.ServiceKindConnectProxy: + return s.handleUpdateConnectProxy(u, snap) + default: + return fmt.Errorf("Unsupported service kind") + } +} + +func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error { switch u.CorrelationID { case rootsWatchID: roots, ok := u.Result.(*structs.IndexedCARoots) @@ -340,7 +367,7 @@ func (s *state) Changed(ns *structs.NodeService, token string) bool { if ns == nil { return true } - return ns.Kind != structs.ServiceKindConnectProxy || + return ns.Kind != s.kind || s.proxyID != ns.ID || s.address != ns.Address || s.port != ns.Port || diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index 7202d04973..41f8cf2970 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -19,9 +19,23 @@ import ( "github.com/hashicorp/consul/agent/structs" ) +// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot. +func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { + if cfgSnap == nil { + return nil, errors.New("nil config given") + } + + switch cfgSnap.Kind { + case structs.ServiceKindConnectProxy: + return s.clustersFromSnapshotConnectProxy(cfgSnap, token) + default: + return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) + } +} + // clustersFromSnapshot returns the xDS API representation of the "clusters" // (upstreams) in the snapshot. -func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { +func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { if cfgSnap == nil { return nil, errors.New("nil config given") } diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index dacc447e75..33319e24a8 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -2,6 +2,7 @@ package xds import ( "errors" + "fmt" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -14,11 +15,25 @@ import ( ) // endpointsFromSnapshot returns the xDS API representation of the "endpoints" -// (upstream instances) in the snapshot. func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { if cfgSnap == nil { return nil, errors.New("nil config given") } + + switch cfgSnap.Kind { + case structs.ServiceKindConnectProxy: + return endpointsFromSnapshotConnectProxy(cfgSnap, token) + default: + return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) + } +} + +// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints" +// (upstream instances) in the snapshot. +func endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { + if cfgSnap == nil { + return nil, errors.New("nil config given") + } resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints)) for id, endpoints := range cfgSnap.UpstreamEndpoints { la := makeLoadAssignment(id, endpoints) diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 6c4064b900..0f92fa938e 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -24,13 +24,27 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -// listenersFromSnapshot returns the xDS API representation of the "listeners" -// in the snapshot. +// listenersFromSnapshot returns the xDS API representation of the "listeners" in the snapshot. func (s *Server) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { if cfgSnap == nil { return nil, errors.New("nil config given") } + switch cfgSnap.Kind { + case structs.ServiceKindConnectProxy: + return s.listenersFromSnapshotConnectProxy(cfgSnap, token) + default: + return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) + } +} + +// listenersFromSnapshotConnectProxy returns the xDS API representation of the "listeners" +// in the snapshot. +func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { + if cfgSnap == nil { + return nil, errors.New("nil config given") + } + // One listener for each upstream plus the public one resources := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1) diff --git a/agent/xds/server.go b/agent/xds/server.go index 609ed3d851..5fae83108c 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -235,8 +235,13 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) return err } - if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) { - return status.Errorf(codes.PermissionDenied, "permission denied") + switch cfgSnap.Kind { + case structs.ServiceKindConnectProxy: + if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) { + return status.Errorf(codes.PermissionDenied, "permission denied") + } + default: + return status.Errorf(codes.Internal, "Invalid service kind") } // Authed OK!