mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
activate most discovery chain features in xDS for envoy (#6024)
This commit is contained in:
parent
bdebe62fd0
commit
4bdb690a25
@ -3945,6 +3945,15 @@ func (a *Agent) registerCache() {
|
|||||||
RefreshTimer: 0 * time.Second,
|
RefreshTimer: 0 * time.Second,
|
||||||
RefreshTimeout: 10 * time.Minute,
|
RefreshTimeout: 10 * time.Minute,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{
|
||||||
|
RPC: a,
|
||||||
|
}, &cache.RegisterOptions{
|
||||||
|
// Maintain a blocking query, retry dropped connections quickly
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0 * time.Second,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaultProxyCommand returns the default Connect managed proxy command.
|
// defaultProxyCommand returns the default Connect managed proxy command.
|
||||||
|
52
agent/cache-types/discovery_chain.go
Normal file
52
agent/cache-types/discovery_chain.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recommended name for registration.
|
||||||
|
const CompiledDiscoveryChainName = "compiled-discovery-chain"
|
||||||
|
|
||||||
|
// CompiledDiscoveryChain supports fetching the complete discovery chain for a
|
||||||
|
// service and caching its compilation.
|
||||||
|
type CompiledDiscoveryChain struct {
|
||||||
|
RPC RPC
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
|
var result cache.FetchResult
|
||||||
|
|
||||||
|
// The request should be a DiscoveryChainRequest.
|
||||||
|
reqReal, ok := req.(*structs.DiscoveryChainRequest)
|
||||||
|
if !ok {
|
||||||
|
return result, fmt.Errorf(
|
||||||
|
"Internal cache failure: request wrong type: %T", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the minimum query index to our current index so we block
|
||||||
|
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
|
||||||
|
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
|
||||||
|
|
||||||
|
// Always allow stale - there's no point in hitting leader if the request is
|
||||||
|
// going to be served from cache and endup arbitrarily stale anyway. This
|
||||||
|
// allows cached compiled-discovery-chain to automatically read scale across all
|
||||||
|
// servers too.
|
||||||
|
reqReal.AllowStale = true
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
var reply structs.DiscoveryChainResponse
|
||||||
|
if err := c.RPC.RPC("ConfigEntry.ReadDiscoveryChain", reqReal, &reply); err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Value = &reply
|
||||||
|
result.Index = reply.QueryMeta.Index
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CompiledDiscoveryChain) SupportsBlocking() bool {
|
||||||
|
return true
|
||||||
|
}
|
@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
metrics "github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
@ -312,3 +313,53 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ConfigEntry) ReadDiscoveryChain(args *structs.DiscoveryChainRequest, reply *structs.DiscoveryChainResponse) error {
|
||||||
|
if done, err := c.srv.forward("ConfigEntry.ReadDiscoveryChain", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSince([]string{"config_entry", "read_discovery_chain"}, time.Now())
|
||||||
|
|
||||||
|
// Fetch the ACL token, if any.
|
||||||
|
rule, err := c.srv.ResolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if rule != nil && !rule.ServiceRead(args.Name) {
|
||||||
|
return acl.ErrPermissionDenied
|
||||||
|
}
|
||||||
|
|
||||||
|
if args.Name == "" {
|
||||||
|
return fmt.Errorf("Must provide service name")
|
||||||
|
}
|
||||||
|
|
||||||
|
const currentNamespace = "default"
|
||||||
|
|
||||||
|
return c.srv.blockingQuery(
|
||||||
|
&args.QueryOptions,
|
||||||
|
&reply.QueryMeta,
|
||||||
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
|
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then we compile it into something useful.
|
||||||
|
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
|
||||||
|
ServiceName: args.Name,
|
||||||
|
CurrentNamespace: currentNamespace,
|
||||||
|
CurrentDatacenter: c.srv.config.Datacenter,
|
||||||
|
InferDefaults: true,
|
||||||
|
Entries: entries,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Index = index
|
||||||
|
reply.ConfigEntries = entries
|
||||||
|
reply.Chain = chain
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -519,6 +519,12 @@ RESOLVE_AGAIN:
|
|||||||
}
|
}
|
||||||
groupResolver := groupResolverNode.GroupResolver
|
groupResolver := groupResolverNode.GroupResolver
|
||||||
|
|
||||||
|
// Digest mesh gateway settings.
|
||||||
|
if serviceDefault := c.entries.GetService(resolver.Name); serviceDefault != nil {
|
||||||
|
groupResolver.MeshGateway = serviceDefault.MeshGateway
|
||||||
|
}
|
||||||
|
// TODO(rb): thread proxy-defaults version through here as well
|
||||||
|
|
||||||
// Retain this target even if we may not retain the group resolver.
|
// Retain this target even if we may not retain the group resolver.
|
||||||
c.targets[target] = struct{}{}
|
c.targets[target] = struct{}{}
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We are updating the proxy, close it's old state
|
// We are updating the proxy, close its old state
|
||||||
state.Close()
|
state.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,6 +47,42 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||||||
|
|
||||||
roots, leaf := TestCerts(t)
|
roots, leaf := TestCerts(t)
|
||||||
|
|
||||||
|
// Initialize a default group resolver for "db"
|
||||||
|
dbResolverEntry := &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: "db",
|
||||||
|
}
|
||||||
|
dbTarget := structs.DiscoveryTarget{
|
||||||
|
Service: "db",
|
||||||
|
Namespace: "default",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
dbResolverNode := &structs.DiscoveryGraphNode{
|
||||||
|
Type: structs.DiscoveryGraphNodeTypeGroupResolver,
|
||||||
|
Name: "db",
|
||||||
|
GroupResolver: &structs.DiscoveryGroupResolver{
|
||||||
|
Definition: dbResolverEntry,
|
||||||
|
Default: true,
|
||||||
|
Target: dbTarget,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
dbChain := &structs.CompiledDiscoveryChain{
|
||||||
|
ServiceName: "db",
|
||||||
|
Namespace: "default",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Protocol: "tcp",
|
||||||
|
Node: dbResolverNode,
|
||||||
|
GroupResolverNodes: map[structs.DiscoveryTarget]*structs.DiscoveryGraphNode{
|
||||||
|
dbTarget: dbResolverNode,
|
||||||
|
},
|
||||||
|
Resolvers: map[string]*structs.ServiceResolverConfigEntry{
|
||||||
|
"db": dbResolverEntry,
|
||||||
|
},
|
||||||
|
Targets: []structs.DiscoveryTarget{
|
||||||
|
dbTarget,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Setup initial values
|
// Setup initial values
|
||||||
types.roots.value.Store(roots)
|
types.roots.value.Store(roots)
|
||||||
types.leaf.value.Store(leaf)
|
types.leaf.value.Store(leaf)
|
||||||
@ -55,6 +91,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||||||
&structs.IndexedCheckServiceNodes{
|
&structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestUpstreamNodes(t),
|
Nodes: TestUpstreamNodes(t),
|
||||||
})
|
})
|
||||||
|
types.compiledChain.value.Store(
|
||||||
|
&structs.DiscoveryChainResponse{
|
||||||
|
Chain: dbChain,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||||
state := local.NewState(local.Config{}, logger, &token.Store{})
|
state := local.NewState(local.Config{}, logger, &token.Store{})
|
||||||
@ -116,9 +157,16 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||||||
Roots: roots,
|
Roots: roots,
|
||||||
ConnectProxy: configSnapshotConnectProxy{
|
ConnectProxy: configSnapshotConnectProxy{
|
||||||
Leaf: leaf,
|
Leaf: leaf,
|
||||||
UpstreamEndpoints: map[string]structs.CheckServiceNodes{
|
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
|
||||||
"db": TestUpstreamNodes(t),
|
"db": dbChain,
|
||||||
},
|
},
|
||||||
|
WatchedUpstreams: nil, // Clone() clears this out
|
||||||
|
WatchedUpstreamEndpoints: map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes{
|
||||||
|
"db": {
|
||||||
|
dbTarget: TestUpstreamNodes(t),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
|
||||||
},
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,10 @@ import (
|
|||||||
|
|
||||||
type configSnapshotConnectProxy struct {
|
type configSnapshotConnectProxy struct {
|
||||||
Leaf *structs.IssuedCert
|
Leaf *structs.IssuedCert
|
||||||
UpstreamEndpoints map[string]structs.CheckServiceNodes
|
DiscoveryChain map[string]*structs.CompiledDiscoveryChain // this is keyed by the Upstream.Identifier(), not the chain name
|
||||||
|
WatchedUpstreams map[string]map[structs.DiscoveryTarget]context.CancelFunc
|
||||||
|
WatchedUpstreamEndpoints map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes
|
||||||
|
UpstreamEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
|
||||||
}
|
}
|
||||||
|
|
||||||
type configSnapshotMeshGateway struct {
|
type configSnapshotMeshGateway struct {
|
||||||
@ -46,6 +49,7 @@ type ConfigSnapshot struct {
|
|||||||
func (s *ConfigSnapshot) Valid() bool {
|
func (s *ConfigSnapshot) Valid() bool {
|
||||||
switch s.Kind {
|
switch s.Kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
|
// TODO(rb): sanity check discovery chain things here?
|
||||||
return s.Roots != nil && s.ConnectProxy.Leaf != nil
|
return s.Roots != nil && s.ConnectProxy.Leaf != nil
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
// TODO (mesh-gateway) - what happens if all the connect services go away
|
// TODO (mesh-gateway) - what happens if all the connect services go away
|
||||||
@ -65,9 +69,11 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
|||||||
|
|
||||||
snap := snapCopy.(*ConfigSnapshot)
|
snap := snapCopy.(*ConfigSnapshot)
|
||||||
|
|
||||||
switch s.Kind {
|
|
||||||
case structs.ServiceKindMeshGateway:
|
|
||||||
// nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches
|
// nil these out as anything receiving one of these clones does not need them and should never "cancel" our watches
|
||||||
|
switch s.Kind {
|
||||||
|
case structs.ServiceKindConnectProxy:
|
||||||
|
snap.ConnectProxy.WatchedUpstreams = nil
|
||||||
|
case structs.ServiceKindMeshGateway:
|
||||||
snap.MeshGateway.WatchedDatacenters = nil
|
snap.MeshGateway.WatchedDatacenters = nil
|
||||||
snap.MeshGateway.WatchedServices = nil
|
snap.MeshGateway.WatchedServices = nil
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ type state struct {
|
|||||||
// goroutine later without reasoning about races with the NodeService passed
|
// goroutine later without reasoning about races with the NodeService passed
|
||||||
// (especially for embedded fields like maps and slices).
|
// (especially for embedded fields like maps and slices).
|
||||||
//
|
//
|
||||||
// The returned state needs it's required dependencies to be set before Watch
|
// The returned state needs its required dependencies to be set before Watch
|
||||||
// can be called.
|
// can be called.
|
||||||
func newState(ns *structs.NodeService, token string) (*state, error) {
|
func newState(ns *structs.NodeService, token string) (*state, error) {
|
||||||
if ns.Kind != structs.ServiceKindConnectProxy && ns.Kind != structs.ServiceKindMeshGateway {
|
if ns.Kind != structs.ServiceKindConnectProxy && ns.Kind != structs.ServiceKindMeshGateway {
|
||||||
@ -141,17 +141,17 @@ func (s *state) initWatches() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) watchConnectProxyService(correlationId string, service string, dc string, filter string, meshGatewayMode structs.MeshGatewayMode) error {
|
func (s *state) watchConnectProxyService(ctx context.Context, correlationId string, service string, dc string, filter string, meshGatewayMode structs.MeshGatewayMode) error {
|
||||||
switch meshGatewayMode {
|
switch meshGatewayMode {
|
||||||
case structs.MeshGatewayModeRemote:
|
case structs.MeshGatewayModeRemote:
|
||||||
return s.cache.Notify(s.ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
ServiceKind: structs.ServiceKindMeshGateway,
|
ServiceKind: structs.ServiceKindMeshGateway,
|
||||||
UseServiceKind: true,
|
UseServiceKind: true,
|
||||||
}, correlationId, s.ch)
|
}, correlationId, s.ch)
|
||||||
case structs.MeshGatewayModeLocal:
|
case structs.MeshGatewayModeLocal:
|
||||||
return s.cache.Notify(s.ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
ServiceKind: structs.ServiceKindMeshGateway,
|
ServiceKind: structs.ServiceKindMeshGateway,
|
||||||
@ -159,7 +159,7 @@ func (s *state) watchConnectProxyService(correlationId string, service string, d
|
|||||||
}, correlationId, s.ch)
|
}, correlationId, s.ch)
|
||||||
default:
|
default:
|
||||||
// This includes both the None and Default modes on purpose
|
// This includes both the None and Default modes on purpose
|
||||||
return s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
|
return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
|
||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
Token: s.token,
|
Token: s.token,
|
||||||
@ -218,6 +218,7 @@ func (s *state) initWatchesConnectProxy() error {
|
|||||||
for _, u := range s.proxyCfg.Upstreams {
|
for _, u := range s.proxyCfg.Upstreams {
|
||||||
dc := s.source.Datacenter
|
dc := s.source.Datacenter
|
||||||
if u.Datacenter != "" {
|
if u.Datacenter != "" {
|
||||||
|
// TODO(rb): if we ASK for a specific datacenter, do we still use the chain?
|
||||||
dc = u.Datacenter
|
dc = u.Datacenter
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,6 +233,30 @@ func (s *state) initWatchesConnectProxy() error {
|
|||||||
case structs.UpstreamDestTypeService:
|
case structs.UpstreamDestTypeService:
|
||||||
fallthrough
|
fallthrough
|
||||||
case "": // Treat unset as the default Service type
|
case "": // Treat unset as the default Service type
|
||||||
|
|
||||||
|
// Determine if this should use a discovery chain.
|
||||||
|
//
|
||||||
|
// TODO(rb): reduce this list of exceptions
|
||||||
|
var shouldUseDiscoveryChain bool
|
||||||
|
if dc != s.source.Datacenter {
|
||||||
|
shouldUseDiscoveryChain = false
|
||||||
|
} else if u.DestinationNamespace != "" && u.DestinationNamespace != "default" {
|
||||||
|
shouldUseDiscoveryChain = false
|
||||||
|
} else {
|
||||||
|
shouldUseDiscoveryChain = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if shouldUseDiscoveryChain {
|
||||||
|
// Watch for discovery chain configuration updates
|
||||||
|
err = s.cache.Notify(s.ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||||
|
Datacenter: dc,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
Name: u.DestinationName,
|
||||||
|
}, "discovery-chain:"+u.Identifier(), s.ch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
meshGateway := structs.MeshGatewayModeNone
|
meshGateway := structs.MeshGatewayModeNone
|
||||||
|
|
||||||
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
|
// TODO (mesh-gateway)- maybe allow using a gateway within a datacenter at some point
|
||||||
@ -239,9 +264,17 @@ func (s *state) initWatchesConnectProxy() error {
|
|||||||
meshGateway = u.MeshGateway.Mode
|
meshGateway = u.MeshGateway.Mode
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.watchConnectProxyService("upstream:"+serviceIDPrefix+u.Identifier(), u.DestinationName, dc, "", meshGateway); err != nil {
|
if err := s.watchConnectProxyService(
|
||||||
|
s.ctx,
|
||||||
|
"upstream:"+serviceIDPrefix+u.Identifier(),
|
||||||
|
u.DestinationName,
|
||||||
|
dc,
|
||||||
|
"",
|
||||||
|
meshGateway,
|
||||||
|
); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unknown upstream type: %q", u.DestinationType)
|
return fmt.Errorf("unknown upstream type: %q", u.DestinationType)
|
||||||
@ -307,7 +340,10 @@ func (s *state) run() {
|
|||||||
|
|
||||||
switch s.kind {
|
switch s.kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes)
|
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
|
||||||
|
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[structs.DiscoveryTarget]context.CancelFunc)
|
||||||
|
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[structs.DiscoveryTarget]structs.CheckServiceNodes)
|
||||||
|
snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) // TODO(rb): deprecated
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
snap.MeshGateway.WatchedServices = make(map[string]context.CancelFunc)
|
snap.MeshGateway.WatchedServices = make(map[string]context.CancelFunc)
|
||||||
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
||||||
@ -400,24 +436,61 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
switch u.CorrelationID {
|
switch {
|
||||||
case rootsWatchID:
|
case u.CorrelationID == rootsWatchID:
|
||||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid type for roots response: %T", u.Result)
|
return fmt.Errorf("invalid type for roots response: %T", u.Result)
|
||||||
}
|
}
|
||||||
snap.Roots = roots
|
snap.Roots = roots
|
||||||
case leafWatchID:
|
|
||||||
|
case u.CorrelationID == leafWatchID:
|
||||||
leaf, ok := u.Result.(*structs.IssuedCert)
|
leaf, ok := u.Result.(*structs.IssuedCert)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid type for leaf response: %T", u.Result)
|
return fmt.Errorf("invalid type for leaf response: %T", u.Result)
|
||||||
}
|
}
|
||||||
snap.ConnectProxy.Leaf = leaf
|
snap.ConnectProxy.Leaf = leaf
|
||||||
case intentionsWatchID:
|
|
||||||
|
case u.CorrelationID == intentionsWatchID:
|
||||||
// Not in snapshot currently, no op
|
// Not in snapshot currently, no op
|
||||||
default:
|
|
||||||
// Service discovery result, figure out which type
|
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
|
||||||
switch {
|
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||||
|
}
|
||||||
|
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
|
||||||
|
snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain
|
||||||
|
|
||||||
|
if err := s.resetWatchesFromChain(svc, resp.Chain, snap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
|
||||||
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||||
|
}
|
||||||
|
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
|
||||||
|
encTarget, svc, ok := removeColonPrefix(correlationID)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
|
||||||
|
}
|
||||||
|
|
||||||
|
target := structs.DiscoveryTarget{}
|
||||||
|
if err := target.UnmarshalText([]byte(encTarget)); err != nil {
|
||||||
|
return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(rb): do we have to do onlypassing filters here?
|
||||||
|
|
||||||
|
m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
|
||||||
|
if !ok {
|
||||||
|
m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
|
||||||
|
snap.ConnectProxy.WatchedUpstreamEndpoints[svc] = m
|
||||||
|
}
|
||||||
|
snap.ConnectProxy.WatchedUpstreamEndpoints[svc][target] = resp.Nodes
|
||||||
|
|
||||||
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
|
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
|
||||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -437,7 +510,138 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
|||||||
default:
|
default:
|
||||||
return errors.New("unknown correlation ID")
|
return errors.New("unknown correlation ID")
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func removeColonPrefix(s string) (string, string, bool) {
|
||||||
|
idx := strings.Index(s, ":")
|
||||||
|
if idx == -1 {
|
||||||
|
return "", "", false
|
||||||
|
}
|
||||||
|
return s[0:idx], s[idx+1:], true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *state) resetWatchesFromChain(
|
||||||
|
id string,
|
||||||
|
chain *structs.CompiledDiscoveryChain,
|
||||||
|
snap *ConfigSnapshot,
|
||||||
|
) error {
|
||||||
|
if chain == nil {
|
||||||
|
return fmt.Errorf("not possible to arrive here with no discovery chain")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect all sorts of catalog queries we'll have to run.
|
||||||
|
targets := make(map[structs.DiscoveryTarget]*structs.ServiceResolverConfigEntry)
|
||||||
|
addTarget := func(target structs.DiscoveryTarget) error {
|
||||||
|
resolver, ok := chain.Resolvers[target.Service]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("missing resolver %q for target %s", target.Service, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
targets[target] = resolver
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: We will NEVER see a missing chain, because we always request it with defaulting enabled.
|
||||||
|
meshGatewayModes := make(map[structs.DiscoveryTarget]structs.MeshGatewayMode)
|
||||||
|
for _, group := range chain.GroupResolverNodes {
|
||||||
|
groupResolver := group.GroupResolver
|
||||||
|
|
||||||
|
meshGatewayModes[groupResolver.Target] = groupResolver.MeshGateway.Mode
|
||||||
|
|
||||||
|
if err := addTarget(groupResolver.Target); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if groupResolver.Failover != nil {
|
||||||
|
for _, target := range groupResolver.Failover.Targets {
|
||||||
|
if err := addTarget(target); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize relevant sub maps.
|
||||||
|
if _, ok := snap.ConnectProxy.WatchedUpstreams[id]; !ok {
|
||||||
|
snap.ConnectProxy.WatchedUpstreams[id] = make(map[structs.DiscoveryTarget]context.CancelFunc)
|
||||||
|
}
|
||||||
|
if _, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[id]; !ok {
|
||||||
|
// TODO(rb): does this belong here?
|
||||||
|
snap.ConnectProxy.WatchedUpstreamEndpoints[id] = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We could invalidate this selectively based on a hash of the relevant
|
||||||
|
// resolver information, but for now just reset anything about this
|
||||||
|
// upstream when the chain changes in any way.
|
||||||
|
//
|
||||||
|
// TODO(rb): content hash based add/remove
|
||||||
|
for target, cancelFn := range snap.ConnectProxy.WatchedUpstreams[id] {
|
||||||
|
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: stopping watch of target %s", id, chain.ServiceName, target)
|
||||||
|
delete(snap.ConnectProxy.WatchedUpstreams[id], target)
|
||||||
|
delete(snap.ConnectProxy.WatchedUpstreamEndpoints[id], target) // TODO(rb): safe?
|
||||||
|
cancelFn()
|
||||||
|
}
|
||||||
|
|
||||||
|
for target, resolver := range targets {
|
||||||
|
if target.Service != resolver.Name {
|
||||||
|
panic(target.Service + " != " + resolver.Name) // TODO(rb): remove
|
||||||
|
}
|
||||||
|
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
|
||||||
|
|
||||||
|
// snap.WatchedUpstreams[name]
|
||||||
|
|
||||||
|
// delete(snap.WatchedUpstreams[name], target)
|
||||||
|
// delete(snap.WatchedUpstreamEndpoint[name], target)
|
||||||
|
|
||||||
|
// TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly
|
||||||
|
|
||||||
|
// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters
|
||||||
|
|
||||||
|
// TODO(rb): handle subset.onlypassing
|
||||||
|
var subset structs.ServiceResolverSubset
|
||||||
|
if target.ServiceSubset != "" {
|
||||||
|
var ok bool
|
||||||
|
subset, ok = resolver.Subsets[target.ServiceSubset]
|
||||||
|
if !ok {
|
||||||
|
// Not possible really.
|
||||||
|
return fmt.Errorf("target %s cannot be resolved; service %q does not have a subset named %q", target, target.Service, target.ServiceSubset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
encodedTarget, err := target.MarshalText()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("target %s cannot be converted into a cache key string: %v", target, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(s.ctx)
|
||||||
|
|
||||||
|
meshGateway := structs.MeshGatewayModeNone
|
||||||
|
if target.Datacenter != s.source.Datacenter {
|
||||||
|
meshGateway = meshGatewayModes[target]
|
||||||
|
if meshGateway == structs.MeshGatewayModeDefault {
|
||||||
|
meshGateway = structs.MeshGatewayModeNone
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
meshGateway = structs.MeshGatewayModeNone
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(rb): update the health endpoint to allow returning even unhealthy endpoints
|
||||||
|
err = s.watchConnectProxyService(
|
||||||
|
ctx,
|
||||||
|
"upstream-target:"+string(encodedTarget)+":"+id,
|
||||||
|
target.Service,
|
||||||
|
target.Datacenter,
|
||||||
|
subset.Filter,
|
||||||
|
meshGateway,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
snap.ConnectProxy.WatchedUpstreams[id][target] = cancel
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ type TestCacheTypes struct {
|
|||||||
intentions *ControllableCacheType
|
intentions *ControllableCacheType
|
||||||
health *ControllableCacheType
|
health *ControllableCacheType
|
||||||
query *ControllableCacheType
|
query *ControllableCacheType
|
||||||
|
compiledChain *ControllableCacheType
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that
|
// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that
|
||||||
@ -34,6 +35,7 @@ func NewTestCacheTypes(t testing.T) *TestCacheTypes {
|
|||||||
intentions: NewControllableCacheType(t),
|
intentions: NewControllableCacheType(t),
|
||||||
health: NewControllableCacheType(t),
|
health: NewControllableCacheType(t),
|
||||||
query: NewControllableCacheType(t),
|
query: NewControllableCacheType(t),
|
||||||
|
compiledChain: NewControllableCacheType(t),
|
||||||
}
|
}
|
||||||
ct.query.blocking = false
|
ct.query.blocking = false
|
||||||
return ct
|
return ct
|
||||||
@ -66,6 +68,12 @@ func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
|
|||||||
c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{
|
c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{
|
||||||
Refresh: false,
|
Refresh: false,
|
||||||
})
|
})
|
||||||
|
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain, &cache.RegisterOptions{
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
})
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,9 +4,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/mitchellh/hashstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ServiceRouterConfigEntry defines L7 (e.g. http) routing rules for a named
|
// ServiceRouterConfigEntry defines L7 (e.g. http) routing rules for a named
|
||||||
@ -917,6 +920,52 @@ func (e *DiscoveryChainConfigEntries) IsChainEmpty() bool {
|
|||||||
return len(e.Routers) == 0 && len(e.Splitters) == 0 && len(e.Resolvers) == 0
|
return len(e.Routers) == 0 && len(e.Splitters) == 0 && len(e.Resolvers) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DiscoveryChainRequest is used when requesting the discovery chain for a
|
||||||
|
// service.
|
||||||
|
type DiscoveryChainRequest struct {
|
||||||
|
Name string
|
||||||
|
Datacenter string
|
||||||
|
// Source QuerySource
|
||||||
|
|
||||||
|
QueryOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DiscoveryChainRequest) RequestDatacenter() string {
|
||||||
|
return r.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DiscoveryChainRequest) CacheInfo() cache.RequestInfo {
|
||||||
|
info := cache.RequestInfo{
|
||||||
|
Token: r.Token,
|
||||||
|
Datacenter: r.Datacenter,
|
||||||
|
MinIndex: r.MinQueryIndex,
|
||||||
|
Timeout: r.MaxQueryTime,
|
||||||
|
MaxAge: r.MaxAge,
|
||||||
|
MustRevalidate: r.MustRevalidate,
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := hashstructure.Hash(struct {
|
||||||
|
Name string
|
||||||
|
}{
|
||||||
|
Name: r.Name,
|
||||||
|
}, nil)
|
||||||
|
if err == nil {
|
||||||
|
// If there is an error, we don't set the key. A blank key forces
|
||||||
|
// no cache for this request so the request is forwarded directly
|
||||||
|
// to the server.
|
||||||
|
info.Key = strconv.FormatUint(v, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(rb): either fix the compiled results, or take the derived data and stash it here in a json/msgpack-friendly way?
|
||||||
|
type DiscoveryChainResponse struct {
|
||||||
|
ConfigEntries *DiscoveryChainConfigEntries `json:",omitempty"` // TODO(rb): remove these?
|
||||||
|
Chain *CompiledDiscoveryChain `json:",omitempty"`
|
||||||
|
QueryMeta
|
||||||
|
}
|
||||||
|
|
||||||
type ConfigEntryGraphError struct {
|
type ConfigEntryGraphError struct {
|
||||||
// one of Message or Err should be set
|
// one of Message or Err should be set
|
||||||
Message string
|
Message string
|
||||||
|
@ -73,6 +73,7 @@ type DiscoveryGroupResolver struct {
|
|||||||
Definition *ServiceResolverConfigEntry `json:",omitempty"`
|
Definition *ServiceResolverConfigEntry `json:",omitempty"`
|
||||||
Default bool `json:",omitempty"`
|
Default bool `json:",omitempty"`
|
||||||
ConnectTimeout time.Duration `json:",omitempty"`
|
ConnectTimeout time.Duration `json:",omitempty"`
|
||||||
|
MeshGateway MeshGatewayConfig `json:",omitempty"`
|
||||||
Target DiscoveryTarget `json:",omitempty"`
|
Target DiscoveryTarget `json:",omitempty"`
|
||||||
Failover *DiscoveryFailover `json:",omitempty"`
|
Failover *DiscoveryFailover `json:",omitempty"`
|
||||||
}
|
}
|
||||||
@ -90,6 +91,7 @@ type DiscoverySplit struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// compiled form of ServiceResolverFailover
|
// compiled form of ServiceResolverFailover
|
||||||
|
// TODO(rb): figure out how to get mesh gateways in here
|
||||||
type DiscoveryFailover struct {
|
type DiscoveryFailover struct {
|
||||||
Definition *ServiceResolverFailover `json:",omitempty"`
|
Definition *ServiceResolverFailover `json:",omitempty"`
|
||||||
Targets []DiscoveryTarget `json:",omitempty"`
|
Targets []DiscoveryTarget `json:",omitempty"`
|
||||||
@ -212,7 +214,7 @@ func (t DiscoveryTarget) String() string {
|
|||||||
if t.Namespace != "" {
|
if t.Namespace != "" {
|
||||||
b.WriteString(t.Namespace)
|
b.WriteString(t.Namespace)
|
||||||
} else {
|
} else {
|
||||||
b.WriteString("default")
|
b.WriteString("<default>")
|
||||||
}
|
}
|
||||||
b.WriteRune('.')
|
b.WriteRune('.')
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
|
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
|
||||||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||||
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
||||||
|
envoytype "github.com/envoyproxy/go-control-plane/envoy/type"
|
||||||
"github.com/gogo/protobuf/jsonpb"
|
"github.com/gogo/protobuf/jsonpb"
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/gogo/protobuf/types"
|
"github.com/gogo/protobuf/types"
|
||||||
@ -38,20 +39,42 @@ func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token st
|
|||||||
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
||||||
// (upstreams) in the snapshot.
|
// (upstreams) in the snapshot.
|
||||||
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
||||||
// Include the "app" cluster for the public listener
|
// TODO(rb): this sizing is a low bound.
|
||||||
clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
|
clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
|
||||||
|
|
||||||
var err error
|
// Include the "app" cluster for the public listener
|
||||||
clusters[0], err = s.makeAppCluster(cfgSnap)
|
appCluster, err := s.makeAppCluster(cfgSnap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx, upstream := range cfgSnap.Proxy.Upstreams {
|
clusters = append(clusters, appCluster)
|
||||||
clusters[idx+1], err = s.makeUpstreamCluster(upstream, cfgSnap)
|
|
||||||
|
for _, u := range cfgSnap.Proxy.Upstreams {
|
||||||
|
id := u.Identifier()
|
||||||
|
var chain *structs.CompiledDiscoveryChain
|
||||||
|
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
||||||
|
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
if chain == nil || chain.IsDefault() {
|
||||||
|
// Either old-school upstream or prepared query.
|
||||||
|
upstreamCluster, err := s.makeUpstreamCluster(u, cfgSnap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
clusters = append(clusters, upstreamCluster)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, chain, cfgSnap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cluster := range upstreamClusters {
|
||||||
|
clusters = append(clusters, cluster)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return clusters, nil
|
return clusters, nil
|
||||||
@ -197,6 +220,85 @@ func (s *Server) makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycf
|
|||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) makeUpstreamClustersForDiscoveryChain(
|
||||||
|
upstreamID string,
|
||||||
|
chain *structs.CompiledDiscoveryChain,
|
||||||
|
cfgSnap *proxycfg.ConfigSnapshot,
|
||||||
|
) ([]*envoy.Cluster, error) {
|
||||||
|
if chain == nil {
|
||||||
|
panic("chain must be provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(rb): make escape hatches work with chains
|
||||||
|
|
||||||
|
var out []*envoy.Cluster
|
||||||
|
for target, node := range chain.GroupResolverNodes {
|
||||||
|
groupResolver := node.GroupResolver
|
||||||
|
// TODO(rb): failover
|
||||||
|
// Failover *DiscoveryFailover `json:",omitempty"` // sad path
|
||||||
|
|
||||||
|
clusterName := makeClusterName(upstreamID, target, cfgSnap.Datacenter)
|
||||||
|
c := &envoy.Cluster{
|
||||||
|
Name: clusterName,
|
||||||
|
AltStatName: clusterName, // TODO(rb): change this?
|
||||||
|
ConnectTimeout: groupResolver.ConnectTimeout,
|
||||||
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
||||||
|
CommonLbConfig: &envoy.Cluster_CommonLbConfig{
|
||||||
|
HealthyPanicThreshold: &envoytype.Percent{
|
||||||
|
Value: 0, // disable panic threshold
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// TODO(rb): adjust load assignment
|
||||||
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
||||||
|
EdsConfig: &envoycore.ConfigSource{
|
||||||
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
||||||
|
Ads: &envoycore.AggregatedConfigSource{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// Having an empty config enables outlier detection with default config.
|
||||||
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
||||||
|
}
|
||||||
|
if chain.Protocol == "http2" || chain.Protocol == "grpc" {
|
||||||
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable TLS upstream with the configured client certificate.
|
||||||
|
c.TlsContext = &envoyauth.UpstreamTlsContext{
|
||||||
|
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
||||||
|
}
|
||||||
|
|
||||||
|
out = append(out, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeClusterName returns a string representation that uniquely identifies the
|
||||||
|
// cluster in a canonical but human readable way.
|
||||||
|
func makeClusterName(upstreamID string, target structs.DiscoveryTarget, currentDatacenter string) string {
|
||||||
|
var name string
|
||||||
|
if target.ServiceSubset != "" {
|
||||||
|
name = target.Service + "/" + target.ServiceSubset
|
||||||
|
} else {
|
||||||
|
name = target.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
if target.Namespace != "" && target.Namespace != "default" {
|
||||||
|
name = target.Namespace + "/" + name
|
||||||
|
}
|
||||||
|
if target.Datacenter != "" && target.Datacenter != currentDatacenter {
|
||||||
|
name += "?dc=" + target.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
|
if upstreamID == target.Service {
|
||||||
|
// In the common case don't stutter.
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
|
||||||
|
return upstreamID + "//" + name
|
||||||
|
}
|
||||||
|
|
||||||
// makeClusterFromUserConfig returns the listener config decoded from an
|
// makeClusterFromUserConfig returns the listener config decoded from an
|
||||||
// arbitrary proto3 json format string or an error if it's invalid.
|
// arbitrary proto3 json format string or an error if it's invalid.
|
||||||
//
|
//
|
||||||
|
@ -33,11 +33,94 @@ func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token s
|
|||||||
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
|
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
|
||||||
// (upstream instances) in the snapshot.
|
// (upstream instances) in the snapshot.
|
||||||
func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
||||||
|
// TODO(rb): this sizing is a low bound.
|
||||||
resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.UpstreamEndpoints))
|
resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.UpstreamEndpoints))
|
||||||
for id, endpoints := range cfgSnap.ConnectProxy.UpstreamEndpoints {
|
|
||||||
la := makeLoadAssignment(id, endpoints, cfgSnap.Datacenter)
|
// TODO(rb): should naming from 1.5 -> 1.6 for clusters remain unchanged?
|
||||||
|
|
||||||
|
for _, u := range cfgSnap.Proxy.Upstreams {
|
||||||
|
id := u.Identifier()
|
||||||
|
|
||||||
|
var chain *structs.CompiledDiscoveryChain
|
||||||
|
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
||||||
|
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
if chain == nil {
|
||||||
|
// We ONLY want this branch for prepared queries.
|
||||||
|
|
||||||
|
endpoints, ok := cfgSnap.ConnectProxy.UpstreamEndpoints[id]
|
||||||
|
if ok {
|
||||||
|
la := makeLoadAssignment(
|
||||||
|
id,
|
||||||
|
0,
|
||||||
|
[]structs.CheckServiceNodes{endpoints},
|
||||||
|
cfgSnap.Datacenter,
|
||||||
|
)
|
||||||
resources = append(resources, la)
|
resources = append(resources, la)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// Newfangled discovery chain plumbing.
|
||||||
|
|
||||||
|
chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
|
||||||
|
if !ok {
|
||||||
|
continue // TODO(rb): whaaaa?
|
||||||
|
}
|
||||||
|
|
||||||
|
for target, node := range chain.GroupResolverNodes {
|
||||||
|
groupResolver := node.GroupResolver
|
||||||
|
failover := groupResolver.Failover
|
||||||
|
|
||||||
|
endpoints, ok := chainEndpointMap[target]
|
||||||
|
if !ok {
|
||||||
|
continue // TODO(rb): whaaaa?
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
priorityEndpoints []structs.CheckServiceNodes
|
||||||
|
overprovisioningFactor int
|
||||||
|
)
|
||||||
|
|
||||||
|
if failover != nil && len(failover.Targets) > 0 {
|
||||||
|
priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1)
|
||||||
|
|
||||||
|
priorityEndpoints = append(priorityEndpoints, endpoints)
|
||||||
|
|
||||||
|
if failover.Definition.OverprovisioningFactor > 0 {
|
||||||
|
overprovisioningFactor = failover.Definition.OverprovisioningFactor
|
||||||
|
}
|
||||||
|
if overprovisioningFactor <= 0 {
|
||||||
|
// We choose such a large value here that the failover math should
|
||||||
|
// in effect not happen until zero instances are healthy.
|
||||||
|
overprovisioningFactor = 100000
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, failTarget := range failover.Targets {
|
||||||
|
failEndpoints, ok := chainEndpointMap[failTarget]
|
||||||
|
if ok {
|
||||||
|
priorityEndpoints = append(priorityEndpoints, failEndpoints)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
priorityEndpoints = []structs.CheckServiceNodes{
|
||||||
|
endpoints,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
clusterName := makeClusterName(id, target, cfgSnap.Datacenter)
|
||||||
|
|
||||||
|
la := makeLoadAssignment(
|
||||||
|
clusterName,
|
||||||
|
overprovisioningFactor,
|
||||||
|
priorityEndpoints,
|
||||||
|
cfgSnap.Datacenter,
|
||||||
|
)
|
||||||
|
resources = append(resources, la)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return resources, nil
|
return resources, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,14 +130,28 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
|
|||||||
// generate the endpoints for the gateways in the remote datacenters
|
// generate the endpoints for the gateways in the remote datacenters
|
||||||
for dc, endpoints := range cfgSnap.MeshGateway.GatewayGroups {
|
for dc, endpoints := range cfgSnap.MeshGateway.GatewayGroups {
|
||||||
clusterName := DatacenterSNI(dc, cfgSnap)
|
clusterName := DatacenterSNI(dc, cfgSnap)
|
||||||
la := makeLoadAssignment(clusterName, endpoints, cfgSnap.Datacenter)
|
la := makeLoadAssignment(
|
||||||
|
clusterName,
|
||||||
|
0,
|
||||||
|
[]structs.CheckServiceNodes{
|
||||||
|
endpoints,
|
||||||
|
},
|
||||||
|
cfgSnap.Datacenter,
|
||||||
|
)
|
||||||
resources = append(resources, la)
|
resources = append(resources, la)
|
||||||
}
|
}
|
||||||
|
|
||||||
// generate the endpoints for the local service groups
|
// generate the endpoints for the local service groups
|
||||||
for svc, endpoints := range cfgSnap.MeshGateway.ServiceGroups {
|
for svc, endpoints := range cfgSnap.MeshGateway.ServiceGroups {
|
||||||
clusterName := ServiceSNI(svc, "default", cfgSnap.Datacenter, cfgSnap)
|
clusterName := ServiceSNI(svc, "default", cfgSnap.Datacenter, cfgSnap)
|
||||||
la := makeLoadAssignment(clusterName, endpoints, cfgSnap.Datacenter)
|
la := makeLoadAssignment(
|
||||||
|
clusterName,
|
||||||
|
0,
|
||||||
|
[]structs.CheckServiceNodes{
|
||||||
|
endpoints,
|
||||||
|
},
|
||||||
|
cfgSnap.Datacenter,
|
||||||
|
)
|
||||||
resources = append(resources, la)
|
resources = append(resources, la)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,14 +160,33 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
|
|||||||
|
|
||||||
func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
|
func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
|
||||||
return envoyendpoint.LbEndpoint{
|
return envoyendpoint.LbEndpoint{
|
||||||
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{Endpoint: &envoyendpoint.Endpoint{
|
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
|
||||||
|
Endpoint: &envoyendpoint.Endpoint{
|
||||||
Address: makeAddressPtr(host, port),
|
Address: makeAddressPtr(host, port),
|
||||||
},
|
},
|
||||||
}}
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes, localDatacenter string) *envoy.ClusterLoadAssignment {
|
func makeLoadAssignment(
|
||||||
|
clusterName string,
|
||||||
|
overprovisioningFactor int,
|
||||||
|
priorityEndpoints []structs.CheckServiceNodes,
|
||||||
|
localDatacenter string,
|
||||||
|
) *envoy.ClusterLoadAssignment {
|
||||||
|
cla := &envoy.ClusterLoadAssignment{
|
||||||
|
ClusterName: clusterName,
|
||||||
|
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)),
|
||||||
|
}
|
||||||
|
if overprovisioningFactor > 0 {
|
||||||
|
cla.Policy = &envoy.ClusterLoadAssignment_Policy{
|
||||||
|
OverprovisioningFactor: makeUint32Value(overprovisioningFactor),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for priority, endpoints := range priorityEndpoints {
|
||||||
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
|
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
|
||||||
|
|
||||||
for _, ep := range endpoints {
|
for _, ep := range endpoints {
|
||||||
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
|
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
|
||||||
addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter)
|
addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter)
|
||||||
@ -104,15 +220,18 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes,
|
|||||||
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
|
HostIdentifier: &envoyendpoint.LbEndpoint_Endpoint{
|
||||||
Endpoint: &envoyendpoint.Endpoint{
|
Endpoint: &envoyendpoint.Endpoint{
|
||||||
Address: makeAddressPtr(addr, port),
|
Address: makeAddressPtr(addr, port),
|
||||||
}},
|
},
|
||||||
|
},
|
||||||
HealthStatus: healthStatus,
|
HealthStatus: healthStatus,
|
||||||
LoadBalancingWeight: makeUint32Value(weight),
|
LoadBalancingWeight: makeUint32Value(weight),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return &envoy.ClusterLoadAssignment{
|
|
||||||
ClusterName: clusterName,
|
cla.Endpoints = append(cla.Endpoints, envoyendpoint.LocalityLbEndpoints{
|
||||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
Priority: uint32(priority),
|
||||||
LbEndpoints: es,
|
LbEndpoints: es,
|
||||||
}},
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return cla
|
||||||
}
|
}
|
||||||
|
@ -96,13 +96,16 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
clusterName string
|
clusterName string
|
||||||
endpoints structs.CheckServiceNodes
|
overprovisioningFactor int
|
||||||
|
endpoints []structs.CheckServiceNodes
|
||||||
want *envoy.ClusterLoadAssignment
|
want *envoy.ClusterLoadAssignment
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no instances",
|
name: "no instances",
|
||||||
clusterName: "service:test",
|
clusterName: "service:test",
|
||||||
endpoints: structs.CheckServiceNodes{},
|
endpoints: []structs.CheckServiceNodes{
|
||||||
|
{},
|
||||||
|
},
|
||||||
want: &envoy.ClusterLoadAssignment{
|
want: &envoy.ClusterLoadAssignment{
|
||||||
ClusterName: "service:test",
|
ClusterName: "service:test",
|
||||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||||
@ -113,7 +116,9 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "instances, no weights",
|
name: "instances, no weights",
|
||||||
clusterName: "service:test",
|
clusterName: "service:test",
|
||||||
endpoints: testCheckServiceNodes,
|
endpoints: []structs.CheckServiceNodes{
|
||||||
|
testCheckServiceNodes,
|
||||||
|
},
|
||||||
want: &envoy.ClusterLoadAssignment{
|
want: &envoy.ClusterLoadAssignment{
|
||||||
ClusterName: "service:test",
|
ClusterName: "service:test",
|
||||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||||
@ -141,7 +146,9 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "instances, healthy weights",
|
name: "instances, healthy weights",
|
||||||
clusterName: "service:test",
|
clusterName: "service:test",
|
||||||
endpoints: testWeightedCheckServiceNodes,
|
endpoints: []structs.CheckServiceNodes{
|
||||||
|
testWeightedCheckServiceNodes,
|
||||||
|
},
|
||||||
want: &envoy.ClusterLoadAssignment{
|
want: &envoy.ClusterLoadAssignment{
|
||||||
ClusterName: "service:test",
|
ClusterName: "service:test",
|
||||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||||
@ -169,7 +176,9 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "instances, warning weights",
|
name: "instances, warning weights",
|
||||||
clusterName: "service:test",
|
clusterName: "service:test",
|
||||||
endpoints: testWarningCheckServiceNodes,
|
endpoints: []structs.CheckServiceNodes{
|
||||||
|
testWarningCheckServiceNodes,
|
||||||
|
},
|
||||||
want: &envoy.ClusterLoadAssignment{
|
want: &envoy.ClusterLoadAssignment{
|
||||||
ClusterName: "service:test",
|
ClusterName: "service:test",
|
||||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||||
@ -197,7 +206,7 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
got := makeLoadAssignment(tt.clusterName, tt.endpoints, "dc1")
|
got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1")
|
||||||
require.Equal(t, tt.want, got)
|
require.Equal(t, tt.want, got)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -52,10 +52,23 @@ func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for i, u := range cfgSnap.Proxy.Upstreams {
|
for i, u := range cfgSnap.Proxy.Upstreams {
|
||||||
resources[i+1], err = s.makeUpstreamListener(&u)
|
id := u.Identifier()
|
||||||
|
|
||||||
|
var chain *structs.CompiledDiscoveryChain
|
||||||
|
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
||||||
|
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
var upstreamListener proto.Message
|
||||||
|
if chain == nil || chain.IsDefault() {
|
||||||
|
upstreamListener, err = s.makeUpstreamListener(&u, cfgSnap)
|
||||||
|
} else {
|
||||||
|
upstreamListener, err = s.makeUpstreamListenerForDiscoveryChain(&u, chain, cfgSnap)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
resources[i+1] = upstreamListener
|
||||||
}
|
}
|
||||||
return resources, nil
|
return resources, nil
|
||||||
}
|
}
|
||||||
@ -226,7 +239,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
|
|||||||
}
|
}
|
||||||
l = makeListener(PublicListenerName, addr, cfgSnap.Port)
|
l = makeListener(PublicListenerName, addr, cfgSnap.Port)
|
||||||
|
|
||||||
filter, err := makeListenerFilter(cfg.Protocol, "public_listener", LocalAppClusterName, "", true)
|
filter, err := makeListenerFilter(false, cfg.Protocol, "public_listener", LocalAppClusterName, "", true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -243,7 +256,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
|
|||||||
return l, err
|
return l, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) makeUpstreamListener(u *structs.Upstream) (proto.Message, error) {
|
func (s *Server) makeUpstreamListener(u *structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (proto.Message, error) {
|
||||||
cfg, err := ParseUpstreamConfig(u.Config)
|
cfg, err := ParseUpstreamConfig(u.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||||
@ -260,11 +273,16 @@ func (s *Server) makeUpstreamListener(u *structs.Upstream) (proto.Message, error
|
|||||||
addr = "127.0.0.1"
|
addr = "127.0.0.1"
|
||||||
}
|
}
|
||||||
|
|
||||||
l := makeListener(u.Identifier(), addr, u.LocalBindPort)
|
upstreamID := u.Identifier()
|
||||||
filter, err := makeListenerFilter(cfg.Protocol, u.Identifier(), u.Identifier(), "upstream_", false)
|
|
||||||
|
clusterName := upstreamID
|
||||||
|
|
||||||
|
l := makeListener(upstreamID, addr, u.LocalBindPort)
|
||||||
|
filter, err := makeListenerFilter(false, cfg.Protocol, upstreamID, clusterName, "upstream_", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
l.FilterChains = []envoylistener.FilterChain{
|
l.FilterChains = []envoylistener.FilterChain{
|
||||||
{
|
{
|
||||||
Filters: []envoylistener.Filter{
|
Filters: []envoylistener.Filter{
|
||||||
@ -330,14 +348,44 @@ func (s *Server) makeGatewayListener(name, addr string, port int, cfgSnap *proxy
|
|||||||
return l, nil
|
return l, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeListenerFilter(protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) {
|
func (s *Server) makeUpstreamListenerForDiscoveryChain(
|
||||||
|
u *structs.Upstream,
|
||||||
|
chain *structs.CompiledDiscoveryChain,
|
||||||
|
cfgSnap *proxycfg.ConfigSnapshot,
|
||||||
|
) (proto.Message, error) {
|
||||||
|
// TODO(rb): make the listener escape hatch work again
|
||||||
|
|
||||||
|
addr := u.LocalBindAddress
|
||||||
|
if addr == "" {
|
||||||
|
addr = "127.0.0.1"
|
||||||
|
}
|
||||||
|
|
||||||
|
upstreamID := u.Identifier()
|
||||||
|
|
||||||
|
l := makeListener(upstreamID, addr, u.LocalBindPort)
|
||||||
|
filter, err := makeListenerFilter(true, chain.Protocol, upstreamID, "", "upstream_", false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
l.FilterChains = []envoylistener.FilterChain{
|
||||||
|
{
|
||||||
|
Filters: []envoylistener.Filter{
|
||||||
|
filter,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return l, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeListenerFilter(useRDS bool, protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) {
|
||||||
switch protocol {
|
switch protocol {
|
||||||
case "grpc":
|
case "grpc":
|
||||||
return makeHTTPFilter(filterName, cluster, statPrefix, ingress, true, true)
|
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, true, true)
|
||||||
case "http2":
|
case "http2":
|
||||||
return makeHTTPFilter(filterName, cluster, statPrefix, ingress, false, true)
|
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, true)
|
||||||
case "http":
|
case "http":
|
||||||
return makeHTTPFilter(filterName, cluster, statPrefix, ingress, false, false)
|
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, false)
|
||||||
case "tcp":
|
case "tcp":
|
||||||
fallthrough
|
fallthrough
|
||||||
default:
|
default:
|
||||||
@ -375,7 +423,11 @@ func makeStatPrefix(protocol, prefix, filterName string) string {
|
|||||||
return fmt.Sprintf("%s%s_%s", prefix, strings.Replace(filterName, ":", "_", -1), protocol)
|
return fmt.Sprintf("%s%s_%s", prefix, strings.Replace(filterName, ":", "_", -1), protocol)
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2 bool) (envoylistener.Filter, error) {
|
func makeHTTPFilter(
|
||||||
|
useRDS bool,
|
||||||
|
filterName, cluster, statPrefix string,
|
||||||
|
ingress, grpc, http2 bool,
|
||||||
|
) (envoylistener.Filter, error) {
|
||||||
op := envoyhttp.INGRESS
|
op := envoyhttp.INGRESS
|
||||||
if !ingress {
|
if !ingress {
|
||||||
op = envoyhttp.EGRESS
|
op = envoyhttp.EGRESS
|
||||||
@ -387,7 +439,39 @@ func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2
|
|||||||
cfg := &envoyhttp.HttpConnectionManager{
|
cfg := &envoyhttp.HttpConnectionManager{
|
||||||
StatPrefix: makeStatPrefix(proto, statPrefix, filterName),
|
StatPrefix: makeStatPrefix(proto, statPrefix, filterName),
|
||||||
CodecType: envoyhttp.AUTO,
|
CodecType: envoyhttp.AUTO,
|
||||||
RouteSpecifier: &envoyhttp.HttpConnectionManager_RouteConfig{
|
HttpFilters: []*envoyhttp.HttpFilter{
|
||||||
|
&envoyhttp.HttpFilter{
|
||||||
|
Name: "envoy.router",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Tracing: &envoyhttp.HttpConnectionManager_Tracing{
|
||||||
|
OperationName: op,
|
||||||
|
// Don't trace any requests by default unless the client application
|
||||||
|
// explicitly propagates trace headers that indicate this should be
|
||||||
|
// sampled.
|
||||||
|
RandomSampling: &envoytype.Percent{Value: 0.0},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if useRDS {
|
||||||
|
if cluster != "" {
|
||||||
|
return envoylistener.Filter{}, fmt.Errorf("cannot specify cluster name when using RDS")
|
||||||
|
}
|
||||||
|
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_Rds{
|
||||||
|
Rds: &envoyhttp.Rds{
|
||||||
|
RouteConfigName: filterName,
|
||||||
|
ConfigSource: envoycore.ConfigSource{
|
||||||
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
||||||
|
Ads: &envoycore.AggregatedConfigSource{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if cluster == "" {
|
||||||
|
return envoylistener.Filter{}, fmt.Errorf("must specify cluster name when not using RDS")
|
||||||
|
}
|
||||||
|
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_RouteConfig{
|
||||||
RouteConfig: &envoy.RouteConfiguration{
|
RouteConfig: &envoy.RouteConfiguration{
|
||||||
Name: filterName,
|
Name: filterName,
|
||||||
VirtualHosts: []envoyroute.VirtualHost{
|
VirtualHosts: []envoyroute.VirtualHost{
|
||||||
@ -418,19 +502,7 @@ func makeHTTPFilter(filterName, cluster, statPrefix string, ingress, grpc, http2
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
HttpFilters: []*envoyhttp.HttpFilter{
|
|
||||||
&envoyhttp.HttpFilter{
|
|
||||||
Name: "envoy.router",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Tracing: &envoyhttp.HttpConnectionManager_Tracing{
|
|
||||||
OperationName: op,
|
|
||||||
// Don't trace any requests by default unless the client application
|
|
||||||
// explicitly propagates trace headers that indicate this should be
|
|
||||||
// sampled.
|
|
||||||
RandomSampling: &envoytype.Percent{Value: 0.0},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if http2 {
|
if http2 {
|
||||||
|
@ -57,3 +57,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address {
|
|||||||
func makeUint32Value(n int) *prototypes.UInt32Value {
|
func makeUint32Value(n int) *prototypes.UInt32Value {
|
||||||
return &prototypes.UInt32Value{Value: uint32(n)}
|
return &prototypes.UInt32Value{Value: uint32(n)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeBoolValue(n bool) *prototypes.BoolValue {
|
||||||
|
return &prototypes.BoolValue{Value: n}
|
||||||
|
}
|
||||||
|
@ -2,18 +2,298 @@ package xds
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
|
|
||||||
|
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
|
envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
|
||||||
"github.com/hashicorp/consul/agent/proxycfg"
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// routesFromSnapshot returns the xDS API representation of the "routes"
|
// routesFromSnapshot returns the xDS API representation of the "routes" in the
|
||||||
// in the snapshot.
|
// snapshot.
|
||||||
func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
||||||
if cfgSnap == nil {
|
if cfgSnap == nil {
|
||||||
return nil, errors.New("nil config given")
|
return nil, errors.New("nil config given")
|
||||||
}
|
}
|
||||||
// We don't support routes yet but probably will later
|
|
||||||
return nil, nil
|
switch cfgSnap.Kind {
|
||||||
|
case structs.ServiceKindConnectProxy:
|
||||||
|
return routesFromSnapshotConnectProxy(cfgSnap, token)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// routesFromSnapshotConnectProxy returns the xDS API representation of the
|
||||||
|
// "routes" in the snapshot.
|
||||||
|
func routesFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
||||||
|
if cfgSnap == nil {
|
||||||
|
return nil, errors.New("nil config given")
|
||||||
|
}
|
||||||
|
|
||||||
|
var resources []proto.Message
|
||||||
|
|
||||||
|
for _, u := range cfgSnap.Proxy.Upstreams {
|
||||||
|
upstreamID := u.Identifier()
|
||||||
|
|
||||||
|
var chain *structs.CompiledDiscoveryChain
|
||||||
|
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
||||||
|
chain = cfgSnap.ConnectProxy.DiscoveryChain[upstreamID]
|
||||||
|
}
|
||||||
|
|
||||||
|
if chain == nil || chain.IsDefault() {
|
||||||
|
// TODO(rb): make this do the old school stuff too
|
||||||
|
} else {
|
||||||
|
upstreamRoute, err := makeUpstreamRouteForDiscoveryChain(&u, chain, cfgSnap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if upstreamRoute != nil {
|
||||||
|
resources = append(resources, upstreamRoute)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(rb): make sure we don't generate an empty result
|
||||||
|
|
||||||
|
return resources, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeUpstreamRouteForDiscoveryChain(
|
||||||
|
u *structs.Upstream,
|
||||||
|
chain *structs.CompiledDiscoveryChain,
|
||||||
|
cfgSnap *proxycfg.ConfigSnapshot,
|
||||||
|
) (*envoy.RouteConfiguration, error) {
|
||||||
|
upstreamID := u.Identifier()
|
||||||
|
routeName := upstreamID
|
||||||
|
|
||||||
|
var routes []envoyroute.Route
|
||||||
|
|
||||||
|
switch chain.Node.Type {
|
||||||
|
case structs.DiscoveryGraphNodeTypeRouter:
|
||||||
|
routes = make([]envoyroute.Route, 0, len(chain.Node.Routes))
|
||||||
|
|
||||||
|
for _, discoveryRoute := range chain.Node.Routes {
|
||||||
|
routeMatch := makeRouteMatchForDiscoveryRoute(discoveryRoute, chain.Protocol)
|
||||||
|
|
||||||
|
// TODO(rb): handle PrefixRewrite
|
||||||
|
// TODO(rb): handle RequestTimeout
|
||||||
|
// TODO(rb): handle Retries
|
||||||
|
|
||||||
|
var (
|
||||||
|
routeAction *envoyroute.Route_Route
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
next := discoveryRoute.DestinationNode
|
||||||
|
if next.Type == structs.DiscoveryGraphNodeTypeSplitter {
|
||||||
|
routeAction, err = makeRouteActionForSplitter(upstreamID, cfgSnap.Datacenter, next.Splits)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if next.Type == structs.DiscoveryGraphNodeTypeGroupResolver {
|
||||||
|
groupResolver := next.GroupResolver
|
||||||
|
routeAction = makeRouteActionForSingleCluster(upstreamID, cfgSnap.Datacenter, groupResolver.Target)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("unexpected graph node after route %q", next.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
routes = append(routes, envoyroute.Route{
|
||||||
|
Match: routeMatch,
|
||||||
|
Action: routeAction,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
case structs.DiscoveryGraphNodeTypeSplitter:
|
||||||
|
routeAction, err := makeRouteActionForSplitter(upstreamID, cfgSnap.Datacenter, chain.Node.Splits)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultRoute := envoyroute.Route{
|
||||||
|
Match: makeDefaultRouteMatch(),
|
||||||
|
Action: routeAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
routes = []envoyroute.Route{defaultRoute}
|
||||||
|
|
||||||
|
case structs.DiscoveryGraphNodeTypeGroupResolver:
|
||||||
|
groupResolver := chain.Node.GroupResolver
|
||||||
|
|
||||||
|
routeAction := makeRouteActionForSingleCluster(upstreamID, cfgSnap.Datacenter, groupResolver.Target)
|
||||||
|
|
||||||
|
defaultRoute := envoyroute.Route{
|
||||||
|
Match: makeDefaultRouteMatch(),
|
||||||
|
Action: routeAction,
|
||||||
|
}
|
||||||
|
|
||||||
|
routes = []envoyroute.Route{defaultRoute}
|
||||||
|
|
||||||
|
default:
|
||||||
|
panic("unknown top node in discovery chain of type: " + chain.Node.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &envoy.RouteConfiguration{
|
||||||
|
Name: routeName,
|
||||||
|
VirtualHosts: []envoyroute.VirtualHost{
|
||||||
|
envoyroute.VirtualHost{
|
||||||
|
Name: routeName,
|
||||||
|
Domains: []string{"*"},
|
||||||
|
Routes: routes,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
// ValidateClusters defaults to true when defined statically and false
|
||||||
|
// when done via RDS. Re-set the sane value of true to prevent
|
||||||
|
// null-routing traffic.
|
||||||
|
ValidateClusters: makeBoolValue(true),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeRouteMatchForDiscoveryRoute(discoveryRoute *structs.DiscoveryRoute, protocol string) envoyroute.RouteMatch {
|
||||||
|
switch protocol {
|
||||||
|
case "http", "http2":
|
||||||
|
// The only match stanza is HTTP.
|
||||||
|
default:
|
||||||
|
return makeDefaultRouteMatch()
|
||||||
|
}
|
||||||
|
|
||||||
|
match := discoveryRoute.Definition.Match
|
||||||
|
if match == nil || match.IsEmpty() {
|
||||||
|
return makeDefaultRouteMatch()
|
||||||
|
}
|
||||||
|
|
||||||
|
em := envoyroute.RouteMatch{}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case match.HTTP.PathExact != "":
|
||||||
|
em.PathSpecifier = &envoyroute.RouteMatch_Path{Path: "/"}
|
||||||
|
case match.HTTP.PathPrefix != "":
|
||||||
|
em.PathSpecifier = &envoyroute.RouteMatch_Prefix{Prefix: "/"}
|
||||||
|
case match.HTTP.PathRegex != "":
|
||||||
|
em.PathSpecifier = &envoyroute.RouteMatch_Regex{Regex: "/"}
|
||||||
|
default:
|
||||||
|
em.PathSpecifier = &envoyroute.RouteMatch_Prefix{Prefix: "/"}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(match.HTTP.Header) > 0 {
|
||||||
|
em.Headers = make([]*envoyroute.HeaderMatcher, 0, len(match.HTTP.Header))
|
||||||
|
for _, hdr := range match.HTTP.Header {
|
||||||
|
eh := &envoyroute.HeaderMatcher{
|
||||||
|
Name: hdr.Name,
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case hdr.Exact != "":
|
||||||
|
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_ExactMatch{
|
||||||
|
ExactMatch: hdr.Exact,
|
||||||
|
}
|
||||||
|
case hdr.Regex != "":
|
||||||
|
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_RegexMatch{
|
||||||
|
RegexMatch: hdr.Regex,
|
||||||
|
}
|
||||||
|
case hdr.Prefix != "":
|
||||||
|
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PrefixMatch{
|
||||||
|
PrefixMatch: hdr.Prefix,
|
||||||
|
}
|
||||||
|
case hdr.Suffix != "":
|
||||||
|
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_SuffixMatch{
|
||||||
|
SuffixMatch: hdr.Suffix,
|
||||||
|
}
|
||||||
|
case hdr.Present:
|
||||||
|
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PresentMatch{
|
||||||
|
PresentMatch: true,
|
||||||
|
}
|
||||||
|
case hdr.Invert: // THIS HAS TO BE LAST
|
||||||
|
eh.HeaderMatchSpecifier = &envoyroute.HeaderMatcher_PresentMatch{
|
||||||
|
// We set this to the misleading value of 'true' here
|
||||||
|
// because we'll generically invert it next.
|
||||||
|
PresentMatch: true,
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
continue // skip this impossible situation
|
||||||
|
}
|
||||||
|
|
||||||
|
if hdr.Invert {
|
||||||
|
eh.InvertMatch = true
|
||||||
|
}
|
||||||
|
|
||||||
|
em.Headers = append(em.Headers, eh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(match.HTTP.QueryParam) > 0 {
|
||||||
|
em.QueryParameters = make([]*envoyroute.QueryParameterMatcher, 0, len(match.HTTP.QueryParam))
|
||||||
|
for _, qm := range match.HTTP.QueryParam {
|
||||||
|
eq := &envoyroute.QueryParameterMatcher{
|
||||||
|
Name: qm.Name,
|
||||||
|
Value: qm.Value,
|
||||||
|
Regex: makeBoolValue(qm.Regex),
|
||||||
|
}
|
||||||
|
|
||||||
|
em.QueryParameters = append(em.QueryParameters, eq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return em
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeDefaultRouteMatch() envoyroute.RouteMatch {
|
||||||
|
return envoyroute.RouteMatch{
|
||||||
|
PathSpecifier: &envoyroute.RouteMatch_Prefix{
|
||||||
|
Prefix: "/",
|
||||||
|
},
|
||||||
|
// TODO(banks) Envoy supports matching only valid GRPC
|
||||||
|
// requests which might be nice to add here for gRPC services
|
||||||
|
// but it's not supported in our current envoy SDK version
|
||||||
|
// although docs say it was supported by 1.8.0. Going to defer
|
||||||
|
// that until we've updated the deps.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeRouteActionForSingleCluster(upstreamID, currentDatacenter string, target structs.DiscoveryTarget) *envoyroute.Route_Route {
|
||||||
|
clusterName := makeClusterName(upstreamID, target, currentDatacenter)
|
||||||
|
|
||||||
|
return &envoyroute.Route_Route{
|
||||||
|
Route: &envoyroute.RouteAction{
|
||||||
|
ClusterSpecifier: &envoyroute.RouteAction_Cluster{
|
||||||
|
Cluster: clusterName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeRouteActionForSplitter(upstreamID, currentDatacenter string, splits []*structs.DiscoverySplit) (*envoyroute.Route_Route, error) {
|
||||||
|
clusters := make([]*envoyroute.WeightedCluster_ClusterWeight, 0, len(splits))
|
||||||
|
for _, split := range splits {
|
||||||
|
if split.Node.Type != structs.DiscoveryGraphNodeTypeGroupResolver {
|
||||||
|
return nil, fmt.Errorf("unexpected splitter destination node type: %s", split.Node.Type)
|
||||||
|
}
|
||||||
|
groupResolver := split.Node.GroupResolver
|
||||||
|
target := groupResolver.Target
|
||||||
|
clusterName := makeClusterName(upstreamID, target, currentDatacenter)
|
||||||
|
|
||||||
|
// TODO(rb): scale up by 100 and adjust total weight
|
||||||
|
cw := &envoyroute.WeightedCluster_ClusterWeight{
|
||||||
|
Weight: makeUint32Value(int(split.Weight)),
|
||||||
|
Name: clusterName,
|
||||||
|
}
|
||||||
|
|
||||||
|
clusters = append(clusters, cw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &envoyroute.Route_Route{
|
||||||
|
Route: &envoyroute.RouteAction{
|
||||||
|
ClusterSpecifier: &envoyroute.RouteAction_WeightedClusters{
|
||||||
|
WeightedClusters: &envoyroute.WeightedCluster{
|
||||||
|
Clusters: clusters,
|
||||||
|
TotalWeight: makeUint32Value(100),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
1
test/integration/connect/envoy/.gitignore
vendored
Normal file
1
test/integration/connect/envoy/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/workdir
|
@ -111,10 +111,23 @@ function get_envoy_stats_flush_interval {
|
|||||||
echo "$output" | jq --raw-output '.configs[0].bootstrap.stats_flush_interval'
|
echo "$output" | jq --raw-output '.configs[0].bootstrap.stats_flush_interval'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# snapshot_envoy_admin is meant to be used from a teardown scriptlet from the host.
|
||||||
|
function snapshot_envoy_admin {
|
||||||
|
local HOSTPORT=$1
|
||||||
|
local ENVOY_NAME=$2
|
||||||
|
|
||||||
|
docker_wget "http://${HOSTPORT}/config_dump" -q -O - > "./workdir/envoy/${ENVOY_NAME}-config_dump.json"
|
||||||
|
docker_wget "http://${HOSTPORT}/clusters" -q -O - > "./workdir/envoy/${ENVOY_NAME}-clusters.out"
|
||||||
|
}
|
||||||
|
|
||||||
function docker_consul {
|
function docker_consul {
|
||||||
docker run -ti --rm --network container:envoy_consul_1 consul-dev $@
|
docker run -ti --rm --network container:envoy_consul_1 consul-dev $@
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function docker_wget {
|
||||||
|
docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@
|
||||||
|
}
|
||||||
|
|
||||||
function must_match_in_statsd_logs {
|
function must_match_in_statsd_logs {
|
||||||
run cat /workdir/statsd/statsd.log
|
run cat /workdir/statsd/statsd.log
|
||||||
COUNT=$( echo "$output" | grep -Ec $1 )
|
COUNT=$( echo "$output" | grep -Ec $1 )
|
||||||
|
Loading…
x
Reference in New Issue
Block a user