mirror of https://github.com/status-im/consul.git
Add method for downstreams from disco chain
This commit is contained in:
parent
0174b1c7c6
commit
a86cf88a4a
|
@ -1,13 +1,11 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -53,39 +51,16 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
|
|||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name, entMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, config, err := state.CAConfig(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if config == nil {
|
||||
return errors.New("no cluster ca config setup")
|
||||
}
|
||||
|
||||
// Build TrustDomain based on the ClusterID stored.
|
||||
signingID := connect.SpiffeIDSigningForCluster(config)
|
||||
if signingID == nil {
|
||||
// If CA is bootstrapped at all then this should never happen but be
|
||||
// defensive.
|
||||
return errors.New("no cluster trust domain setup")
|
||||
}
|
||||
currentTrustDomain := signingID.Host()
|
||||
|
||||
// Then we compile it into something useful.
|
||||
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
|
||||
req := discoverychain.CompileRequest{
|
||||
ServiceName: args.Name,
|
||||
EvaluateInNamespace: entMeta.NamespaceOrDefault(),
|
||||
EvaluateInDatacenter: evalDC,
|
||||
EvaluateInTrustDomain: currentTrustDomain,
|
||||
UseInDatacenter: c.srv.config.Datacenter,
|
||||
OverrideMeshGateway: args.OverrideMeshGateway,
|
||||
OverrideProtocol: args.OverrideProtocol,
|
||||
OverrideConnectTimeout: args.OverrideConnectTimeout,
|
||||
Entries: entries,
|
||||
})
|
||||
}
|
||||
index, chain, err := state.ServiceDiscoveryChain(ws, args.Name, entMeta, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -371,6 +373,72 @@ var serviceGraphKinds = []string{
|
|||
structs.ServiceResolver,
|
||||
}
|
||||
|
||||
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
|
||||
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
|
||||
destination := structs.NewServiceName(service, entMeta)
|
||||
queue := []structs.ServiceName{destination}
|
||||
|
||||
seenLink := make(map[structs.ServiceName]bool)
|
||||
for len(queue) > 0 {
|
||||
// The "link" index returns config entries that reference a service
|
||||
iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID())
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
entry := raw.(structs.ConfigEntry)
|
||||
|
||||
sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta())
|
||||
if !seenLink[sn] {
|
||||
seenLink[sn] = true
|
||||
queue = append(queue, sn)
|
||||
}
|
||||
}
|
||||
queue = queue[1:]
|
||||
}
|
||||
|
||||
var (
|
||||
maxIdx uint64
|
||||
resp []structs.ServiceName
|
||||
)
|
||||
|
||||
// Only return the services that directly target the destination
|
||||
seenSource := make(map[structs.ServiceName]bool)
|
||||
for sn, _ := range seenLink {
|
||||
req := discoverychain.CompileRequest{
|
||||
ServiceName: sn.Name,
|
||||
EvaluateInNamespace: sn.NamespaceOrDefault(),
|
||||
|
||||
// TODO(freddy) : Should these be anything other than the known DC?
|
||||
EvaluateInDatacenter: dc,
|
||||
UseInDatacenter: dc,
|
||||
}
|
||||
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, entMeta, req)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
|
||||
}
|
||||
|
||||
for _, t := range chain.Targets {
|
||||
em := structs.EnterpriseMetaInitializer(t.Namespace)
|
||||
candidate := structs.NewServiceName(t.Service, &em)
|
||||
|
||||
if !candidate.Matches(&destination) {
|
||||
continue
|
||||
}
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
}
|
||||
if !seenSource[sn] {
|
||||
seenSource[sn] = true
|
||||
resp = append(resp, sn)
|
||||
}
|
||||
}
|
||||
}
|
||||
return maxIdx, resp, nil
|
||||
}
|
||||
|
||||
func validateProposedConfigEntryInServiceGraph(
|
||||
tx ReadTxn,
|
||||
kind, name string,
|
||||
|
@ -555,6 +623,44 @@ func testCompileDiscoveryChain(
|
|||
return chain.Protocol, chain.Nodes[chain.StartNode], nil
|
||||
}
|
||||
|
||||
func (s *Store) ServiceDiscoveryChain(
|
||||
ws memdb.WatchSet,
|
||||
serviceName string,
|
||||
entMeta *structs.EnterpriseMeta,
|
||||
req discoverychain.CompileRequest,
|
||||
) (uint64, *structs.CompiledDiscoveryChain, error) {
|
||||
|
||||
index, entries, err := s.readDiscoveryChainConfigEntries(ws, serviceName, nil, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
req.Entries = entries
|
||||
|
||||
_, config, err := s.CAConfig(ws)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
} else if config == nil {
|
||||
return 0, nil, errors.New("no cluster ca config setup")
|
||||
}
|
||||
|
||||
// Build TrustDomain based on the ClusterID stored.
|
||||
signingID := connect.SpiffeIDSigningForCluster(config)
|
||||
if signingID == nil {
|
||||
// If CA is bootstrapped at all then this should never happen but be
|
||||
// defensive.
|
||||
return 0, nil, errors.New("no cluster trust domain setup")
|
||||
}
|
||||
req.EvaluateInTrustDomain = signingID.Host()
|
||||
|
||||
// Then we compile it into something useful.
|
||||
chain, err := discoverychain.Compile(req)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to compile discovery chain: %v", err)
|
||||
}
|
||||
|
||||
return index, chain, nil
|
||||
}
|
||||
|
||||
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
|
||||
// the provided service name. All relevant config entries will be recursively
|
||||
// fetched and included in the result.
|
||||
|
|
|
@ -1246,7 +1246,7 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) {
|
|||
require.NoError(t, s.EnsureConfigEntry(0, entry, nil))
|
||||
}
|
||||
|
||||
_, entrySet, err := s.ReadDiscoveryChainConfigEntries(nil, "main", nil)
|
||||
_, entrySet, err := s.readDiscoveryChainConfigEntries(nil, "main", nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, entrySet.Routers, 0)
|
||||
|
@ -1452,3 +1452,272 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
|
|||
require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil))
|
||||
})
|
||||
}
|
||||
|
||||
func TestSourcesForTarget(t *testing.T) {
|
||||
defaultMeta := *structs.DefaultEnterpriseMeta()
|
||||
|
||||
type expect struct {
|
||||
idx uint64
|
||||
ids []structs.ServiceName
|
||||
}
|
||||
tt := []struct {
|
||||
name string
|
||||
entries []structs.ConfigEntry
|
||||
expect expect
|
||||
}{
|
||||
{
|
||||
name: "from route match",
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
&structs.ServiceRouterConfigEntry{
|
||||
Kind: structs.ServiceRouter,
|
||||
Name: "web",
|
||||
Routes: []structs.ServiceRoute{
|
||||
{
|
||||
Match: &structs.ServiceRouteMatch{
|
||||
HTTP: &structs.ServiceRouteHTTPMatch{
|
||||
PathExact: "/sink",
|
||||
},
|
||||
},
|
||||
Destination: &structs.ServiceRouteDestination{
|
||||
Service: "sink",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: expect{
|
||||
idx: 2,
|
||||
ids: []structs.ServiceName{
|
||||
{Name: "web", EnterpriseMeta: defaultMeta},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from redirect",
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
&structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "web",
|
||||
Redirect: &structs.ServiceResolverRedirect{
|
||||
Service: "sink",
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: expect{
|
||||
idx: 2,
|
||||
ids: []structs.ServiceName{
|
||||
{Name: "web", EnterpriseMeta: defaultMeta},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from failover",
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
&structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "web",
|
||||
Failover: map[string]structs.ServiceResolverFailover{
|
||||
"*": {
|
||||
Service: "sink",
|
||||
Datacenters: []string{"dc2", "dc3"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: expect{
|
||||
idx: 2,
|
||||
ids: []structs.ServiceName{
|
||||
{Name: "web", EnterpriseMeta: defaultMeta},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from splitter",
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
&structs.ServiceSplitterConfigEntry{
|
||||
Kind: structs.ServiceSplitter,
|
||||
Name: "web",
|
||||
Splits: []structs.ServiceSplit{
|
||||
{Weight: 90, Service: "web"},
|
||||
{Weight: 10, Service: "sink"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: expect{
|
||||
idx: 2,
|
||||
ids: []structs.ServiceName{
|
||||
{Name: "web", EnterpriseMeta: defaultMeta},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "chained route redirect",
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
&structs.ServiceRouterConfigEntry{
|
||||
Kind: structs.ServiceRouter,
|
||||
Name: "source",
|
||||
Routes: []structs.ServiceRoute{
|
||||
{
|
||||
Match: &structs.ServiceRouteMatch{
|
||||
HTTP: &structs.ServiceRouteHTTPMatch{
|
||||
PathExact: "/route",
|
||||
},
|
||||
},
|
||||
Destination: &structs.ServiceRouteDestination{
|
||||
Service: "routed",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "routed",
|
||||
Redirect: &structs.ServiceResolverRedirect{
|
||||
Service: "sink",
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: expect{
|
||||
idx: 3,
|
||||
ids: []structs.ServiceName{
|
||||
{Name: "source", EnterpriseMeta: defaultMeta},
|
||||
{Name: "routed", EnterpriseMeta: defaultMeta},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "kitchen sink with multiple services referencing sink directly",
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
},
|
||||
&structs.ServiceRouterConfigEntry{
|
||||
Kind: structs.ServiceRouter,
|
||||
Name: "routed",
|
||||
Routes: []structs.ServiceRoute{
|
||||
{
|
||||
Match: &structs.ServiceRouteMatch{
|
||||
HTTP: &structs.ServiceRouteHTTPMatch{
|
||||
PathExact: "/sink",
|
||||
},
|
||||
},
|
||||
Destination: &structs.ServiceRouteDestination{
|
||||
Service: "sink",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "redirected",
|
||||
Redirect: &structs.ServiceResolverRedirect{
|
||||
Service: "sink",
|
||||
},
|
||||
},
|
||||
&structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "failed-over",
|
||||
Failover: map[string]structs.ServiceResolverFailover{
|
||||
"*": {
|
||||
Service: "sink",
|
||||
Datacenters: []string{"dc2", "dc3"},
|
||||
},
|
||||
},
|
||||
},
|
||||
&structs.ServiceSplitterConfigEntry{
|
||||
Kind: structs.ServiceSplitter,
|
||||
Name: "split",
|
||||
Splits: []structs.ServiceSplit{
|
||||
{Weight: 90, Service: "no-op"},
|
||||
{Weight: 10, Service: "sink"},
|
||||
},
|
||||
},
|
||||
&structs.ServiceSplitterConfigEntry{
|
||||
Kind: structs.ServiceSplitter,
|
||||
Name: "unrelated",
|
||||
Splits: []structs.ServiceSplit{
|
||||
{Weight: 90, Service: "zip"},
|
||||
{Weight: 10, Service: "zop"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: expect{
|
||||
idx: 6,
|
||||
ids: []structs.ServiceName{
|
||||
{Name: "split", EnterpriseMeta: defaultMeta},
|
||||
{Name: "failed-over", EnterpriseMeta: defaultMeta},
|
||||
{Name: "redirected", EnterpriseMeta: defaultMeta},
|
||||
{Name: "routed", EnterpriseMeta: defaultMeta},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
ca := &structs.CAConfiguration{
|
||||
Provider: "consul",
|
||||
}
|
||||
err := s.CASetConfig(0, ca)
|
||||
require.NoError(t, err)
|
||||
|
||||
var i uint64 = 1
|
||||
for _, entry := range tc.entries {
|
||||
require.NoError(t, entry.Normalize())
|
||||
require.NoError(t, s.EnsureConfigEntry(i, entry, nil))
|
||||
i++
|
||||
}
|
||||
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
idx, ids, err := s.sourcesForTarget(ws, tx, "dc1", "sink", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.expect.idx, idx)
|
||||
require.ElementsMatch(t, tc.expect.ids, ids)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue