Single DB txn for ServiceTopology and other PR comments

This commit is contained in:
freddygv 2020-10-01 18:10:49 -06:00
parent 7c26a71b4b
commit cf7b7fcdd6
7 changed files with 289 additions and 140 deletions

View File

@ -1410,7 +1410,7 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) {
} }
// filterServiceTopology is used to filter upstreams/downstreams based on ACL rules. // filterServiceTopology is used to filter upstreams/downstreams based on ACL rules.
// this filter is unlike other in that it also returns whether the result was filtered by ACLs // this filter is unlike others in that it also returns whether the result was filtered by ACLs
func (f *aclFilter) filterServiceTopology(topology *structs.ServiceTopology) bool { func (f *aclFilter) filterServiceTopology(topology *structs.ServiceTopology) bool {
numUp := len(topology.Upstreams) numUp := len(topology.Upstreams)
numDown := len(topology.Downstreams) numDown := len(topology.Downstreams)
@ -1418,10 +1418,7 @@ func (f *aclFilter) filterServiceTopology(topology *structs.ServiceTopology) boo
f.filterCheckServiceNodes(&topology.Upstreams) f.filterCheckServiceNodes(&topology.Upstreams)
f.filterCheckServiceNodes(&topology.Downstreams) f.filterCheckServiceNodes(&topology.Downstreams)
if numUp != len(topology.Upstreams) || numDown != len(topology.Downstreams) { return numUp != len(topology.Upstreams) || numDown != len(topology.Downstreams)
return true
}
return false
} }
// filterDatacenterCheckServiceNodes is used to filter nodes based on ACL rules. // filterDatacenterCheckServiceNodes is used to filter nodes based on ACL rules.

View File

@ -830,7 +830,6 @@ func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, sv
return fmt.Errorf("failed updating gateway mapping: %s", err) return fmt.Errorf("failed updating gateway mapping: %s", err)
} }
// Update upstream/downstream mappings if it's a connect service // Update upstream/downstream mappings if it's a connect service
// TODO (freddy) What to do about Connect native services that don't define upstreams?
if svc.Kind == structs.ServiceKindConnectProxy { if svc.Kind == structs.ServiceKindConnectProxy {
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil { if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
return fmt.Errorf("failed updating upstream/downstream association") return fmt.Errorf("failed updating upstream/downstream association")
@ -1541,7 +1540,7 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string
return err return err
} }
if err := cleanupMeshTopology(tx, idx, svc); err != nil { if err := cleanupMeshTopology(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err) return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err)
} }
if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil { if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil {
@ -2914,12 +2913,22 @@ func (s *Store) ServiceTopology(
dc, service string, dc, service string,
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
) (uint64, *structs.ServiceTopology, error) { ) (uint64, *structs.ServiceTopology, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
var ( var (
maxIdx uint64 maxIdx uint64
sn = structs.NewServiceName(service, entMeta) sn = structs.NewServiceName(service, entMeta)
) )
idx, upstreamNames, err := s.UpstreamsForService(ws, dc, sn)
idx, upstreamNames, err := s.upstreamsForServiceTxn(tx, ws, dc, sn)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
idx, upstreams, err := s.combinedServiceNodesTxn(tx, ws, upstreamNames)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err) return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err)
} }
@ -2927,29 +2936,14 @@ func (s *Store) ServiceTopology(
maxIdx = idx maxIdx = idx
} }
var upstreams structs.CheckServiceNodes idx, downstreamNames, err := s.downstreamsForServiceTxn(tx, ws, dc, sn)
for _, u := range upstreamNames { if err != nil {
// Collect both typical and connect endpoints, this allows aggregating check statuses across both return 0, nil, err
idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstream nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
upstreams = append(upstreams, csn...)
idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstream connect nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
upstreams = append(upstreams, csn...)
} }
if idx > maxIdx {
idx, downstreamNames, err := s.DownstreamsForService(ws, dc, sn) maxIdx = idx
}
idx, downstreams, err := s.combinedServiceNodesTxn(tx, ws, downstreamNames)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err) return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err)
} }
@ -2957,43 +2951,49 @@ func (s *Store) ServiceTopology(
maxIdx = idx maxIdx = idx
} }
var downstreams structs.CheckServiceNodes
for _, u := range downstreamNames {
// Collect both typical and connect endpoints, this allows aggregating check statuses across both
idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
downstreams = append(downstreams, csn...)
idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
downstreams = append(downstreams, csn...)
}
resp := &structs.ServiceTopology{ resp := &structs.ServiceTopology{
Upstreams: upstreams, Upstreams: upstreams,
Downstreams: downstreams, Downstreams: downstreams,
} }
return 0, resp, nil return maxIdx, resp, nil
} }
// UpstreamsForService will find all upstream services that the input could route traffic to. // combinedServiceNodesTxn returns typical and connect endpoints for a list of services.
// This enabled aggregating checks statuses across both.
func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
var (
maxIdx uint64
resp structs.CheckServiceNodes
)
for _, u := range names {
// Collect typical then connect instances
idx, csn, err := checkServiceNodesTxn(tx, ws, u.Name, false, &u.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
idx, csn, err = checkServiceNodesTxn(tx, ws, u.Name, true, &u.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
}
return maxIdx, resp, nil
}
// upstreamsForServiceTxn will find all upstream services that the input could route traffic to.
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. // There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
// TODO (freddy): Account for ingress gateways // TODO (freddy): Account for ingress gateways
func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { // TODO (freddy): Account for multi-dc upstreams
tx := s.db.ReadTxn() func (s *Store) upstreamsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
defer tx.Abort() idx, upstreams, err := upstreamsFromRegistrationTxn(tx, ws, sn)
idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err) return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err)
} }
@ -3009,7 +3009,7 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser
) )
for _, u := range upstreams { for _, u := range upstreams {
// Evaluate the targets from the upstream's discovery chain // Evaluate the targets from the upstream's discovery chain
idx, targets, err := s.discoveryChainTargets(ws, dc, u.Name, &u.EnterpriseMeta) idx, targets, err := s.discoveryChainTargetsTxn(tx, ws, dc, u.Name, &u.EnterpriseMeta)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err) return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err)
} }
@ -3030,15 +3030,12 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser
return maxIdx, resp, nil return maxIdx, resp, nil
} }
// DownstreamsForService will find all downstream services that could route traffic to the input service. // downstreamsForServiceTxn will find all downstream services that could route traffic to the input service.
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. // There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
// TODO (freddy): Account for ingress gateways // TODO (freddy): Account for ingress gateways
func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) { func (s *Store) downstreamsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) {
tx := s.db.ReadTxn() // First fetch services that have discovery chains that eventually route to the target service
defer tx.Abort() idx, sources, err := s.discoveryChainSourcesTxn(tx, ws, dc, service)
// First fetch services with discovery chains that list the input as a target
idx, sources, err := s.discoveryChainSources(ws, tx, dc, service)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err) return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err)
} }
@ -3053,8 +3050,8 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru
seen = make(map[structs.ServiceName]bool) seen = make(map[structs.ServiceName]bool)
) )
for _, s := range sources { for _, s := range sources {
// We then follow these discovery chain sources one level down to the services defining them as an upstream. // We then follow these sources one level down to the services defining them as an upstream.
idx, downstreams, err := downstreamsFromRegistration(ws, tx, s) idx, downstreams, err := downstreamsFromRegistrationTxn(tx, ws, s)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err) return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err)
} }
@ -3068,35 +3065,20 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru
} }
} }
} }
// Also append services that directly listed the input as an upstream
idx, downstreams, err := downstreamsFromRegistration(ws, tx, service)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", service.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, d := range downstreams {
if !seen[d] {
resp = append(resp, d)
seen[d] = true
}
}
return maxIdx, resp, nil return maxIdx, resp, nil
} }
// upstreamsFromRegistration returns the ServiceNames of the upstreams defined across instances of the input // upstreamsFromRegistrationTxn returns the ServiceNames of the upstreams defined across instances of the input
func upstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { func upstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
return linkedFromRegistration(ws, tx, sn, false) return linkedFromRegistrationTxn(tx, ws, sn, false)
} }
// downstreamsFromRegistration returns the ServiceNames of downstream services based on registrations across instances of the input // downstreamsFromRegistrationTxn returns the ServiceNames of downstream services based on registrations across instances of the input
func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { func downstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
return linkedFromRegistration(ws, tx, sn, true) return linkedFromRegistrationTxn(tx, ws, sn, true)
} }
func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
// To fetch upstreams we query services that have the input listed as a downstream // To fetch upstreams we query services that have the input listed as a downstream
// To fetch downstreams we query services that have the input listed as an upstream // To fetch downstreams we query services that have the input listed as an upstream
index := "downstream" index := "downstream"
@ -3177,7 +3159,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi
if !ok { if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy) return fmt.Errorf("unexpected topology type %T", rawCopy)
} }
mapping.Refs[uid] = true mapping.Refs[uid] = struct{}{}
mapping.ModifyIndex = idx mapping.ModifyIndex = idx
inserted[upstream] = true inserted[upstream] = true
@ -3186,7 +3168,7 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi
mapping = &structs.UpstreamDownstream{ mapping = &structs.UpstreamDownstream{
Upstream: upstream, Upstream: upstream,
Downstream: downstream, Downstream: downstream,
Refs: map[string]bool{uid: true}, Refs: map[string]struct{}{uid: {}},
RaftIndex: structs.RaftIndex{ RaftIndex: structs.RaftIndex{
CreateIndex: idx, CreateIndex: idx,
ModifyIndex: idx, ModifyIndex: idx,

View File

@ -6136,7 +6136,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
// Watch should fire since the admin <-> web-proxy pairing was inserted into the topology table // Watch should fire since the admin <-> web-proxy pairing was inserted into the topology table
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
idx, names, err := downstreamsFromRegistration(ws, tx, admin) idx, names, err := downstreamsFromRegistrationTxn(tx, ws, admin)
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, idx) assert.Zero(t, idx)
assert.Len(t, names, 0) assert.Len(t, names, 0)
@ -6165,7 +6165,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, admin) idx, names, err = downstreamsFromRegistrationTxn(tx, ws, admin)
require.NoError(t, err) require.NoError(t, err)
exp := expect{ exp := expect{
@ -6194,7 +6194,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, _, err = downstreamsFromRegistration(ws, tx, admin) idx, _, err = downstreamsFromRegistrationTxn(tx, ws, admin)
require.NoError(t, err) require.NoError(t, err)
exp = expect{ exp = expect{
@ -6207,7 +6207,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
// Should still be able to get downstream for one of the other upstreams // Should still be able to get downstream for one of the other upstreams
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, cache) idx, names, err = downstreamsFromRegistrationTxn(tx, ws, cache)
require.NoError(t, err) require.NoError(t, err)
exp = expect{ exp = expect{
@ -6225,7 +6225,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, _, err = downstreamsFromRegistration(ws, tx, cache) idx, _, err = downstreamsFromRegistrationTxn(tx, ws, cache)
require.NoError(t, err) require.NoError(t, err)
@ -6354,7 +6354,7 @@ func TestCatalog_catalogDownstreams(t *testing.T) {
} }
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
idx, names, err := downstreamsFromRegistration(ws, tx, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())) idx, names, err := downstreamsFromRegistrationTxn(tx, ws, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta()))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx) require.Equal(t, tc.expect.idx, idx)
@ -6529,7 +6529,7 @@ func TestCatalog_upstreamsFromRegistration(t *testing.T) {
} }
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistration(ws, tx, structs.NewServiceName("api", structs.DefaultEnterpriseMeta())) idx, names, err := upstreamsFromRegistrationTxn(tx, ws, structs.NewServiceName("api", structs.DefaultEnterpriseMeta()))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx) require.Equal(t, tc.expect.idx, idx)
@ -6556,7 +6556,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistration(ws, tx, web) idx, names, err := upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, idx) assert.Zero(t, idx)
assert.Len(t, names, 0) assert.Len(t, names, 0)
@ -6586,7 +6586,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web) idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err) require.NoError(t, err)
exp := expect{ exp := expect{
@ -6613,7 +6613,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web) idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err) require.NoError(t, err)
exp = expect{ exp = expect{
@ -6655,7 +6655,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web) idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err) require.NoError(t, err)
exp = expect{ exp = expect{
@ -6676,7 +6676,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web) idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err) require.NoError(t, err)
exp = expect{ exp = expect{
@ -6696,7 +6696,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
tx = s.db.ReadTxn() tx = s.db.ReadTxn()
idx, _, err = upstreamsFromRegistration(ws, tx, web) idx, _, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err) require.NoError(t, err)
@ -6860,9 +6860,12 @@ func TestCatalog_UpstreamsForService(t *testing.T) {
i++ i++
} }
tx := s.db.ReadTxn()
defer tx.Abort()
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta()) sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
idx, names, err := s.UpstreamsForService(ws, "dc1", sn) idx, names, err := s.upstreamsForServiceTxn(tx, ws, "dc1", sn)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx) require.Equal(t, tc.expect.idx, idx)
@ -6957,9 +6960,9 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
expect: expect{ expect: expect{
idx: 4, idx: 4,
names: []structs.ServiceName{ names: []structs.ServiceName{
// get web from old-admin routing to admin and web listing old-admin as an upstream // get web from listing admin directly as an upstream
{Name: "web", EnterpriseMeta: *defaultMeta}, {Name: "web", EnterpriseMeta: *defaultMeta},
// get api from listing admin directly as an upstream // get api from old-admin routing to admin and web listing old-admin as an upstream
{Name: "api", EnterpriseMeta: *defaultMeta}, {Name: "api", EnterpriseMeta: *defaultMeta},
}, },
}, },
@ -6993,9 +6996,12 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
i++ i++
} }
tx := s.db.ReadTxn()
defer tx.Abort()
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta()) sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())
idx, names, err := s.DownstreamsForService(ws, "dc1", sn) idx, names, err := s.downstreamsForServiceTxn(tx, ws, "dc1", sn)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx) require.Equal(t, tc.expect.idx, idx)
@ -7003,3 +7009,130 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
}) })
} }
} }
func TestCatalog_DownstreamsForService_Updates(t *testing.T) {
var (
defaultMeta = structs.DefaultEnterpriseMeta()
target = structs.NewServiceName("admin", defaultMeta)
)
s := testStateStore(t)
ca := &structs.CAConfiguration{
Provider: "consul",
}
err := s.CASetConfig(1, ca)
require.NoError(t, err)
require.NoError(t, s.EnsureNode(2, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
// Register a service with our target as an upstream, and it should show up as a downstream
web := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(3, "foo", &web))
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := s.downstreamsForServiceTxn(tx, ws, "dc1", target)
require.NoError(t, err)
tx.Abort()
expect := []structs.ServiceName{
{Name: "web", EnterpriseMeta: *defaultMeta},
}
require.Equal(t, uint64(3), idx)
require.ElementsMatch(t, expect, names)
// Register a service WITHOUT our target as an upstream, and the watch should not fire
api := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "old-admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(4, "foo", &api))
require.False(t, watchFired(ws))
// Update the routing so that api's upstream routes to our target and watches should fire
defaults := structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
}
require.NoError(t, defaults.Normalize())
require.NoError(t, s.EnsureConfigEntry(5, &defaults, nil))
router := structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "old-admin",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/v2",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "admin",
},
},
},
}
require.NoError(t, router.Normalize())
require.NoError(t, s.EnsureConfigEntry(6, &router, nil))
// We updated a relevant config entry
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = s.downstreamsForServiceTxn(tx, ws, "dc1", target)
require.NoError(t, err)
tx.Abort()
expect = []structs.ServiceName{
// get web from listing admin directly as an upstream
{Name: "web", EnterpriseMeta: *defaultMeta},
// get api from old-admin routing to admin and web listing old-admin as an upstream
{Name: "api", EnterpriseMeta: *defaultMeta},
}
require.Equal(t, uint64(6), idx)
require.ElementsMatch(t, expect, names)
}

