connect: simplify the compiled discovery chain data structures (#6242)

This should make them better for sending over RPC or the API.

Instead of a chain implemented explicitly like a linked list (nodes
holding pointers to other nodes) instead switch to a flat map of named
nodes with nodes linking other other nodes by name. The shipped
structure is just a map and a string to indicate which key to start
from.

Other changes:

* inline the compiler option InferDefaults as true

* introduce compiled target config to avoid needing to send back
  additional maps of Resolvers; future target-specific compiled state
  can go here

* move compiled MeshGateway out of the Resolver and into the
  TargetConfig where it makes more sense.
This commit is contained in:
R.B. Boyer 2019-08-01 22:44:05 -05:00 committed by GitHub
parent 3128937145
commit f02924fafe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1069 additions and 1051 deletions

View File

@ -361,7 +361,6 @@ func (c *ConfigEntry) ReadDiscoveryChain(args *structs.DiscoveryChainRequest, re
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,
InferDefaults: true,
Entries: entries,
})
if err != nil {

View File

@ -33,8 +33,7 @@ type CompileRequest struct {
// overridden for any resolver in the compiled chain.
OverrideConnectTimeout time.Duration
InferDefaults bool // TODO(rb): remove this?
Entries *structs.DiscoveryChainConfigEntries
Entries *structs.DiscoveryChainConfigEntries
}
// Compile assembles a discovery chain in the form of a graph of nodes using
@ -56,7 +55,6 @@ func Compile(req CompileRequest) (*structs.CompiledDiscoveryChain, error) {
serviceName = req.ServiceName
currentNamespace = req.CurrentNamespace
currentDatacenter = req.CurrentDatacenter
inferDefaults = req.InferDefaults
entries = req.Entries
)
if serviceName == "" {
@ -79,15 +77,15 @@ func Compile(req CompileRequest) (*structs.CompiledDiscoveryChain, error) {
overrideMeshGateway: req.OverrideMeshGateway,
overrideProtocol: req.OverrideProtocol,
overrideConnectTimeout: req.OverrideConnectTimeout,
inferDefaults: inferDefaults,
entries: entries,
splitterNodes: make(map[string]*structs.DiscoveryGraphNode),
groupResolverNodes: make(map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode),
resolvers: make(map[string]*structs.ServiceResolverConfigEntry),
splitterNodes: make(map[string]*structs.DiscoveryGraphNode),
resolveNodes: make(map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode),
resolvers: make(map[string]*structs.ServiceResolverConfigEntry),
retainResolvers: make(map[string]struct{}),
targets: make(map[structs.DiscoveryTarget]struct{}),
nodes: make(map[string]*structs.DiscoveryGraphNode),
targets: make(map[structs.DiscoveryTarget]structs.DiscoveryTargetConfig),
}
if req.OverrideProtocol != "" {
@ -113,16 +111,19 @@ type compiler struct {
overrideMeshGateway structs.MeshGatewayConfig
overrideProtocol string
overrideConnectTimeout time.Duration
inferDefaults bool
// config entries that are being compiled (will be mutated during compilation)
//
// This is an INPUT field.
entries *structs.DiscoveryChainConfigEntries
// resolvers is initially seeded by copying the provided entries.Resolvers
// map and default resolvers are added as they are needed.
resolvers map[string]*structs.ServiceResolverConfigEntry
// cached nodes
splitterNodes map[string]*structs.DiscoveryGraphNode
groupResolverNodes map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode // this is also an OUTPUT field
splitterNodes map[string]*structs.DiscoveryGraphNode
resolveNodes map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode
// usesAdvancedRoutingFeatures is set to true if config entries for routing
// or splitting appear in the compiled chain
@ -137,31 +138,24 @@ type compiler struct {
// This is an OUTPUT field.
customizedBy customizationMarkers
// topNode is computed inside of assembleChain()
//
// This is an OUTPUT field.
topNode *structs.DiscoveryGraphNode
// protocol is the common protocol used for all referenced services. These
// cannot be mixed.
//
// This is an OUTPUT field.
protocol string
// resolvers is initially seeded by copying the provided entries.Resolvers
// map and default resolvers are added as they are needed.
//
// If redirects cause a resolver to not be needed it will be omitted from
// this map.
// startNode is computed inside of assembleChain()
//
// This is an OUTPUT field.
resolvers map[string]*structs.ServiceResolverConfigEntry
// retainResolvers flags the elements of the resolvers map that should be
// retained in the final results.
retainResolvers map[string]struct{}
startNode string
// nodes is computed inside of compile()
//
// This is an OUTPUT field.
nodes map[string]*structs.DiscoveryGraphNode
// This is an OUTPUT field.
targets map[structs.DiscoveryTarget]struct{}
targets map[structs.DiscoveryTarget]structs.DiscoveryTargetConfig
}
type customizationMarkers struct {
@ -174,6 +168,23 @@ func (m *customizationMarkers) IsZero() bool {
return !m.MeshGateway && !m.Protocol && !m.ConnectTimeout
}
// recordNode stores the node internally in the compiled chain.
func (c *compiler) recordNode(node *structs.DiscoveryGraphNode) {
// Some types have their own type-specific lookups, so record those, too.
switch node.Type {
case structs.DiscoveryGraphNodeTypeRouter:
// no special storage
case structs.DiscoveryGraphNodeTypeSplitter:
c.splitterNodes[node.ServiceName()] = node
case structs.DiscoveryGraphNodeTypeResolver:
c.resolveNodes[node.Resolver.Target] = node
default:
panic("unknown node type '" + node.Type + "'")
}
c.nodes[node.MapKey()] = node
}
func (c *compiler) recordServiceProtocol(serviceName string) error {
if serviceDefault := c.entries.GetService(serviceName); serviceDefault != nil {
return c.recordProtocol(serviceName, serviceDefault.Protocol)
@ -220,11 +231,12 @@ func (c *compiler) compile() (*structs.CompiledDiscoveryChain, error) {
return nil, err
}
if c.topNode == nil {
if c.inferDefaults {
panic("impossible to return no results with infer defaults set to true")
}
return nil, nil
// We don't need these intermediates anymore.
c.splitterNodes = nil
c.resolveNodes = nil
if c.startNode == "" {
panic("impossible to return no results")
}
if err := c.detectCircularSplits(); err != nil {
@ -236,11 +248,8 @@ func (c *compiler) compile() (*structs.CompiledDiscoveryChain, error) {
c.flattenAdjacentSplitterNodes()
// Remove any unused resolvers.
for name, _ := range c.resolvers {
if _, ok := c.retainResolvers[name]; !ok {
delete(c.resolvers, name)
}
if err := c.removeUnusedNodes(); err != nil {
return nil, err
}
if !enableAdvancedRoutingForProtocol(c.protocol) && c.usesAdvancedRoutingFeatures {
@ -252,12 +261,6 @@ func (c *compiler) compile() (*structs.CompiledDiscoveryChain, error) {
}
}
targets := make([]structs.DiscoveryTarget, 0, len(c.targets))
for target, _ := range c.targets {
targets = append(targets, target)
}
structs.DiscoveryTargets(targets).Sort()
if c.overrideProtocol != "" {
if c.overrideProtocol != c.protocol {
c.protocol = c.overrideProtocol
@ -290,15 +293,14 @@ func (c *compiler) compile() (*structs.CompiledDiscoveryChain, error) {
}
return &structs.CompiledDiscoveryChain{
ServiceName: c.serviceName,
Namespace: c.currentNamespace,
Datacenter: c.currentDatacenter,
CustomizationHash: customizationHash,
Protocol: c.protocol,
Node: c.topNode,
Resolvers: c.resolvers,
Targets: targets,
GroupResolverNodes: c.groupResolverNodes, // TODO(rb): prune unused
ServiceName: c.serviceName,
Namespace: c.currentNamespace,
Datacenter: c.currentDatacenter,
CustomizationHash: customizationHash,
Protocol: c.protocol,
StartNode: c.startNode,
Nodes: c.nodes,
Targets: c.targets,
}, nil
}
@ -315,23 +317,28 @@ func (c *compiler) detectCircularResolves() error {
func (c *compiler) flattenAdjacentSplitterNodes() {
for {
anyChanged := false
for _, splitterNode := range c.splitterNodes {
fixedSplits := make([]*structs.DiscoverySplit, 0, len(splitterNode.Splits))
for _, node := range c.nodes {
if node.Type != structs.DiscoveryGraphNodeTypeSplitter {
continue
}
fixedSplits := make([]*structs.DiscoverySplit, 0, len(node.Splits))
changed := false
for _, split := range splitterNode.Splits {
if split.Node.Type != structs.DiscoveryGraphNodeTypeSplitter {
for _, split := range node.Splits {
nextNode := c.nodes[split.NextNode]
if nextNode.Type != structs.DiscoveryGraphNodeTypeSplitter {
fixedSplits = append(fixedSplits, split)
continue
}
changed = true
for _, innerSplit := range split.Node.Splits {
for _, innerSplit := range nextNode.Splits {
effectiveWeight := split.Weight * innerSplit.Weight / 100
newDiscoverySplit := &structs.DiscoverySplit{
Weight: structs.NormalizeServiceSplitWeight(effectiveWeight),
Node: innerSplit.Node,
Weight: structs.NormalizeServiceSplitWeight(effectiveWeight),
NextNode: innerSplit.NextNode,
}
fixedSplits = append(fixedSplits, newDiscoverySplit)
@ -339,7 +346,7 @@ func (c *compiler) flattenAdjacentSplitterNodes() {
}
if changed {
splitterNode.Splits = fixedSplits
node.Splits = fixedSplits
anyChanged = true
}
}
@ -350,21 +357,81 @@ func (c *compiler) flattenAdjacentSplitterNodes() {
}
}
// removeUnusedNodes walks the chain from the start and prunes any nodes that
// are no longer referenced. This can happen as a result of operations like
// flattenAdjacentSplitterNodes().
func (c *compiler) removeUnusedNodes() error {
var (
visited = make(map[string]struct{})
todo = make(map[string]struct{})
)
todo[c.startNode] = struct{}{}
getNext := func() string {
if len(todo) == 0 {
return ""
}
for k, _ := range todo {
delete(todo, k)
return k
}
return ""
}
for {
next := getNext()
if next == "" {
break
}
if _, ok := visited[next]; ok {
continue
}
visited[next] = struct{}{}
node := c.nodes[next]
if node == nil {
return fmt.Errorf("compilation references non-retained node %q", next)
}
switch node.Type {
case structs.DiscoveryGraphNodeTypeRouter:
for _, route := range node.Routes {
todo[route.NextNode] = struct{}{}
}
case structs.DiscoveryGraphNodeTypeSplitter:
for _, split := range node.Splits {
todo[split.NextNode] = struct{}{}
}
case structs.DiscoveryGraphNodeTypeResolver:
// nothing special
default:
return fmt.Errorf("unknown node type %q", node.Type)
}
}
if len(visited) == len(c.nodes) {
return nil
}
for name, _ := range c.nodes {
if _, ok := visited[name]; !ok {
delete(c.nodes, name)
}
}
return nil
}
// assembleChain will do the initial assembly of a chain of DiscoveryGraphNode
// entries from the provided config entries. No default resolvers are injected
// here so it is expected that if there are no discovery chain config entries
// set up for a given service that it will produce no topNode from this.
// entries from the provided config entries.
func (c *compiler) assembleChain() error {
if c.topNode != nil {
if c.startNode != "" || len(c.nodes) > 0 {
return fmt.Errorf("assembleChain should only be called once")
}
// Check for short circuit path.
if len(c.resolvers) == 0 && c.entries.IsChainEmpty() {
if !c.inferDefaults {
return nil // nothing explicitly configured
}
// Materialize defaults and cache.
c.resolvers[c.serviceName] = newDefaultServiceResolver(c.serviceName)
}
@ -380,12 +447,12 @@ func (c *compiler) assembleChain() error {
if router == nil {
// If no router is configured, move on down the line to the next hop of
// the chain.
node, err := c.getSplitterOrGroupResolverNode(c.newTarget(c.serviceName, "", "", ""))
node, err := c.getSplitterOrResolverNode(c.newTarget(c.serviceName, "", "", ""))
if err != nil {
return err
}
c.topNode = node
c.startNode = node.MapKey()
return nil
}
@ -419,11 +486,11 @@ func (c *compiler) assembleChain() error {
err error
)
if dest.ServiceSubset == "" && dest.Namespace == "" {
node, err = c.getSplitterOrGroupResolverNode(
node, err = c.getSplitterOrResolverNode(
c.newTarget(svc, dest.ServiceSubset, dest.Namespace, ""),
)
} else {
node, err = c.getGroupResolverNode(
node, err = c.getResolverNode(
c.newTarget(svc, dest.ServiceSubset, dest.Namespace, ""),
false,
)
@ -431,23 +498,25 @@ func (c *compiler) assembleChain() error {
if err != nil {
return err
}
compiledRoute.DestinationNode = node
compiledRoute.NextNode = node.MapKey()
}
// If we have a router, we'll add a catch-all route at the end to send
// unmatched traffic to the next hop in the chain.
defaultDestinationNode, err := c.getSplitterOrGroupResolverNode(c.newTarget(c.serviceName, "", "", ""))
defaultDestinationNode, err := c.getSplitterOrResolverNode(c.newTarget(c.serviceName, "", "", ""))
if err != nil {
return err
}
defaultRoute := &structs.DiscoveryRoute{
Definition: newDefaultServiceRoute(c.serviceName),
DestinationNode: defaultDestinationNode,
Definition: newDefaultServiceRoute(c.serviceName),
NextNode: defaultDestinationNode.MapKey(),
}
routeNode.Routes = append(routeNode.Routes, defaultRoute)
c.topNode = routeNode
c.startNode = routeNode.MapKey()
c.recordNode(routeNode)
return nil
}
@ -476,18 +545,17 @@ func (c *compiler) newTarget(service, serviceSubset, namespace, datacenter strin
}
}
func (c *compiler) getSplitterOrGroupResolverNode(target structs.DiscoveryTarget) (*structs.DiscoveryGraphNode, error) {
func (c *compiler) getSplitterOrResolverNode(target structs.DiscoveryTarget) (*structs.DiscoveryGraphNode, error) {
nextNode, err := c.getSplitterNode(target.Service)
if err != nil {
return nil, err
} else if nextNode != nil {
return nextNode, nil
}
return c.getGroupResolverNode(target, false)
return c.getResolverNode(target, false)
}
func (c *compiler) getSplitterNode(name string) (*structs.DiscoveryGraphNode, error) {
// Do we already have the node?
if prev, ok := c.splitterNodes[name]; ok {
return prev, nil
@ -512,7 +580,7 @@ func (c *compiler) getSplitterNode(name string) (*structs.DiscoveryGraphNode, er
// If we record this exists before recursing down it will short-circuit
// sanely if there is some sort of graph loop below.
c.splitterNodes[name] = splitNode
c.recordNode(splitNode)
for _, split := range splitter.Splits {
compiledSplit := &structs.DiscoverySplit{
@ -527,33 +595,33 @@ func (c *compiler) getSplitterNode(name string) (*structs.DiscoveryGraphNode, er
if err != nil {
return nil, err
} else if nextNode != nil {
compiledSplit.Node = nextNode
compiledSplit.NextNode = nextNode.MapKey()
continue
}
// fall through to group-resolver
}
node, err := c.getGroupResolverNode(
node, err := c.getResolverNode(
c.newTarget(svc, split.ServiceSubset, split.Namespace, ""),
false,
)
if err != nil {
return nil, err
}
compiledSplit.Node = node
compiledSplit.NextNode = node.MapKey()
}
c.usesAdvancedRoutingFeatures = true
return splitNode, nil
}
// getGroupResolverNode handles most of the code to handle
// redirection/rewriting capabilities from a resolver config entry. It recurses
// into itself to _generate_ targets used for failover out of convenience.
func (c *compiler) getGroupResolverNode(target structs.DiscoveryTarget, recursedForFailover bool) (*structs.DiscoveryGraphNode, error) {
// getResolverNode handles most of the code to handle redirection/rewriting
// capabilities from a resolver config entry. It recurses into itself to
// _generate_ targets used for failover out of convenience.
func (c *compiler) getResolverNode(target structs.DiscoveryTarget, recursedForFailover bool) (*structs.DiscoveryGraphNode, error) {
RESOLVE_AGAIN:
// Do we already have the node?
if prev, ok := c.groupResolverNodes[target]; ok {
if prev, ok := c.resolveNodes[target]; ok {
return prev, nil
}
@ -603,9 +671,6 @@ RESOLVE_AGAIN:
}
}
// Since we're actually building a node with it, we can keep it.
c.retainResolvers[target.Service] = struct{}{}
connectTimeout := resolver.ConnectTimeout
if connectTimeout < 1 {
connectTimeout = 5 * time.Second
@ -619,48 +684,51 @@ RESOLVE_AGAIN:
}
// Build node.
groupResolverNode := &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeGroupResolver,
Name: resolver.Name,
GroupResolver: &structs.DiscoveryGroupResolver{
node := &structs.DiscoveryGraphNode{
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: target.Identifier(),
Resolver: &structs.DiscoveryResolver{
Definition: resolver,
Default: resolver.IsDefault(),
Target: target,
ConnectTimeout: connectTimeout,
},
}
groupResolver := groupResolverNode.GroupResolver
// Default mesh gateway settings
if serviceDefault := c.entries.GetService(resolver.Name); serviceDefault != nil {
groupResolver.MeshGateway = serviceDefault.MeshGateway
targetConfig := structs.DiscoveryTargetConfig{
Subset: resolver.Subsets[target.ServiceSubset],
}
if c.entries.GlobalProxy != nil && groupResolver.MeshGateway.Mode == structs.MeshGatewayModeDefault {
groupResolver.MeshGateway.Mode = c.entries.GlobalProxy.MeshGateway.Mode
// Default mesh gateway settings
if serviceDefault := c.entries.GetService(target.Service); serviceDefault != nil {
targetConfig.MeshGateway = serviceDefault.MeshGateway
}
if c.entries.GlobalProxy != nil && targetConfig.MeshGateway.Mode == structs.MeshGatewayModeDefault {
targetConfig.MeshGateway.Mode = c.entries.GlobalProxy.MeshGateway.Mode
}
if c.overrideMeshGateway.Mode != structs.MeshGatewayModeDefault {
if groupResolver.MeshGateway.Mode != c.overrideMeshGateway.Mode {
groupResolver.MeshGateway.Mode = c.overrideMeshGateway.Mode
if targetConfig.MeshGateway.Mode != c.overrideMeshGateway.Mode {
targetConfig.MeshGateway.Mode = c.overrideMeshGateway.Mode
c.customizedBy.MeshGateway = true
}
}
// Retain this target even if we may not retain the group resolver.
c.targets[target] = struct{}{}
c.targets[target] = targetConfig
if recursedForFailover {
// If we recursed here from ourselves in a failover context, just emit
// this node without caching it or even processing failover again.
// This is a little weird but it keeps the redirect/default-subset
// logic in one place.
return groupResolverNode, nil
return node, nil
}
// If we record this exists before recursing down it will short-circuit
// sanely if there is some sort of graph loop below.
c.groupResolverNodes[target] = groupResolverNode
c.recordNode(node)
if len(resolver.Failover) > 0 {
f := resolver.Failover
@ -705,23 +773,23 @@ RESOLVE_AGAIN:
df := &structs.DiscoveryFailover{
Definition: &failover,
}
groupResolver.Failover = df
node.Resolver.Failover = df
// Convert the targets into targets by cheating a bit and
// recursing into ourselves.
for _, target := range failoverTargets {
failoverGroupResolverNode, err := c.getGroupResolverNode(target, true)
failoverResolveNode, err := c.getResolverNode(target, true)
if err != nil {
return nil, err
}
failoverTarget := failoverGroupResolverNode.GroupResolver.Target
failoverTarget := failoverResolveNode.Resolver.Target
df.Targets = append(df.Targets, failoverTarget)
}
}
}
}
return groupResolverNode, nil
return node, nil
}
func newDefaultServiceResolver(serviceName string) *structs.ServiceResolverConfigEntry {

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,6 @@ func TestCompileConfigEntries(
ServiceName: serviceName,
CurrentNamespace: currentNamespace,
CurrentDatacenter: currentDatacenter,
InferDefaults: true,
Entries: set,
}
if setup != nil {

View File

@ -559,37 +559,6 @@ func (s *state) resetWatchesFromChain(
return fmt.Errorf("not possible to arrive here with no discovery chain")
}
// Collect all sorts of catalog queries we'll have to run.
targets := make(map[structs.DiscoveryTarget]*structs.ServiceResolverConfigEntry)
addTarget := func(target structs.DiscoveryTarget) error {
resolver, ok := chain.Resolvers[target.Service]
if !ok {
return fmt.Errorf("missing resolver %q for target %s", target.Service, target)
}
targets[target] = resolver
return nil
}
// NOTE: We will NEVER see a missing chain, because we always request it with defaulting enabled.
meshGatewayModes := make(map[structs.DiscoveryTarget]structs.MeshGatewayMode)
for _, group := range chain.GroupResolverNodes {
groupResolver := group.GroupResolver
meshGatewayModes[groupResolver.Target] = groupResolver.MeshGateway.Mode
if err := addTarget(groupResolver.Target); err != nil {
return err
}
if groupResolver.Failover != nil {
for _, target := range groupResolver.Failover.Targets {
if err := addTarget(target); err != nil {
return err
}
}
}
}
// Initialize relevant sub maps.
if _, ok := snap.ConnectProxy.WatchedUpstreams[id]; !ok {
snap.ConnectProxy.WatchedUpstreams[id] = make(map[structs.DiscoveryTarget]context.CancelFunc)
@ -611,35 +580,17 @@ func (s *state) resetWatchesFromChain(
cancelFn()
}
for target, resolver := range targets {
if target.Service != resolver.Name {
panic(target.Service + " != " + resolver.Name) // TODO(rb): remove
}
for target, targetConfig := range chain.Targets {
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters
var subset structs.ServiceResolverSubset
if target.ServiceSubset != "" {
var ok bool
subset, ok = resolver.Subsets[target.ServiceSubset]
if !ok {
// Not possible really.
return fmt.Errorf("target %s cannot be resolved; service %q does not have a subset named %q", target, target.Service, target.ServiceSubset)
}
}
encodedTarget, err := target.MarshalText()
if err != nil {
return fmt.Errorf("target %s cannot be converted into a cache key string: %v", target, err)
}
encodedTarget := target.Identifier()
ctx, cancel := context.WithCancel(s.ctx)
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
meshGateway := structs.MeshGatewayModeDefault
if target.Datacenter != s.source.Datacenter {
meshGateway = meshGatewayModes[target]
meshGateway = targetConfig.MeshGateway.Mode
}
// if the default mode
@ -647,12 +598,12 @@ func (s *state) resetWatchesFromChain(
meshGateway = structs.MeshGatewayModeNone
}
err = s.watchConnectProxyService(
err := s.watchConnectProxyService(
ctx,
"upstream-target:"+string(encodedTarget)+":"+id,
"upstream-target:"+encodedTarget+":"+id,
target.Service,
target.Datacenter,
subset.Filter,
targetConfig.Subset.Filter,
meshGateway,
)
if err != nil {

View File

@ -5,7 +5,6 @@ import (
"encoding"
"fmt"
"net/url"
"sort"
"strings"
"time"
)
@ -30,24 +29,19 @@ type CompiledDiscoveryChain struct {
// Protocol is the overall protocol shared by everything in the chain.
Protocol string
// Node is the top node in the chain.
//
// If this is a router or splitter then in envoy this renders as an http
// route object.
//
// If this is a group resolver then in envoy this renders as a default
// wildcard http route object.
Node *DiscoveryGraphNode `json:",omitempty"`
// StartNode is the first key into the Nodes map that should be followed
// when walking the discovery chain.
StartNode string `json:",omitempty"`
// GroupResolverNodes respresents all unique service instance groups that
// need to be represented. For envoy these render as Clusters.
// Nodes contains all nodes available for traversal in the chain keyed by a
// unique name. You can walk this by starting with StartNode.
//
// Omitted from JSON because these already show up under the Node field.
GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"`
// NOTE: The names should be treated as opaque values and are only
// guaranteed to be consistent within a single compilation.
Nodes map[string]*DiscoveryGraphNode `json:",omitempty"`
// TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS
Resolvers map[string]*ServiceResolverConfigEntry `json:",omitempty"`
Targets []DiscoveryTarget `json:",omitempty"`
// Targets is a list of all targets and configuration related just to targets.
Targets map[DiscoveryTarget]DiscoveryTargetConfig `json:",omitempty"`
}
// IsDefault returns true if the compiled chain represents no routing, no
@ -56,41 +50,31 @@ type CompiledDiscoveryChain struct {
// applied is redirection to another resolver that is default, so we double
// check the resolver matches the requested resolver.
func (c *CompiledDiscoveryChain) IsDefault() bool {
if c.Node == nil {
if c.StartNode == "" || len(c.Nodes) == 0 {
return true
}
node := c.Nodes[c.StartNode]
if node == nil {
panic("not possible: missing node named '" + c.StartNode + "' in chain '" + c.ServiceName + "'")
}
// TODO(rb): include CustomizationHash here?
return c.Node.Name == c.ServiceName &&
c.Node.Type == DiscoveryGraphNodeTypeGroupResolver &&
c.Node.GroupResolver.Default
}
// SubsetDefinitionForTarget is a convenience function to fetch the subset
// definition for the service subset defined by the provided target. If the
// subset is not defined an empty definition is returned.
func (c *CompiledDiscoveryChain) SubsetDefinitionForTarget(t DiscoveryTarget) ServiceResolverSubset {
if t.ServiceSubset == "" {
return ServiceResolverSubset{}
}
resolver, ok := c.Resolvers[t.Service]
if !ok {
return ServiceResolverSubset{}
}
return resolver.Subsets[t.ServiceSubset]
return node.Type == DiscoveryGraphNodeTypeResolver &&
node.Resolver.Default &&
node.Resolver.Target.Service == c.ServiceName
}
const (
DiscoveryGraphNodeTypeRouter = "router"
DiscoveryGraphNodeTypeSplitter = "splitter"
DiscoveryGraphNodeTypeGroupResolver = "group-resolver"
DiscoveryGraphNodeTypeRouter = "router"
DiscoveryGraphNodeTypeSplitter = "splitter"
DiscoveryGraphNodeTypeResolver = "resolver"
)
// DiscoveryGraphNode is a single node of the compiled discovery chain.
// DiscoveryGraphNode is a single node in the compiled discovery chain.
type DiscoveryGraphNode struct {
Type string
Name string // default chain/service name at this spot
Name string // this is NOT necessarily a service
// fields for Type==router
Routes []*DiscoveryRoute `json:",omitempty"`
@ -98,34 +82,48 @@ type DiscoveryGraphNode struct {
// fields for Type==splitter
Splits []*DiscoverySplit `json:",omitempty"`
// fields for Type==group-resolver
GroupResolver *DiscoveryGroupResolver `json:",omitempty"`
// fields for Type==resolver
Resolver *DiscoveryResolver `json:",omitempty"`
}
// compiled form of ServiceResolverConfigEntry but customized per non-failover target
type DiscoveryGroupResolver struct {
func (s *DiscoveryGraphNode) ServiceName() string {
if s.Type == DiscoveryGraphNodeTypeResolver {
return s.Resolver.Target.Service
}
return s.Name
}
func (s *DiscoveryGraphNode) MapKey() string {
return fmt.Sprintf("%s:%s", s.Type, s.Name)
}
// compiled form of ServiceResolverConfigEntry
type DiscoveryResolver struct {
Definition *ServiceResolverConfigEntry `json:",omitempty"`
Default bool `json:",omitempty"`
ConnectTimeout time.Duration `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty"`
Target DiscoveryTarget `json:",omitempty"`
Failover *DiscoveryFailover `json:",omitempty"`
}
type DiscoveryTargetConfig struct {
MeshGateway MeshGatewayConfig `json:",omitempty"`
Subset ServiceResolverSubset `json:",omitempty"`
}
// compiled form of ServiceRoute
type DiscoveryRoute struct {
Definition *ServiceRoute `json:",omitempty"`
DestinationNode *DiscoveryGraphNode `json:",omitempty"`
Definition *ServiceRoute `json:",omitempty"`
NextNode string `json:",omitempty"`
}
// compiled form of ServiceSplit
type DiscoverySplit struct {
Weight float32 `json:",omitempty"`
Node *DiscoveryGraphNode `json:",omitempty"`
Weight float32 `json:",omitempty"`
NextNode string `json:",omitempty"`
}
// compiled form of ServiceResolverFailover
// TODO(rb): figure out how to get mesh gateways in here
type DiscoveryFailover struct {
Definition *ServiceResolverFailover `json:",omitempty"`
Targets []DiscoveryTarget `json:",omitempty"`
@ -183,17 +181,21 @@ var _ encoding.TextUnmarshaler = (*DiscoveryTarget)(nil)
//
// This should NOT return any errors.
func (t DiscoveryTarget) MarshalText() (text []byte, err error) {
return []byte(t.Identifier()), nil
}
func (t DiscoveryTarget) Identifier() string {
var buf bytes.Buffer
buf.WriteString(url.QueryEscape(t.Service))
buf.WriteRune(',')
buf.WriteString(url.QueryEscape(t.ServiceSubset))
buf.WriteString(url.QueryEscape(t.ServiceSubset)) // TODO(rb): move this first so the scoping flows from small->large?
buf.WriteRune(',')
if t.Namespace != "default" {
buf.WriteString(url.QueryEscape(t.Namespace))
}
buf.WriteRune(',')
buf.WriteString(url.QueryEscape(t.Datacenter))
return buf.Bytes(), nil
return buf.String()
}
// UnmarshalText implements encoding.TextUnmarshaler.
@ -256,29 +258,3 @@ func (t DiscoveryTarget) String() string {
return b.String()
}
type DiscoveryTargets []DiscoveryTarget
func (targets DiscoveryTargets) Sort() {
sort.Slice(targets, func(i, j int) bool {
if targets[i].Service < targets[j].Service {
return true
} else if targets[i].Service > targets[j].Service {
return false
}
if targets[i].ServiceSubset < targets[j].ServiceSubset {
return true
} else if targets[i].ServiceSubset > targets[j].ServiceSubset {
return false
}
if targets[i].Namespace < targets[j].Namespace {
return true
} else if targets[i].Namespace > targets[j].Namespace {
return false
}
return targets[i].Datacenter < targets[j].Datacenter
})
}

View File

@ -234,7 +234,6 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
) ([]*envoy.Cluster, error) {
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
@ -250,8 +249,12 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
// TODO(rb): make escape hatches work with chains
var out []*envoy.Cluster
for target, node := range chain.GroupResolverNodes {
groupResolver := node.GroupResolver
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
target := node.Resolver.Target
sni := TargetSNI(target, cfgSnap)
clusterName := CustomizeClusterName(sni, chain)
@ -260,14 +263,13 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
c := &envoy.Cluster{
Name: clusterName,
AltStatName: clusterName,
ConnectTimeout: groupResolver.ConnectTimeout,
ConnectTimeout: node.Resolver.ConnectTimeout,
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
CommonLbConfig: &envoy.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoytype.Percent{
Value: 0, // disable panic threshold
},
},
// TODO(rb): adjust load assignment
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{

View File

@ -73,15 +73,21 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
continue // skip the upstream (should not happen)
}
for target, node := range chain.GroupResolverNodes {
groupResolver := node.GroupResolver
failover := groupResolver.Failover
// Find all resolver nodes.
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
continue
}
failover := node.Resolver.Failover
target := node.Resolver.Target
endpoints, ok := chainEndpointMap[target]
if !ok {
continue // skip the cluster (should not happen)
}
targetConfig := chain.Targets[target]
var (
endpointGroups []loadAssignmentEndpointGroup
overprovisioningFactor int
@ -89,7 +95,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
primaryGroup := loadAssignmentEndpointGroup{
Endpoints: endpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(target).OnlyPassing,
OnlyPassing: targetConfig.Subset.OnlyPassing,
}
if failover != nil && len(failover.Targets) > 0 {
@ -112,9 +118,11 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
continue // skip the failover target (should not happen)
}
failTargetConfig := chain.Targets[failTarget]
endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{
Endpoints: failEndpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(failTarget).OnlyPassing,
OnlyPassing: failTargetConfig.Subset.OnlyPassing,
})
}
} else {

View File

@ -266,11 +266,11 @@ func Test_endpointsFromSnapshot(t *testing.T) {
Namespace: "default",
Datacenter: "dc1",
}
dbResolverNode := chain.GroupResolverNodes[dbTarget]
dbResolverNode := chain.Nodes["resolver:"+dbTarget.Identifier()]
groupResolverFailover := dbResolverNode.GroupResolver.Failover
failover := dbResolverNode.Resolver.Failover
groupResolverFailover.Definition.OverprovisioningFactor = 160
failover.Definition.OverprovisioningFactor = 160
},
},
{

View File

@ -73,11 +73,16 @@ func makeUpstreamRouteForDiscoveryChain(
var routes []envoyroute.Route
switch chain.Node.Type {
case structs.DiscoveryGraphNodeTypeRouter:
routes = make([]envoyroute.Route, 0, len(chain.Node.Routes))
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
panic("missing first node in compiled discovery chain for: " + chain.ServiceName)
}
for _, discoveryRoute := range chain.Node.Routes {
switch startNode.Type {
case structs.DiscoveryGraphNodeTypeRouter:
routes = make([]envoyroute.Route, 0, len(startNode.Routes))
for _, discoveryRoute := range startNode.Routes {
routeMatch := makeRouteMatchForDiscoveryRoute(discoveryRoute, chain.Protocol)
var (
@ -85,19 +90,19 @@ func makeUpstreamRouteForDiscoveryChain(
err error
)
next := discoveryRoute.DestinationNode
if next.Type == structs.DiscoveryGraphNodeTypeSplitter {
routeAction, err = makeRouteActionForSplitter(next.Splits, chain, cfgSnap)
nextNode := chain.Nodes[discoveryRoute.NextNode]
switch nextNode.Type {
case structs.DiscoveryGraphNodeTypeSplitter:
routeAction, err = makeRouteActionForSplitter(nextNode.Splits, chain, cfgSnap)
if err != nil {
return nil, err
}
} else if next.Type == structs.DiscoveryGraphNodeTypeGroupResolver {
groupResolver := next.GroupResolver
routeAction = makeRouteActionForSingleCluster(groupResolver.Target, chain, cfgSnap)
case structs.DiscoveryGraphNodeTypeResolver:
routeAction = makeRouteActionForSingleCluster(nextNode.Resolver.Target, chain, cfgSnap)
} else {
return nil, fmt.Errorf("unexpected graph node after route %q", next.Type)
default:
return nil, fmt.Errorf("unexpected graph node after route %q", nextNode.Type)
}
// TODO(rb): Better help handle the envoy case where you need (prefix=/foo/,rewrite=/) and (exact=/foo,rewrite=/) to do a full rewrite
@ -142,7 +147,7 @@ func makeUpstreamRouteForDiscoveryChain(
}
case structs.DiscoveryGraphNodeTypeSplitter:
routeAction, err := makeRouteActionForSplitter(chain.Node.Splits, chain, cfgSnap)
routeAction, err := makeRouteActionForSplitter(startNode.Splits, chain, cfgSnap)
if err != nil {
return nil, err
}
@ -154,10 +159,8 @@ func makeUpstreamRouteForDiscoveryChain(
routes = []envoyroute.Route{defaultRoute}
case structs.DiscoveryGraphNodeTypeGroupResolver:
groupResolver := chain.Node.GroupResolver
routeAction := makeRouteActionForSingleCluster(groupResolver.Target, chain, cfgSnap)
case structs.DiscoveryGraphNodeTypeResolver:
routeAction := makeRouteActionForSingleCluster(startNode.Resolver.Target, chain, cfgSnap)
defaultRoute := envoyroute.Route{
Match: makeDefaultRouteMatch(),
@ -167,7 +170,7 @@ func makeUpstreamRouteForDiscoveryChain(
routes = []envoyroute.Route{defaultRoute}
default:
panic("unknown top node in discovery chain of type: " + chain.Node.Type)
panic("unknown first node in discovery chain of type: " + startNode.Type)
}
return &envoy.RouteConfiguration{
@ -320,11 +323,12 @@ func makeRouteActionForSingleCluster(target structs.DiscoveryTarget, chain *stru
func makeRouteActionForSplitter(splits []*structs.DiscoverySplit, chain *structs.CompiledDiscoveryChain, cfgSnap *proxycfg.ConfigSnapshot) (*envoyroute.Route_Route, error) {
clusters := make([]*envoyroute.WeightedCluster_ClusterWeight, 0, len(splits))
for _, split := range splits {
if split.Node.Type != structs.DiscoveryGraphNodeTypeGroupResolver {
return nil, fmt.Errorf("unexpected splitter destination node type: %s", split.Node.Type)
nextNode := chain.Nodes[split.NextNode]
if nextNode.Type != structs.DiscoveryGraphNodeTypeResolver {
return nil, fmt.Errorf("unexpected splitter destination node type: %s", nextNode.Type)
}
groupResolver := split.Node.GroupResolver
target := groupResolver.Target
target := nextNode.Resolver.Target
sni := TargetSNI(target, cfgSnap)
clusterName := CustomizeClusterName(sni, chain)