From 4bdb690a257e46f7e925ef829f2b2c7a6c5663e3 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Mon, 1 Jul 2019 22:10:51 -0500 Subject: [PATCH] activate most discovery chain features in xDS for envoy (#6024) --- agent/agent.go | 9 + agent/cache-types/discovery_chain.go | 52 ++++ agent/consul/config_endpoint.go | 51 ++++ agent/consul/discoverychain/compile.go | 6 + agent/proxycfg/manager.go | 2 +- agent/proxycfg/manager_test.go | 52 +++- agent/proxycfg/snapshot.go | 12 +- agent/proxycfg/state.go | 270 ++++++++++++++--- agent/proxycfg/testing.go | 28 +- agent/structs/config_entry_discoverychain.go | 49 ++++ agent/structs/discovery_chain.go | 4 +- agent/xds/clusters.go | 116 +++++++- agent/xds/endpoints.go | 219 ++++++++++---- agent/xds/endpoints_test.go | 27 +- agent/xds/listeners.go | 120 ++++++-- agent/xds/response.go | 4 + agent/xds/routes.go | 288 ++++++++++++++++++- test/integration/connect/envoy/.gitignore | 1 + test/integration/connect/envoy/helpers.bash | 13 + 19 files changed, 1179 insertions(+), 144 deletions(-) create mode 100644 agent/cache-types/discovery_chain.go create mode 100644 test/integration/connect/envoy/.gitignore diff --git a/agent/agent.go b/agent/agent.go index 86f0e6c5c8..0199f5cba9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3945,6 +3945,15 @@ func (a *Agent) registerCache() { RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) + + a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{ + RPC: a, + }, &cache.RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) } // defaultProxyCommand returns the default Connect managed proxy command. diff --git a/agent/cache-types/discovery_chain.go b/agent/cache-types/discovery_chain.go new file mode 100644 index 0000000000..41d5633a32 --- /dev/null +++ b/agent/cache-types/discovery_chain.go @@ -0,0 +1,52 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const CompiledDiscoveryChainName = "compiled-discovery-chain" + +// CompiledDiscoveryChain supports fetching the complete discovery chain for a +// service and caching its compilation. +type CompiledDiscoveryChain struct { + RPC RPC +} + +func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a DiscoveryChainRequest. + reqReal, ok := req.(*structs.DiscoveryChainRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and endup arbitrarily stale anyway. This + // allows cached compiled-discovery-chain to automatically read scale across all + // servers too. 
+ reqReal.AllowStale = true + + // Fetch + var reply structs.DiscoveryChainResponse + if err := c.RPC.RPC("ConfigEntry.ReadDiscoveryChain", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} + +func (c *CompiledDiscoveryChain) SupportsBlocking() bool { + return true +} diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index d3ab49d67b..680c16535d 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -6,6 +6,7 @@ import ( metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" @@ -312,3 +313,53 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return nil }) } + +func (c *ConfigEntry) ReadDiscoveryChain(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error { + if done, err := c.srv.forward("ConfigEntry.ReadDiscoveryChain", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "read_discovery_chain"}, time.Now()) + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + if rule != nil && !rule.ServiceRead(args.Name) { + return acl.ErrPermissionDenied + } + + if args.Name == "" { + return fmt.Errorf("Must provide service name") + } + + const currentNamespace = "default" + + return c.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name) + if err != nil { + return err + } + + // Then we compile it into something useful. + chain, err := discoverychain.Compile(discoverychain.CompileRequest{ + ServiceName: args.Name, + CurrentNamespace: currentNamespace, + CurrentDatacenter: c.srv.config.Datacenter, + InferDefaults: true, + Entries: entries, + }) + if err != nil { + return err + } + + reply.Index = index + reply.ConfigEntries = entries + reply.Chain = chain + + return nil + }) +} diff --git a/agent/consul/discoverychain/compile.go b/agent/consul/discoverychain/compile.go index 462c356e37..11fe5f1f26 100644 --- a/agent/consul/discoverychain/compile.go +++ b/agent/consul/discoverychain/compile.go @@ -519,6 +519,12 @@ RESOLVE_AGAIN: } groupResolver := groupResolverNode.GroupResolver + // Digest mesh gateway settings. + if serviceDefault := c.entries.GetService(resolver.Name); serviceDefault != nil { + groupResolver.MeshGateway = serviceDefault.MeshGateway + } + // TODO(rb): thread proxy-defaults version through here as well + // Retain this target even if we may not retain the group resolver. 
c.targets[target] = struct{}{} diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index a967b91ef2..539743a63c 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -168,7 +168,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string return nil } - // We are updating the proxy, close it's old state + // We are updating the proxy, close its old state state.Close() } diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 9722cd8c9b..90a10c92f1 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -47,6 +47,42 @@ func TestManager_BasicLifecycle(t *testing.T) { roots, leaf := TestCerts(t) + // Initialize a default group resolver for "db" + dbResolverEntry := &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "db", + } + dbTarget := structs.DiscoveryTarget{ + Service: "db", + Namespace: "default", + Datacenter: "dc1", + } + dbResolverNode := &structs.DiscoveryGraphNode{ + Type: structs.DiscoveryGraphNodeTypeGroupResolver, + Name: "db", + GroupResolver: &structs.DiscoveryGroupResolver{ + Definition: dbResolverEntry, + Default: true, + Target: dbTarget, + }, + } + dbChain := &structs.CompiledDiscoveryChain{ + ServiceName: "db", + Namespace: "default", + Datacenter: "dc1", + Protocol: "tcp", + Node: dbResolverNode, + GroupResolverNodes: map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode{ + dbTarget: dbResolverNode, + }, + Resolvers: map[string]*structs.ServiceResolverConfigEntry{ + "db": dbResolverEntry, + }, + Targets: []structs.DiscoveryTarget{ + dbTarget, + }, + } + // Setup initial values types.roots.value.Store(roots) types.leaf.value.Store(leaf) @@ -55,6 +91,11 @@ func TestManager_BasicLifecycle(t *testing.T) { &structs.IndexedCheckServiceNodes{ Nodes: TestUpstreamNodes(t), }) + types.compiledChain.value.Store( + &structs.DiscoveryChainResponse{ + Chain: dbChain, + }, + ) logger := log.New(os.Stderr, "", log.LstdFlags) state := local.NewState(local.Config{}, logger, &token.Store{}) @@ -116,9 +157,16 @@ func TestManager_BasicLifecycle(t *testing.T) { Roots: roots, ConnectProxy: configSnapshotConnectProxy{ Leaf: leaf, - UpstreamEndpoints: map[string]structs.CheckServiceNodes{ - "db": TestUpstreamNodes(t), + DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ + "db": dbChain, }, + WatchedUpstreams: nil, // Clone() clears this out + WatchedUpstreamEndpoints: map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes{ + "db": { + dbTarget: TestUpstreamNodes(t), + }, + }, + UpstreamEndpoints: map[string]structs.CheckServiceNodes{}, }, Datacenter: "dc1", } diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index fd0853874d..0e2b0dff50 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -8,8 +8,11 @@ import ( ) type configSnapshotConnectProxy struct { - Leaf *structs.IssuedCert - UpstreamEndpoints map[string]structs.CheckServiceNodes + Leaf *structs.IssuedCert + DiscoveryChain map[string]*structs.CompiledDiscoveryChain // this is keyed by the Upstream.Identifier(), not the chain name + WatchedUpstreams map[string]map[structs.DiscoveryTarget]context.CancelFunc + WatchedUpstreamEndpoints map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes + UpstreamEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints } type configSnapshotMeshGateway struct { @@ -46,6 +49,7 @@ type ConfigSnapshot struct { func (s *ConfigSnapshot) Valid() bool { switch s.Kind { 
case structs.ServiceKindConnectProxy: + // TODO(rb): sanity check discovery chain things here? return s.Roots != nil && s.ConnectProxy.Leaf != nil case structs.ServiceKindMeshGateway: // TODO (mesh-gateway) - what happens if all the connect services go away @@ -65,9 +69,11 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) { snap := snapCopy.(*ConfigSnapshot) + // nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches switch s.Kind { + case structs.ServiceKindConnectProxy: + snap.ConnectProxy.WatchedUpstreams = nil case structs.ServiceKindMeshGateway: - // nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches snap.MeshGateway.WatchedDatacenters = nil snap.MeshGateway.WatchedServices = nil } diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 32819226c6..0ae2fd7335 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -59,7 +59,7 @@ type state struct { // goroutine later without reasoning about races with the NodeService passed // (especially for embedded fields like maps and slices). // -// The returned state needs it's required dependencies to be set before Watch +// The returned state needs its required dependencies to be set before Watch // can be called. func newState(ns *structs.NodeService, token string) (*state, error) { if ns.Kind != structs.ServiceKindConnectProxy && ns.Kind != structs.ServiceKindMeshGateway { @@ -141,17 +141,17 @@ func (s *state) initWatches() error { } } -func (s *state) watchConnectProxyService(correlationId string, service string, dc string, filter string, meshGatewayMode structs.MeshGatewayMode) error { +func (s *state) watchConnectProxyService(ctx context.Context, correlationId string, service string, dc string, filter string, meshGatewayMode structs.MeshGatewayMode) error { switch meshGatewayMode { case structs.MeshGatewayModeRemote: - return s.cache.Notify(s.ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ + return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceKind: structs.ServiceKindMeshGateway, UseServiceKind: true, }, correlationId, s.ch) case structs.MeshGatewayModeLocal: - return s.cache.Notify(s.ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ + return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceKind: structs.ServiceKindMeshGateway, @@ -159,7 +159,7 @@ func (s *state) watchConnectProxyService(correlationId string, service string, d }, correlationId, s.ch) default: // This includes both the None and Default modes on purpose - return s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{ Token: s.token, @@ -218,6 +218,7 @@ func (s *state) initWatchesConnectProxy() error { for _, u := range s.proxyCfg.Upstreams { dc := s.source.Datacenter if u.Datacenter != "" { + // TODO(rb): if we ASK for a specific datacenter, do we still use the chain? 
dc = u.Datacenter } @@ -232,15 +233,47 @@ func (s *state) initWatchesConnectProxy() error { case structs.UpstreamDestTypeService: fallthrough case "": // Treat unset as the default Service type - meshGateway := structs.MeshGatewayModeNone - // TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point + // Determine if this should use a discovery chain. + // + // TODO(rb): reduce this list of exceptions + var shouldUseDiscoveryChain bool if dc != s.source.Datacenter { - meshGateway = u.MeshGateway.Mode + shouldUseDiscoveryChain = false + } else if u.DestinationNamespace != "" && u.DestinationNamespace != "default" { + shouldUseDiscoveryChain = false + } else { + shouldUseDiscoveryChain = true } - if err := s.watchConnectProxyService("upstream:"+serviceIDPrefix+u.Identifier(), u.DestinationName, dc, "", meshGateway); err != nil { - return err + if shouldUseDiscoveryChain { + // Watch for discovery chain configuration updates + err = s.cache.Notify(s.ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ + Datacenter: dc, + QueryOptions: structs.QueryOptions{Token: s.token}, + Name: u.DestinationName, + }, "discovery-chain:"+u.Identifier(), s.ch) + if err != nil { + return err + } + } else { + meshGateway := structs.MeshGatewayModeNone + + // TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point + if dc != s.source.Datacenter { + meshGateway = u.MeshGateway.Mode + } + + if err := s.watchConnectProxyService( + s.ctx, + "upstream:"+serviceIDPrefix+u.Identifier(), + u.DestinationName, + dc, + "", + meshGateway, + ); err != nil { + return err + } } default: @@ -307,7 +340,10 @@ func (s *state) run() { switch s.kind { case structs.ServiceKindConnectProxy: - snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) + snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain) + snap.ConnectProxy.WatchedUpstreams = make(map[string]map[structs.DiscoveryTarget]context.CancelFunc) + snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes) + snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) // TODO(rb): deprecated case structs.ServiceKindMeshGateway: snap.MeshGateway.WatchedServices = make(map[string]context.CancelFunc) snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc) @@ -400,44 +436,212 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error { } func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error { - switch u.CorrelationID { - case rootsWatchID: + switch { + case u.CorrelationID == rootsWatchID: roots, ok := u.Result.(*structs.IndexedCARoots) if !ok { return fmt.Errorf("invalid type for roots response: %T", u.Result) } snap.Roots = roots - case leafWatchID: + + case u.CorrelationID == leafWatchID: leaf, ok := u.Result.(*structs.IssuedCert) if !ok { return fmt.Errorf("invalid type for leaf response: %T", u.Result) } snap.ConnectProxy.Leaf = leaf - case intentionsWatchID: + + case u.CorrelationID == intentionsWatchID: // Not in snapshot currently, no op + + case strings.HasPrefix(u.CorrelationID, "discovery-chain:"): + resp, ok := u.Result.(*structs.DiscoveryChainResponse) + if !ok { + return fmt.Errorf("invalid type for service response: %T", u.Result) + } + svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") + snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain + + if err := 
s.resetWatchesFromChain(svc, resp.Chain, snap); err != nil { + return err + } + + case strings.HasPrefix(u.CorrelationID, "upstream-target:"): + resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) + if !ok { + return fmt.Errorf("invalid type for service response: %T", u.Result) + } + correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:") + encTarget, svc, ok := removeColonPrefix(correlationID) + if !ok { + return fmt.Errorf("invalid correlation id %q", u.CorrelationID) + } + + target := structs.DiscoveryTarget{} + if err := target.UnmarshalText([]byte(encTarget)); err != nil { + return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err) + } + + // TODO(rb): do we have to do onlypassing filters here? + + m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc] + if !ok { + m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes) + snap.ConnectProxy.WatchedUpstreamEndpoints[svc] = m + } + snap.ConnectProxy.WatchedUpstreamEndpoints[svc][target] = resp.Nodes + + case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix): + resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) + if !ok { + return fmt.Errorf("invalid type for service response: %T", u.Result) + } + svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix) + snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes + + case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix): + resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse) + if !ok { + return fmt.Errorf("invalid type for prepared query response: %T", u.Result) + } + pq := strings.TrimPrefix(u.CorrelationID, "upstream:") + snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes + default: - // Service discovery result, figure out which type - switch { - case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix): - resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) - if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) - } - svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix) - snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes + return errors.New("unknown correlation ID") + } + return nil +} - case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix): - resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse) - if !ok { - return fmt.Errorf("invalid type for prepared query response: %T", u.Result) - } - pq := strings.TrimPrefix(u.CorrelationID, "upstream:") - snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes +func removeColonPrefix(s string) (string, string, bool) { + idx := strings.Index(s, ":") + if idx == -1 { + return "", "", false + } + return s[0:idx], s[idx+1:], true +} - default: - return errors.New("unknown correlation ID") +func (s *state) resetWatchesFromChain( + id string, + chain *structs.CompiledDiscoveryChain, + snap *ConfigSnapshot, +) error { + if chain == nil { + return fmt.Errorf("not possible to arrive here with no discovery chain") + } + + // Collect all sorts of catalog queries we'll have to run. + targets := make(map[structs.DiscoveryTarget]*structs.ServiceResolverConfigEntry) + addTarget := func(target structs.DiscoveryTarget) error { + resolver, ok := chain.Resolvers[target.Service] + if !ok { + return fmt.Errorf("missing resolver %q for target %s", target.Service, target) + } + + targets[target] = resolver + return nil + } + + // NOTE: We will NEVER see a missing chain, because we always request it with defaulting enabled. 
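+	// Walk the group resolver nodes to collect every catalog target this chain
+	// can route to (including failover targets) and note the mesh gateway mode
+	// to use for each primary target.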
+ meshGatewayModes := make(map[structs.DiscoveryTarget]structs.MeshGatewayMode) + for _, group := range chain.GroupResolverNodes { + groupResolver := group.GroupResolver + + meshGatewayModes[groupResolver.Target] = groupResolver.MeshGateway.Mode + + if err := addTarget(groupResolver.Target); err != nil { + return err + } + if groupResolver.Failover != nil { + for _, target := range groupResolver.Failover.Targets { + if err := addTarget(target); err != nil { + return err + } + } } } + + // Initialize relevant sub maps. + if _, ok := snap.ConnectProxy.WatchedUpstreams[id]; !ok { + snap.ConnectProxy.WatchedUpstreams[id] = make(map[structs.DiscoveryTarget]context.CancelFunc) + } + if _, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[id]; !ok { + // TODO(rb): does this belong here? + snap.ConnectProxy.WatchedUpstreamEndpoints[id] = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes) + } + + // We could invalidate this selectively based on a hash of the relevant + // resolver information, but for now just reset anything about this + // upstream when the chain changes in any way. + // + // TODO(rb): content hash based add/remove + for target, cancelFn := range snap.ConnectProxy.WatchedUpstreams[id] { + s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: stopping watch of target %s", id, chain.ServiceName, target) + delete(snap.ConnectProxy.WatchedUpstreams[id], target) + delete(snap.ConnectProxy.WatchedUpstreamEndpoints[id], target) // TODO(rb): safe? + cancelFn() + } + + for target, resolver := range targets { + if target.Service != resolver.Name { + panic(target.Service + " != " + resolver.Name) // TODO(rb): remove + } + s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target) + + // snap.WatchedUpstreams[name] + + // delete(snap.WatchedUpstreams[name], target) + // delete(snap.WatchedUpstreamEndpoint[name], target) + + // TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly + + // TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters + + // TODO(rb): handle subset.onlypassing + var subset structs.ServiceResolverSubset + if target.ServiceSubset != "" { + var ok bool + subset, ok = resolver.Subsets[target.ServiceSubset] + if !ok { + // Not possible really. 
+ return fmt.Errorf("target %s cannot be resolved; service %q does not have a subset named %q", target, target.Service, target.ServiceSubset) + } + } + + encodedTarget, err := target.MarshalText() + if err != nil { + return fmt.Errorf("target %s cannot be converted into a cache key string: %v", target, err) + } + + ctx, cancel := context.WithCancel(s.ctx) + + meshGateway := structs.MeshGatewayModeNone + if target.Datacenter != s.source.Datacenter { + meshGateway = meshGatewayModes[target] + if meshGateway == structs.MeshGatewayModeDefault { + meshGateway = structs.MeshGatewayModeNone + } + } else { + meshGateway = structs.MeshGatewayModeNone + } + + // TODO(rb): update the health endpoint to allow returning even unhealthy endpoints + err = s.watchConnectProxyService( + ctx, + "upstream-target:"+string(encodedTarget)+":"+id, + target.Service, + target.Datacenter, + subset.Filter, + meshGateway, + ) + if err != nil { + cancel() + return err + } + + snap.ConnectProxy.WatchedUpstreams[id][target] = cancel + } + return nil } diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index c8b9e33aba..2fe6e6b006 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -17,11 +17,12 @@ import ( // TestCacheTypes encapsulates all the different cache types proxycfg.State will // watch/request for controlling one during testing. type TestCacheTypes struct { - roots *ControllableCacheType - leaf *ControllableCacheType - intentions *ControllableCacheType - health *ControllableCacheType - query *ControllableCacheType + roots *ControllableCacheType + leaf *ControllableCacheType + intentions *ControllableCacheType + health *ControllableCacheType + query *ControllableCacheType + compiledChain *ControllableCacheType } // NewTestCacheTypes creates a set of ControllableCacheTypes for all types that @@ -29,11 +30,12 @@ type TestCacheTypes struct { func NewTestCacheTypes(t testing.T) *TestCacheTypes { t.Helper() ct := &TestCacheTypes{ - roots: NewControllableCacheType(t), - leaf: NewControllableCacheType(t), - intentions: NewControllableCacheType(t), - health: NewControllableCacheType(t), - query: NewControllableCacheType(t), + roots: NewControllableCacheType(t), + leaf: NewControllableCacheType(t), + intentions: NewControllableCacheType(t), + health: NewControllableCacheType(t), + query: NewControllableCacheType(t), + compiledChain: NewControllableCacheType(t), } ct.query.blocking = false return ct @@ -66,6 +68,12 @@ func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache { c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{ Refresh: false, }) + c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain, &cache.RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 10 * time.Minute, + }) + return c } diff --git a/agent/structs/config_entry_discoverychain.go b/agent/structs/config_entry_discoverychain.go index 0d7174aeb9..d9bae99353 100644 --- a/agent/structs/config_entry_discoverychain.go +++ b/agent/structs/config_entry_discoverychain.go @@ -4,9 +4,12 @@ import ( "fmt" "math" "sort" + "strconv" "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + "github.com/mitchellh/hashstructure" ) // ServiceRouterConfigEntry defines L7 (e.g. 
http) routing rules for a named @@ -917,6 +920,52 @@ func (e *DiscoveryChainConfigEntries) IsChainEmpty() bool { return len(e.Routers) == 0 && len(e.Splitters) == 0 && len(e.Resolvers) == 0 } +// DiscoveryChainRequest is used when requesting the discovery chain for a +// service. +type DiscoveryChainRequest struct { + Name string + Datacenter string + // Source QuerySource + + QueryOptions +} + +func (r *DiscoveryChainRequest) RequestDatacenter() string { + return r.Datacenter +} + +func (r *DiscoveryChainRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + } + + v, err := hashstructure.Hash(struct { + Name string + }{ + Name: r.Name, + }, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + +// TODO(rb): either fix the compiled results, or take the derived data and stash it here in a json/msgpack-friendly way? +type DiscoveryChainResponse struct { + ConfigEntries *DiscoveryChainConfigEntries `json:",omitempty"` // TODO(rb): remove these? + Chain *CompiledDiscoveryChain `json:",omitempty"` + QueryMeta +} + type ConfigEntryGraphError struct { // one of Message or Err should be set Message string diff --git a/agent/structs/discovery_chain.go b/agent/structs/discovery_chain.go index 3b36307e43..f6ee92d947 100644 --- a/agent/structs/discovery_chain.go +++ b/agent/structs/discovery_chain.go @@ -73,6 +73,7 @@ type DiscoveryGroupResolver struct { Definition *ServiceResolverConfigEntry `json:",omitempty"` Default bool `json:",omitempty"` ConnectTimeout time.Duration `json:",omitempty"` + MeshGateway MeshGatewayConfig `json:",omitempty"` Target DiscoveryTarget `json:",omitempty"` Failover *DiscoveryFailover `json:",omitempty"` } @@ -90,6 +91,7 @@ type DiscoverySplit struct { } // compiled form of ServiceResolverFailover +// TODO(rb): figure out how to get mesh gateways in here type DiscoveryFailover struct { Definition *ServiceResolverFailover `json:",omitempty"` Targets []DiscoveryTarget `json:",omitempty"` @@ -212,7 +214,7 @@ func (t DiscoveryTarget) String() string { if t.Namespace != "" { b.WriteString(t.Namespace) } else { - b.WriteString("default") + b.WriteString("") } b.WriteRune('.') diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index 2b78554f37..df203e2004 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -11,6 +11,7 @@ import ( envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + envoytype "github.com/envoyproxy/go-control-plane/envoy/type" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -38,19 +39,41 @@ func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token st // clustersFromSnapshot returns the xDS API representation of the "clusters" // (upstreams) in the snapshot. func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { - // Include the "app" cluster for the public listener + // TODO(rb): this sizing is a low bound. 
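+	// A chain-enabled upstream can expand into one cluster per target, so more
+	// entries may be appended below than there are upstreams.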
-	clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
+	clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
 
-	var err error
-	clusters[0], err = s.makeAppCluster(cfgSnap)
+	// Include the "app" cluster for the public listener
+	appCluster, err := s.makeAppCluster(cfgSnap)
 	if err != nil {
 		return nil, err
 	}
 
-	for idx, upstream := range cfgSnap.Proxy.Upstreams {
-		clusters[idx+1], err = s.makeUpstreamCluster(upstream, cfgSnap)
-		if err != nil {
-			return nil, err
+	clusters = append(clusters, appCluster)
+
+	for _, u := range cfgSnap.Proxy.Upstreams {
+		id := u.Identifier()
+		var chain *structs.CompiledDiscoveryChain
+		if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
+			chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
+		}
+
+		if chain == nil || chain.IsDefault() {
+			// Either old-school upstream or prepared query.
+			upstreamCluster, err := s.makeUpstreamCluster(u, cfgSnap)
+			if err != nil {
+				return nil, err
+			}
+			clusters = append(clusters, upstreamCluster)
+
+		} else {
+			upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, chain, cfgSnap)
+			if err != nil {
+				return nil, err
+			}
+
+			for _, cluster := range upstreamClusters {
+				clusters = append(clusters, cluster)
+			}
 		}
 	}
 
@@ -197,6 +220,85 @@ func (s *Server) makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycf
 	return c, nil
 }
 
+func (s *Server) makeUpstreamClustersForDiscoveryChain(
+	upstreamID string,
+	chain *structs.CompiledDiscoveryChain,
+	cfgSnap *proxycfg.ConfigSnapshot,
+) ([]*envoy.Cluster, error) {
+	if chain == nil {
+		panic("chain must be provided")
+	}
+
+	// TODO(rb): make escape hatches work with chains
+
+	var out []*envoy.Cluster
+	for target, node := range chain.GroupResolverNodes {
+		groupResolver := node.GroupResolver
+		// TODO(rb): failover
+		// Failover *DiscoveryFailover `json:",omitempty"` // sad path
+
+		clusterName := makeClusterName(upstreamID, target, cfgSnap.Datacenter)
+		c := &envoy.Cluster{
+			Name:                 clusterName,
+			AltStatName:          clusterName, // TODO(rb): change this?
+			ConnectTimeout:       groupResolver.ConnectTimeout,
+			ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
+			CommonLbConfig: &envoy.Cluster_CommonLbConfig{
+				HealthyPanicThreshold: &envoytype.Percent{
+					Value: 0, // disable panic threshold
+				},
+			},
+			// TODO(rb): adjust load assignment
+			EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
+				EdsConfig: &envoycore.ConfigSource{
+					ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
+						Ads: &envoycore.AggregatedConfigSource{},
+					},
+				},
+			},
+			// Having an empty config enables outlier detection with default config.
+			OutlierDetection: &envoycluster.OutlierDetection{},
+		}
+		if chain.Protocol == "http2" || chain.Protocol == "grpc" {
+			c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
+		}
+
+		// Enable TLS upstream with the configured client certificate.
+		c.TlsContext = &envoyauth.UpstreamTlsContext{
+			CommonTlsContext: makeCommonTLSContext(cfgSnap),
+		}
+
+		out = append(out, c)
+	}
+
+	return out, nil
+}
+
+// makeClusterName returns a string representation that uniquely identifies the
+// cluster in a canonical but human-readable way.
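+// For example (illustrative values derived from the rules below): upstream "db"
+// routed to subset "v2" of "db" in dc2 yields "db/v2?dc=dc2", while the same
+// upstream routed to a different service "db-replica" yields "db//db-replica".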
+func makeClusterName(upstreamID string, target structs.DiscoveryTarget, currentDatacenter string) string { + var name string + if target.ServiceSubset != "" { + name = target.Service + "/" + target.ServiceSubset + } else { + name = target.Service + } + + if target.Namespace != "" && target.Namespace != "default" { + name = target.Namespace + "/" + name + } + if target.Datacenter != "" && target.Datacenter != currentDatacenter { + name += "?dc=" + target.Datacenter + } + + if upstreamID == target.Service { + // In the common case don't stutter. + return name + } + + return upstreamID + "//" + name +} + // makeClusterFromUserConfig returns the listener config decoded from an // arbitrary proto3 json format string or an error if it's invalid. // diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 14c2985970..9250a9061f 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -33,11 +33,94 @@ func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token s // endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints" // (upstream instances) in the snapshot. func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { + // TODO(rb): this sizing is a low bound. resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.UpstreamEndpoints)) - for id, endpoints := range cfgSnap.ConnectProxy.UpstreamEndpoints { - la := makeLoadAssignment(id, endpoints, cfgSnap.Datacenter) - resources = append(resources, la) + + // TODO(rb): should naming from 1.5 -> 1.6 for clusters remain unchanged? + + for _, u := range cfgSnap.Proxy.Upstreams { + id := u.Identifier() + + var chain *structs.CompiledDiscoveryChain + if u.DestinationType != structs.UpstreamDestTypePreparedQuery { + chain = cfgSnap.ConnectProxy.DiscoveryChain[id] + } + + if chain == nil { + // We ONLY want this branch for prepared queries. + + endpoints, ok := cfgSnap.ConnectProxy.UpstreamEndpoints[id] + if ok { + la := makeLoadAssignment( + id, + 0, + []structs.CheckServiceNodes{endpoints}, + cfgSnap.Datacenter, + ) + resources = append(resources, la) + } + + } else { + // Newfangled discovery chain plumbing. + + chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id] + if !ok { + continue // TODO(rb): whaaaa? + } + + for target, node := range chain.GroupResolverNodes { + groupResolver := node.GroupResolver + failover := groupResolver.Failover + + endpoints, ok := chainEndpointMap[target] + if !ok { + continue // TODO(rb): whaaaa? + } + + var ( + priorityEndpoints []structs.CheckServiceNodes + overprovisioningFactor int + ) + + if failover != nil && len(failover.Targets) > 0 { + priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1) + + priorityEndpoints = append(priorityEndpoints, endpoints) + + if failover.Definition.OverprovisioningFactor > 0 { + overprovisioningFactor = failover.Definition.OverprovisioningFactor + } + if overprovisioningFactor <= 0 { + // We choose such a large value here that the failover math should + // in effect not happen until zero instances are healthy. 
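+					// Envoy weighs each priority by its healthy percentage
+					// multiplied by this factor (default 140), so an enormous
+					// value keeps traffic on this priority until it has zero
+					// healthy endpoints.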
+ overprovisioningFactor = 100000 + } + + for _, failTarget := range failover.Targets { + failEndpoints, ok := chainEndpointMap[failTarget] + if ok { + priorityEndpoints = append(priorityEndpoints, failEndpoints) + } + } + } else { + priorityEndpoints = []structs.CheckServiceNodes{ + endpoints, + } + } + + clusterName := makeClusterName(id, target, cfgSnap.Datacenter) + + la := makeLoadAssignment( + clusterName, + overprovisioningFactor, + priorityEndpoints, + cfgSnap.Datacenter, + ) + resources = append(resources, la) + } + } } + return resources, nil } @@ -47,14 +130,28 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh // generate the endpoints for the gateways in the remote datacenters for dc, endpoints := range cfgSnap.MeshGateway.GatewayGroups { clusterName := DatacenterSNI(dc, cfgSnap) - la := makeLoadAssignment(clusterName, endpoints, cfgSnap.Datacenter) + la := makeLoadAssignment( + clusterName, + 0, + []structs.CheckServiceNodes{ + endpoints, + }, + cfgSnap.Datacenter, + ) resources = append(resources, la) } // generate the endpoints for the local service groups for svc, endpoints := range cfgSnap.MeshGateway.ServiceGroups { clusterName := ServiceSNI(svc, "default", cfgSnap.Datacenter, cfgSnap) - la := makeLoadAssignment(clusterName, endpoints, cfgSnap.Datacenter) + la := makeLoadAssignment( + clusterName, + 0, + []structs.CheckServiceNodes{ + endpoints, + }, + cfgSnap.Datacenter, + ) resources = append(resources, la) } @@ -63,56 +160,78 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint { return envoyendpoint.LbEndpoint{ - HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{Endpoint: &envoyendpoint.Endpoint{ - Address: makeAddressPtr(host, port), + HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr(host, port), + }, }, - }} + } } -func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes, localDatacenter string) *envoy.ClusterLoadAssignment { - es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints)) - for _, ep := range endpoints { - // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? - addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter) - healthStatus := envoycore.HealthStatus_HEALTHY - weight := 1 - if ep.Service.Weights != nil { - weight = ep.Service.Weights.Passing +func makeLoadAssignment( + clusterName string, + overprovisioningFactor int, + priorityEndpoints []structs.CheckServiceNodes, + localDatacenter string, +) *envoy.ClusterLoadAssignment { + cla := &envoy.ClusterLoadAssignment{ + ClusterName: clusterName, + Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)), + } + if overprovisioningFactor > 0 { + cla.Policy = &envoy.ClusterLoadAssignment_Policy{ + OverprovisioningFactor: makeUint32Value(overprovisioningFactor), + } + } + + for priority, endpoints := range priorityEndpoints { + es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints)) + + for _, ep := range endpoints { + // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? 
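+			// BestAddress prefers the endpoint's WAN address whenever it lives in
+			// a different datacenter from this proxy.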
+ addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter) + healthStatus := envoycore.HealthStatus_HEALTHY + weight := 1 + if ep.Service.Weights != nil { + weight = ep.Service.Weights.Passing + } + + for _, chk := range ep.Checks { + if chk.Status == api.HealthCritical { + // This can't actually happen now because health always filters critical + // but in the future it may not so set this correctly! + healthStatus = envoycore.HealthStatus_UNHEALTHY + } + if chk.Status == api.HealthWarning && ep.Service.Weights != nil { + weight = ep.Service.Weights.Warning + } + } + // Make weights fit Envoy's limits. A zero weight means that either Warning + // (likely) or Passing (weirdly) weight has been set to 0 effectively making + // this instance unhealthy and should not be sent traffic. + if weight < 1 { + healthStatus = envoycore.HealthStatus_UNHEALTHY + weight = 1 + } + if weight > 128 { + weight = 128 + } + es = append(es, envoyendpoint.LbEndpoint{ + HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr(addr, port), + }, + }, + HealthStatus: healthStatus, + LoadBalancingWeight: makeUint32Value(weight), + }) } - for _, chk := range ep.Checks { - if chk.Status == api.HealthCritical { - // This can't actually happen now because health always filters critical - // but in the future it may not so set this correctly! - healthStatus = envoycore.HealthStatus_UNHEALTHY - } - if chk.Status == api.HealthWarning && ep.Service.Weights != nil { - weight = ep.Service.Weights.Warning - } - } - // Make weights fit Envoy's limits. A zero weight means that either Warning - // (likely) or Passing (weirdly) weight has been set to 0 effectively making - // this instance unhealthy and should not be sent traffic. 
- if weight < 1 { - healthStatus = envoycore.HealthStatus_UNHEALTHY - weight = 1 - } - if weight > 128 { - weight = 128 - } - es = append(es, envoyendpoint.LbEndpoint{ - HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{ - Endpoint: &envoyendpoint.Endpoint{ - Address: makeAddressPtr(addr, port), - }}, - HealthStatus: healthStatus, - LoadBalancingWeight: makeUint32Value(weight), + cla.Endpoints = append(cla.Endpoints, envoyendpoint.LocalityLbEndpoints{ + Priority: uint32(priority), + LbEndpoints: es, }) } - return &envoy.ClusterLoadAssignment{ - ClusterName: clusterName, - Endpoints: []envoyendpoint.LocalityLbEndpoints{{ - LbEndpoints: es, - }}, - } + + return cla } diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index ba3039a5ab..b55c39c756 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -94,15 +94,18 @@ func Test_makeLoadAssignment(t *testing.T) { testWarningCheckServiceNodes[1].Checks[0].Status = "warning" tests := []struct { - name string - clusterName string - endpoints structs.CheckServiceNodes - want *envoy.ClusterLoadAssignment + name string + clusterName string + overprovisioningFactor int + endpoints []structs.CheckServiceNodes + want *envoy.ClusterLoadAssignment }{ { name: "no instances", clusterName: "service:test", - endpoints: structs.CheckServiceNodes{}, + endpoints: []structs.CheckServiceNodes{ + {}, + }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", Endpoints: []envoyendpoint.LocalityLbEndpoints{{ @@ -113,7 +116,9 @@ func Test_makeLoadAssignment(t *testing.T) { { name: "instances, no weights", clusterName: "service:test", - endpoints: testCheckServiceNodes, + endpoints: []structs.CheckServiceNodes{ + testCheckServiceNodes, + }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", Endpoints: []envoyendpoint.LocalityLbEndpoints{{ @@ -141,7 +146,9 @@ func Test_makeLoadAssignment(t *testing.T) { { name: "instances, healthy weights", clusterName: "service:test", - endpoints: testWeightedCheckServiceNodes, + endpoints: []structs.CheckServiceNodes{ + testWeightedCheckServiceNodes, + }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", Endpoints: []envoyendpoint.LocalityLbEndpoints{{ @@ -169,7 +176,9 @@ func Test_makeLoadAssignment(t *testing.T) { { name: "instances, warning weights", clusterName: "service:test", - endpoints: testWarningCheckServiceNodes, + endpoints: []structs.CheckServiceNodes{ + testWarningCheckServiceNodes, + }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", Endpoints: []envoyendpoint.LocalityLbEndpoints{{ @@ -197,7 +206,7 @@ func Test_makeLoadAssignment(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := makeLoadAssignment(tt.clusterName, tt.endpoints, "dc1") + got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1") require.Equal(t, tt.want, got) }) } diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 6611f0c591..f40c212a82 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -52,10 +52,23 @@ func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps return nil, err } for i, u := range cfgSnap.Proxy.Upstreams { - resources[i+1], err = s.makeUpstreamListener(&u) + id := u.Identifier() + + var chain *structs.CompiledDiscoveryChain + if u.DestinationType != structs.UpstreamDestTypePreparedQuery { + chain = cfgSnap.ConnectProxy.DiscoveryChain[id] + } + + var upstreamListener proto.Message + if chain == nil || 
chain.IsDefault() { + upstreamListener, err = s.makeUpstreamListener(&u, cfgSnap) + } else { + upstreamListener, err = s.makeUpstreamListenerForDiscoveryChain(&u, chain, cfgSnap) + } if err != nil { return nil, err } + resources[i+1] = upstreamListener } return resources, nil } @@ -226,7 +239,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri } l = makeListener(PublicListenerName, addr, cfgSnap.Port) - filter, err := makeListenerFilter(cfg.Protocol, "public_listener", LocalAppClusterName, "", true) + filter, err := makeListenerFilter(false, cfg.Protocol, "public_listener", LocalAppClusterName, "", true) if err != nil { return nil, err } @@ -243,7 +256,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri return l, err } -func (s *Server) makeUpstreamListener(u *structs.Upstream) (proto.Message, error) { +func (s *Server) makeUpstreamListener(u *structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (proto.Message, error) { cfg, err := ParseUpstreamConfig(u.Config) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns @@ -260,11 +273,16 @@ func (s *Server) makeUpstreamListener(u *structs.Upstream) (proto.Message, error addr = "127.0.0.1" } - l := makeListener(u.Identifier(), addr, u.LocalBindPort) - filter, err := makeListenerFilter(cfg.Protocol, u.Identifier(), u.Identifier(), "upstream_", false) + upstreamID := u.Identifier() + + clusterName := upstreamID + + l := makeListener(upstreamID, addr, u.LocalBindPort) + filter, err := makeListenerFilter(false, cfg.Protocol, upstreamID, clusterName, "upstream_", false) if err != nil { return nil, err } + l.FilterChains = []envoylistener.FilterChain{ { Filters: []envoylistener.Filter{ @@ -330,14 +348,44 @@ func (s *Server) makeGatewayListener(name, addr string, port int, cfgSnap *proxy return l, nil } -func makeListenerFilter(protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) { +func (s *Server) makeUpstreamListenerForDiscoveryChain( + u *structs.Upstream, + chain *structs.CompiledDiscoveryChain, + cfgSnap *proxycfg.ConfigSnapshot, +) (proto.Message, error) { + // TODO(rb): make the listener escape hatch work again + + addr := u.LocalBindAddress + if addr == "" { + addr = "127.0.0.1" + } + + upstreamID := u.Identifier() + + l := makeListener(upstreamID, addr, u.LocalBindPort) + filter, err := makeListenerFilter(true, chain.Protocol, upstreamID, "", "upstream_", false) + if err != nil { + return nil, err + } + + l.FilterChains = []envoylistener.FilterChain{ + { + Filters: []envoylistener.Filter{ + filter, + }, + }, + } + return l, nil +} + +func makeListenerFilter(useRDS bool, protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) { switch protocol { case "grpc": - return makeHTTPFilter(filterName, cluster, statPrefix, ingress, true, true) + return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, true, true) case "http2": - return makeHTTPFilter(filterName, cluster, statPrefix, ingress, false, true) + return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, true) case "http": - return makeHTTPFilter(filterName, cluster, statPrefix, ingress, false, false) + return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, false) case "tcp": fallthrough default: @@ -375,7 +423,11 @@ func makeStatPrefix(protocol, prefix, filterName string) string { return fmt.Sprintf("%s%s_%s", prefix, strings.Replace(filterName, ":", "_", 
-1), protocol) } -func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2 bool) (envoylistener.Filter, error) { +func makeHTTPFilter( + useRDS bool, + filterName, cluster, statPrefix string, + ingress, grpc, http2 bool, +) (envoylistener.Filter, error) { op := envoyhttp.INGRESS if !ingress { op = envoyhttp.EGRESS @@ -387,7 +439,39 @@ func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2 cfg := &envoyhttp.HttpConnectionManager{ StatPrefix: makeStatPrefix(proto, statPrefix, filterName), CodecType: envoyhttp.AUTO, - RouteSpecifier: &envoyhttp.HttpConnectionManager_RouteConfig{ + HttpFilters: []*envoyhttp.HttpFilter{ + &envoyhttp.HttpFilter{ + Name: "envoy.router", + }, + }, + Tracing: &envoyhttp.HttpConnectionManager_Tracing{ + OperationName: op, + // Don't trace any requests by default unless the client application + // explicitly propagates trace headers that indicate this should be + // sampled. + RandomSampling: &envoytype.Percent{Value: 0.0}, + }, + } + + if useRDS { + if cluster != "" { + return envoylistener.Filter{}, fmt.Errorf("cannot specify cluster name when using RDS") + } + cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_Rds{ + Rds: &envoyhttp.Rds{ + RouteConfigName: filterName, + ConfigSource: envoycore.ConfigSource{ + ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{ + Ads: &envoycore.AggregatedConfigSource{}, + }, + }, + }, + } + } else { + if cluster == "" { + return envoylistener.Filter{}, fmt.Errorf("must specify cluster name when not using RDS") + } + cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_RouteConfig{ RouteConfig: &envoy.RouteConfiguration{ Name: filterName, VirtualHosts: []envoyroute.VirtualHost{ @@ -418,19 +502,7 @@ func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2 }, }, }, - }, - HttpFilters: []*envoyhttp.HttpFilter{ - &envoyhttp.HttpFilter{ - Name: "envoy.router", - }, - }, - Tracing: &envoyhttp.HttpConnectionManager_Tracing{ - OperationName: op, - // Don't trace any requests by default unless the client application - // explicitly propagates trace headers that indicate this should be - // sampled. - RandomSampling: &envoytype.Percent{Value: 0.0}, - }, + } } if http2 { diff --git a/agent/xds/response.go b/agent/xds/response.go index b4131ece41..38b395b2a0 100644 --- a/agent/xds/response.go +++ b/agent/xds/response.go @@ -57,3 +57,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address { func makeUint32Value(n int) *prototypes.UInt32Value { return &prototypes.UInt32Value{Value: uint32(n)} } + +func makeBoolValue(n bool) *prototypes.BoolValue { + return &prototypes.BoolValue{Value: n} +} diff --git a/agent/xds/routes.go b/agent/xds/routes.go index 0d904e741a..5d5585d40a 100644 --- a/agent/xds/routes.go +++ b/agent/xds/routes.go @@ -2,18 +2,298 @@ package xds import ( "errors" + "fmt" "github.com/gogo/protobuf/proto" + envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" ) -// routesFromSnapshot returns the xDS API representation of the "routes" -// in the snapshot. +// routesFromSnapshot returns the xDS API representation of the "routes" in the +// snapshot. 
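+// Routes are only generated here for upstreams whose discovery chain is
+// non-trivial; plain upstreams and prepared queries keep their static route
+// config inline on the listener filter rather than using RDS.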
func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { if cfgSnap == nil { return nil, errors.New("nil config given") } - // We don't support routes yet but probably will later - return nil, nil + + switch cfgSnap.Kind { + case structs.ServiceKindConnectProxy: + return routesFromSnapshotConnectProxy(cfgSnap, token) + default: + return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) + } +} + +// routesFromSnapshotConnectProxy returns the xDS API representation of the +// "routes" in the snapshot. +func routesFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { + if cfgSnap == nil { + return nil, errors.New("nil config given") + } + + var resources []proto.Message + + for _, u := range cfgSnap.Proxy.Upstreams { + upstreamID := u.Identifier() + + var chain *structs.CompiledDiscoveryChain + if u.DestinationType != structs.UpstreamDestTypePreparedQuery { + chain = cfgSnap.ConnectProxy.DiscoveryChain[upstreamID] + } + + if chain == nil || chain.IsDefault() { + // TODO(rb): make this do the old school stuff too + } else { + upstreamRoute, err := makeUpstreamRouteForDiscoveryChain(&u, chain, cfgSnap) + if err != nil { + return nil, err + } + if upstreamRoute != nil { + resources = append(resources, upstreamRoute) + } + } + } + + // TODO(rb): make sure we don't generate an empty result + + return resources, nil +} + +func makeUpstreamRouteForDiscoveryChain( + u *structs.Upstream, + chain *structs.CompiledDiscoveryChain, + cfgSnap *proxycfg.ConfigSnapshot, +) (*envoy.RouteConfiguration, error) { + upstreamID := u.Identifier() + routeName := upstreamID + + var routes []envoyroute.Route + + switch chain.Node.Type { + case structs.DiscoveryGraphNodeTypeRouter: + routes = make([]envoyroute.Route, 0, len(chain.Node.Routes)) + + for _, discoveryRoute := range chain.Node.Routes { + routeMatch := makeRouteMatchForDiscoveryRoute(discoveryRoute, chain.Protocol) + + // TODO(rb): handle PrefixRewrite + // TODO(rb): handle RequestTimeout + // TODO(rb): handle Retries + + var ( + routeAction *envoyroute.Route_Route + err error + ) + + next := discoveryRoute.DestinationNode + if next.Type == structs.DiscoveryGraphNodeTypeSplitter { + routeAction, err = makeRouteActionForSplitter(upstreamID, cfgSnap.Datacenter, next.Splits) + if err != nil { + return nil, err + } + + } else if next.Type == structs.DiscoveryGraphNodeTypeGroupResolver { + groupResolver := next.GroupResolver + routeAction = makeRouteActionForSingleCluster(upstreamID, cfgSnap.Datacenter, groupResolver.Target) + + } else { + return nil, fmt.Errorf("unexpected graph node after route %q", next.Type) + } + + routes = append(routes, envoyroute.Route{ + Match: routeMatch, + Action: routeAction, + }) + } + + case structs.DiscoveryGraphNodeTypeSplitter: + routeAction, err := makeRouteActionForSplitter(upstreamID, cfgSnap.Datacenter, chain.Node.Splits) + if err != nil { + return nil, err + } + + defaultRoute := envoyroute.Route{ + Match: makeDefaultRouteMatch(), + Action: routeAction, + } + + routes = []envoyroute.Route{defaultRoute} + + case structs.DiscoveryGraphNodeTypeGroupResolver: + groupResolver := chain.Node.GroupResolver + + routeAction := makeRouteActionForSingleCluster(upstreamID, cfgSnap.Datacenter, groupResolver.Target) + + defaultRoute := envoyroute.Route{ + Match: makeDefaultRouteMatch(), + Action: routeAction, + } + + routes = []envoyroute.Route{defaultRoute} + + default: + panic("unknown top node in discovery chain of type: " + chain.Node.Type) 
+	}
+
+	return &envoy.RouteConfiguration{
+		Name: routeName,
+		VirtualHosts: []envoyroute.VirtualHost{
+			envoyroute.VirtualHost{
+				Name:    routeName,
+				Domains: []string{"*"},
+				Routes:  routes,
+			},
+		},
+		// ValidateClusters defaults to true when defined statically and false
+		// when done via RDS. Re-set the sane value of true to prevent
+		// null-routing traffic.
+		ValidateClusters: makeBoolValue(true),
+	}, nil
+}
+
+func makeRouteMatchForDiscoveryRoute(discoveryRoute *structs.DiscoveryRoute, protocol string) envoyroute.RouteMatch {
+	switch protocol {
+	case "http", "http2":
+		// The only match stanza is HTTP.
+	default:
+		return makeDefaultRouteMatch()
+	}
+
+	match := discoveryRoute.Definition.Match
+	if match == nil || match.IsEmpty() {
+		return makeDefaultRouteMatch()
+	}
+
+	em := envoyroute.RouteMatch{}
+
+	switch {
+	case match.HTTP.PathExact != "":
+		em.PathSpecifier = &envoyroute.RouteMatch_Path{Path: match.HTTP.PathExact}
+	case match.HTTP.PathPrefix != "":
+		em.PathSpecifier = &envoyroute.RouteMatch_Prefix{Prefix: match.HTTP.PathPrefix}
+	case match.HTTP.PathRegex != "":
+		em.PathSpecifier = &envoyroute.RouteMatch_Regex{Regex: match.HTTP.PathRegex}
+	default:
+		em.PathSpecifier = &envoyroute.RouteMatch_Prefix{Prefix: "/"}
+	}
+
+	if len(match.HTTP.Header) > 0 {
+		em.Headers = make([]*envoyroute.HeaderMatcher, 0, len(match.HTTP.Header))
+		for _, hdr := range match.HTTP.Header {
+			eh := &envoyroute.HeaderMatcher{
+				Name: hdr.Name,
+			}
+
+			switch {
+			case hdr.Exact != "":
+				eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_ExactMatch{
+					ExactMatch: hdr.Exact,
+				}
+			case hdr.Regex != "":
+				eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_RegexMatch{
+					RegexMatch: hdr.Regex,
+				}
+			case hdr.Prefix != "":
+				eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PrefixMatch{
+					PrefixMatch: hdr.Prefix,
+				}
+			case hdr.Suffix != "":
+				eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_SuffixMatch{
+					SuffixMatch: hdr.Suffix,
+				}
+			case hdr.Present:
+				eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PresentMatch{
+					PresentMatch: true,
+				}
+			case hdr.Invert: // THIS HAS TO BE LAST
+				eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PresentMatch{
+					// We set this to the misleading value of 'true' here
+					// because we'll generically invert it next.
+					PresentMatch: true,
+				}
+			default:
+				continue // skip this impossible situation
+			}
+
+			if hdr.Invert {
+				eh.InvertMatch = true
+			}
+
+			em.Headers = append(em.Headers, eh)
+		}
+	}
+
+	if len(match.HTTP.QueryParam) > 0 {
+		em.QueryParameters = make([]*envoyroute.QueryParameterMatcher, 0, len(match.HTTP.QueryParam))
+		for _, qm := range match.HTTP.QueryParam {
+			eq := &envoyroute.QueryParameterMatcher{
+				Name:  qm.Name,
+				Value: qm.Value,
+				Regex: makeBoolValue(qm.Regex),
+			}
+
+			em.QueryParameters = append(em.QueryParameters, eq)
+		}
+	}
+
+	return em
+}
+
+func makeDefaultRouteMatch() envoyroute.RouteMatch {
+	return envoyroute.RouteMatch{
+		PathSpecifier: &envoyroute.RouteMatch_Prefix{
+			Prefix: "/",
+		},
+		// TODO(banks) Envoy supports matching only valid GRPC
+		// requests which might be nice to add here for gRPC services
+		// but it's not supported in our current envoy SDK version
+		// although docs say it was supported by 1.8.0. Going to defer
+		// that until we've updated the deps.
+ } +} + +func makeRouteActionForSingleCluster(upstreamID, currentDatacenter string, target structs.DiscoveryTarget) *envoyroute.Route_Route { + clusterName := makeClusterName(upstreamID, target, currentDatacenter) + + return &envoyroute.Route_Route{ + Route: &envoyroute.RouteAction{ + ClusterSpecifier: &envoyroute.RouteAction_Cluster{ + Cluster: clusterName, + }, + }, + } +} + +func makeRouteActionForSplitter(upstreamID, currentDatacenter string, splits []*structs.DiscoverySplit) (*envoyroute.Route_Route, error) { + clusters := make([]*envoyroute.WeightedCluster_ClusterWeight, 0, len(splits)) + for _, split := range splits { + if split.Node.Type != structs.DiscoveryGraphNodeTypeGroupResolver { + return nil, fmt.Errorf("unexpected splitter destination node type: %s", split.Node.Type) + } + groupResolver := split.Node.GroupResolver + target := groupResolver.Target + clusterName := makeClusterName(upstreamID, target, currentDatacenter) + + // TODO(rb): scale up by 100 and adjust total weight + cw := &envoyroute.WeightedCluster_ClusterWeight{ + Weight: makeUint32Value(int(split.Weight)), + Name: clusterName, + } + + clusters = append(clusters, cw) + } + + return &envoyroute.Route_Route{ + Route: &envoyroute.RouteAction{ + ClusterSpecifier: &envoyroute.RouteAction_WeightedClusters{ + WeightedClusters: &envoyroute.WeightedCluster{ + Clusters: clusters, + TotalWeight: makeUint32Value(100), + }, + }, + }, + }, nil } diff --git a/test/integration/connect/envoy/.gitignore b/test/integration/connect/envoy/.gitignore new file mode 100644 index 0000000000..b2fa65ff63 --- /dev/null +++ b/test/integration/connect/envoy/.gitignore @@ -0,0 +1 @@ +/workdir diff --git a/test/integration/connect/envoy/helpers.bash b/test/integration/connect/envoy/helpers.bash index 7cd1bf89ec..4f9eefbed7 100755 --- a/test/integration/connect/envoy/helpers.bash +++ b/test/integration/connect/envoy/helpers.bash @@ -111,10 +111,23 @@ function get_envoy_stats_flush_interval { echo "$output" | jq --raw-output '.configs[0].bootstrap.stats_flush_interval' } +# snapshot_envoy_admin is meant to be used from a teardown scriptlet from the host. +function snapshot_envoy_admin { + local HOSTPORT=$1 + local ENVOY_NAME=$2 + + docker_wget "http://${HOSTPORT}/config_dump" -q -O - > "./workdir/envoy/${ENVOY_NAME}-config_dump.json" + docker_wget "http://${HOSTPORT}/clusters" -q -O - > "./workdir/envoy/${ENVOY_NAME}-clusters.out" +} + function docker_consul { docker run -ti --rm --network container:envoy_consul_1 consul-dev $@ } +function docker_wget { + docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@ +} + function must_match_in_statsd_logs { run cat /workdir/statsd/statsd.log COUNT=$( echo "$output" | grep -Ec $1 )