View File

@ -374,17 +374,15 @@ var serviceGraphKinds = []string{
} }
// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain // discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
source := structs.NewServiceName(service, entMeta) source := structs.NewServiceName(service, entMeta)
req := discoverychain.CompileRequest{ req := discoverychain.CompileRequest{
ServiceName: source.Name, ServiceName: source.Name,
EvaluateInNamespace: source.NamespaceOrDefault(), EvaluateInNamespace: source.NamespaceOrDefault(),
// TODO(freddy) : Should these be anything other than the known DC?
EvaluateInDatacenter: dc, EvaluateInDatacenter: dc,
UseInDatacenter: dc, UseInDatacenter: dc,
} }
idx, chain, err := s.ServiceDiscoveryChain(ws, source.Name, entMeta, req) idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err) return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err)
} }
@ -402,11 +400,11 @@ func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, ent
return idx, resp, nil return idx, resp, nil
} }
// discoveryChainSources will return a list of services whose discovery chains have the input service as a target // discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target
func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) { func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
queue := []structs.ServiceName{destination} seenLink := map[structs.ServiceName]bool{destination: true}
seenLink := make(map[structs.ServiceName]bool) queue := []structs.ServiceName{destination}
for len(queue) > 0 { for len(queue) > 0 {
// The "link" index returns config entries that reference a service // The "link" index returns config entries that reference a service
iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID()) iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID())
@ -428,22 +426,20 @@ func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string,
} }
var ( var (
maxIdx uint64 maxIdx uint64 = 1
resp []structs.ServiceName resp []structs.ServiceName
) )
// Only return the services that directly target the destination // Only return the services that target the destination anywhere in their discovery chains.
seenSource := make(map[structs.ServiceName]bool) seenSource := make(map[structs.ServiceName]bool)
for sn := range seenLink { for sn := range seenLink {
req := discoverychain.CompileRequest{ req := discoverychain.CompileRequest{
ServiceName: sn.Name, ServiceName: sn.Name,
EvaluateInNamespace: sn.NamespaceOrDefault(), EvaluateInNamespace: sn.NamespaceOrDefault(),
// TODO(freddy) : Should these be anything other than the known DC?
EvaluateInDatacenter: dc, EvaluateInDatacenter: dc,
UseInDatacenter: dc, UseInDatacenter: dc,
} }
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, &sn.EnterpriseMeta, req) idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err) return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
} }
@ -657,8 +653,21 @@ func (s *Store) ServiceDiscoveryChain(
entMeta *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
req discoverychain.CompileRequest, req discoverychain.CompileRequest,
) (uint64, *structs.CompiledDiscoveryChain, error) { ) (uint64, *structs.CompiledDiscoveryChain, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
index, entries, err := s.readDiscoveryChainConfigEntries(ws, serviceName, nil, entMeta) return s.serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req)
}
func (s *Store) serviceDiscoveryChainTxn(
tx ReadTxn,
ws memdb.WatchSet,
serviceName string,
entMeta *structs.EnterpriseMeta,
req discoverychain.CompileRequest,
) (uint64, *structs.CompiledDiscoveryChain, error) {
index, entries, err := readDiscoveryChainConfigEntriesTxn(tx, ws, serviceName, nil, entMeta)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }

View File

@ -1457,14 +1457,24 @@ func TestSourcesForTarget(t *testing.T) {
defaultMeta := *structs.DefaultEnterpriseMeta() defaultMeta := *structs.DefaultEnterpriseMeta()
type expect struct { type expect struct {
idx uint64 idx uint64
ids []structs.ServiceName names []structs.ServiceName
} }
tt := []struct { tt := []struct {
name string name string
entries []structs.ConfigEntry entries []structs.ConfigEntry
expect expect expect expect
}{ }{
{
name: "no relevant config entries",
entries: []structs.ConfigEntry{},
expect: expect{
idx: 1,
names: []structs.ServiceName{
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{ {
name: "from route match", name: "from route match",
entries: []structs.ConfigEntry{ entries: []structs.ConfigEntry{
@ -1494,8 +1504,9 @@ func TestSourcesForTarget(t *testing.T) {
}, },
expect: expect{ expect: expect{
idx: 2, idx: 2,
ids: []structs.ServiceName{ names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta}, {Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
}, },
}, },
}, },
@ -1519,8 +1530,9 @@ func TestSourcesForTarget(t *testing.T) {
}, },
expect: expect{ expect: expect{
idx: 2, idx: 2,
ids: []structs.ServiceName{ names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta}, {Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
}, },
}, },
}, },
@ -1547,8 +1559,9 @@ func TestSourcesForTarget(t *testing.T) {
}, },
expect: expect{ expect: expect{
idx: 2, idx: 2,
ids: []structs.ServiceName{ names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta}, {Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
}, },
}, },
}, },
@ -1573,8 +1586,9 @@ func TestSourcesForTarget(t *testing.T) {
}, },
expect: expect{ expect: expect{
idx: 2, idx: 2,
ids: []structs.ServiceName{ names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta}, {Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
}, },
}, },
}, },
@ -1614,9 +1628,10 @@ func TestSourcesForTarget(t *testing.T) {
}, },
expect: expect{ expect: expect{
idx: 3, idx: 3,
ids: []structs.ServiceName{ names: []structs.ServiceName{
{Name: "source", EnterpriseMeta: defaultMeta}, {Name: "source", EnterpriseMeta: defaultMeta},
{Name: "routed", EnterpriseMeta: defaultMeta}, {Name: "routed", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
}, },
}, },
}, },
@ -1682,11 +1697,12 @@ func TestSourcesForTarget(t *testing.T) {
}, },
expect: expect{ expect: expect{
idx: 6, idx: 6,
ids: []structs.ServiceName{ names: []structs.ServiceName{
{Name: "split", EnterpriseMeta: defaultMeta}, {Name: "split", EnterpriseMeta: defaultMeta},
{Name: "failed-over", EnterpriseMeta: defaultMeta}, {Name: "failed-over", EnterpriseMeta: defaultMeta},
{Name: "redirected", EnterpriseMeta: defaultMeta}, {Name: "redirected", EnterpriseMeta: defaultMeta},
{Name: "routed", EnterpriseMeta: defaultMeta}, {Name: "routed", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
}, },
}, },
}, },
@ -1714,11 +1730,11 @@ func TestSourcesForTarget(t *testing.T) {
defer tx.Abort() defer tx.Abort()
sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta()) sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta())
idx, ids, err := s.discoveryChainSources(ws, tx, "dc1", sn) idx, names, err := s.discoveryChainSourcesTxn(tx, ws, "dc1", sn)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx) require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.ids, ids) require.ElementsMatch(t, tc.expect.names, names)
}) })
} }
} }
@ -1915,7 +1931,7 @@ func TestTargetsForSource(t *testing.T) {
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
defer tx.Abort() defer tx.Abort()
idx, ids, err := s.discoveryChainTargets(ws, "dc1", "web", nil) idx, ids, err := s.discoveryChainTargetsTxn(tx, ws, "dc1", "web", nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx) require.Equal(t, tc.expect.idx, idx)

View File

@ -1033,6 +1033,8 @@ func (ns *NodeService) CompoundServiceName() ServiceName {
// UniqueID is a unique identifier for a service instance within a datacenter by encoding: // UniqueID is a unique identifier for a service instance within a datacenter by encoding:
// node/namespace/service_id // node/namespace/service_id
//
// Note: We do not have strict character restrictions in all node names, so this should NOT be split on / to retrieve components.
func UniqueID(node string, compoundID string) string { func UniqueID(node string, compoundID string) string {
return fmt.Sprintf("%s/%s", node, compoundID) return fmt.Sprintf("%s/%s", node, compoundID)
} }
@ -2412,7 +2414,14 @@ func (r *KeyringResponses) New() interface{} {
type UpstreamDownstream struct { type UpstreamDownstream struct {
Upstream ServiceName Upstream ServiceName
Downstream ServiceName Downstream ServiceName
Refs map[string]bool
// Upstream/Downstream pairs come from individual service registrations, and they can be updated individually.
// Refs stores the registrations that contain this pairing.
// When there are no remaining Refs, the UpstreamDownstream can be deleted.
//
// Note: This map must be treated as immutable when accessed in MemDB.
// The entire UpstreamDownstream structure must be deep copied on updates.
Refs map[string]struct{}
RaftIndex RaftIndex
} }

View File

@ -255,6 +255,9 @@ RPC:
return prepSummaryOutput(summaries, false), nil return prepSummaryOutput(summaries, false), nil
} }
// UIServiceTopology returns the list of upstreams and downstreams for a Connect enabled service.
// - Downstreams are services that may route to the input service.
// - Upstreams are the upstreams defined in the target service's proxy registrations
func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse arguments // Parse arguments
args := structs.ServiceSpecificRequest{} args := structs.ServiceSpecificRequest{}