peering: add store.PeeringsForService implementation (#12957)

This commit is contained in:
Evan Culver 2022-05-06 12:35:31 -07:00 committed by GitHub
parent eafd91f35c
commit 9c8606e138
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 534 additions and 22 deletions

View File

@ -1157,6 +1157,24 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta,
return idx, results, nil
}
func serviceExists(tx ReadTxn, ws memdb.WatchSet, name string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, bool, error) {
idx := catalogServicesMaxIndex(tx, entMeta, peerName)
q := Query{
Value: name,
EnterpriseMeta: *entMeta,
PeerName: peerName,
}
watchCh, existing, err := tx.FirstWatch(tableServices, indexService, q)
if err != nil {
return idx, false, fmt.Errorf("failed querying for service: %s", err)
}
ws.Add(watchCh)
if existing == nil {
return idx, false, nil
}
return idx, true, nil
}
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) {
tx := s.db.Txn(false)

View File

@ -1341,6 +1341,52 @@ func configEntryWithOverridesTxn(
return configEntryTxn(tx, ws, kind, name, entMeta)
}
// getExportedServicesConfigEntriesTxn fetches exported-service config entries and
// filters their exported services to only those that match serviceName and entMeta.
// Because the resulting config entries may have had their exported services modified,
// they *should not* be used in subsequent writes.
func getExportedServiceConfigEntriesTxn(
tx ReadTxn,
ws memdb.WatchSet,
serviceName string,
entMeta *acl.EnterpriseMeta,
) (uint64, []*structs.ExportedServicesConfigEntry, error) {
var exportedServicesEntries []*structs.ExportedServicesConfigEntry
// slice of names to match config entries against
matchCandidates := getExportedServicesMatchServiceNames(serviceName, entMeta)
// matcher func generator for currying the matcher func over EnterpriseMeta values
// from the associated config entry
matchFunc := func(matchMeta *acl.EnterpriseMeta) func(structs.ExportedService) bool {
return func(exportedService structs.ExportedService) bool {
matchSvcName := structs.NewServiceName(exportedService.Name, matchMeta)
for _, candidate := range matchCandidates {
if candidate.Matches(matchSvcName) {
return true
}
}
return false
}
}
idx, entries, err := configEntriesByKindTxn(tx, ws, structs.ExportedServices, entMeta)
if err != nil {
return 0, nil, err
}
for _, entry := range entries {
esEntry, ok := entry.(*structs.ExportedServicesConfigEntry)
if !ok {
return 0, nil, fmt.Errorf("type %T is not a %s config entry", esEntry, structs.ExportedServices)
}
// get a copy of the config entry with Services filtered to match serviceName
newEntry := filterExportedServices(esEntry, matchFunc(entry.GetEnterpriseMeta()))
// the filter will return a new entry, so checking to see if its services is empty says that there
// were matches and that we should include it in the results
if len(newEntry.Services) > 0 {
exportedServicesEntries = append(exportedServicesEntries, newEntry)
}
}
return idx, exportedServicesEntries, nil
}
// protocolForService returns the service graph protocol associated to the
// provided service, checking all relevant config entries.
func protocolForService(
@ -1383,6 +1429,23 @@ func protocolForService(
return maxIdx, chain.Protocol, nil
}
// filterExportedServices returns the slice of ExportedService that matc ffor matching service names
// returning a copy of entry with only the services that match one of the
// services in candidates.
func filterExportedServices(
entry *structs.ExportedServicesConfigEntry,
testFunc func(structs.ExportedService) bool,
) *structs.ExportedServicesConfigEntry {
newEntry := *entry
newEntry.Services = []structs.ExportedService{}
for _, ceSvc := range entry.Services {
if testFunc(ceSvc) {
newEntry.Services = append(newEntry.Services, ceSvc)
}
}
return &newEntry
}
func newConfigEntryQuery(c structs.ConfigEntry) configentry.KindName {
return configentry.NewKindName(c.GetKind(), c.GetName(), c.GetEnterpriseMeta())
}

View File

@ -60,3 +60,15 @@ func configIntentionsConvertToList(iter memdb.ResultIterator, _ *acl.EnterpriseM
}
return results
}
// getExportedServicesMatchServicesNames returns a list of service names that are considered matches when
// found in a list of exported-services config entries. For OSS, namespace is not considered, so a match is one of:
// - the service name matches
// - the service name is a wildcard
// This value can be used to filter exported-services config entries for a given service name.
func getExportedServicesMatchServiceNames(serviceName string, entMeta *acl.EnterpriseMeta) []structs.ServiceName {
return []structs.ServiceName{
structs.NewServiceName(serviceName, entMeta),
structs.NewServiceName(structs.WildcardSpecifier, entMeta),
}
}

View File

@ -4,9 +4,12 @@
package state
import (
"testing"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func testIndexerTableConfigEntries() map[string]indexerTestCase {
@ -36,3 +39,121 @@ func testIndexerTableConfigEntries() map[string]indexerTestCase {
},
}
}
func TestStore_ExportedServices(t *testing.T) {
type testCase struct {
name string
write []structs.ConfigEntry
query string
expect []*structs.ExportedServicesConfigEntry
}
cases := []testCase{
{
name: "empty everything",
write: []structs.ConfigEntry{},
query: "foo",
expect: []*structs.ExportedServicesConfigEntry{},
},
{
name: "no matching exported services",
write: []structs.ConfigEntry{
&structs.ProxyConfigEntry{Name: "foo"},
&structs.ProxyConfigEntry{Name: "bar"},
&structs.ExportedServicesConfigEntry{
Name: "baz",
Services: []structs.ExportedService{
{Name: "baz"},
},
},
},
query: "foo",
expect: []*structs.ExportedServicesConfigEntry{},
},
{
name: "exact match service name",
write: []structs.ConfigEntry{
&structs.ExportedServicesConfigEntry{
Name: "foo",
Services: []structs.ExportedService{
{Name: "foo"},
},
},
&structs.ExportedServicesConfigEntry{
Name: "bar",
Services: []structs.ExportedService{
{Name: "bar"},
},
},
},
query: "bar",
expect: []*structs.ExportedServicesConfigEntry{
{
Name: "bar",
Services: []structs.ExportedService{
{Name: "bar"},
},
},
},
},
{
name: "wildcard match on service name",
write: []structs.ConfigEntry{
&structs.ExportedServicesConfigEntry{
Name: "foo",
Services: []structs.ExportedService{
{Name: "foo"},
},
},
&structs.ExportedServicesConfigEntry{
Name: "wildcard",
Services: []structs.ExportedService{
{Name: structs.WildcardSpecifier},
},
},
},
query: "foo",
expect: []*structs.ExportedServicesConfigEntry{
{
Name: "foo",
Services: []structs.ExportedService{
{Name: "foo"},
},
},
{
Name: "wildcard",
Services: []structs.ExportedService{
{Name: structs.WildcardSpecifier},
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
// Write the entries.
for idx, entry := range tc.write {
require.NoError(t, s.EnsureConfigEntry(uint64(idx+1), entry))
}
// Read the entries back.
tx := s.db.ReadTxn()
defer tx.Abort()
idx, entries, err := getExportedServiceConfigEntriesTxn(tx, nil, tc.query, acl.DefaultEnterpriseMeta())
require.NoError(t, err)
require.Equal(t, uint64(len(tc.write)), idx)
// Verify the result.
require.Len(t, entries, len(tc.expect))
for idx, got := range entries {
// ignore raft fields
got.ModifyIndex = 0
got.CreateIndex = 0
require.Equal(t, tc.expect[idx], got)
}
})
}
}

View File

@ -84,7 +84,7 @@ func (s *Store) PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeerin
tx := s.db.ReadTxn()
defer tx.Abort()
peering, err := peeringReadByIDTxn(ws, tx, id)
peering, err := peeringReadByIDTxn(tx, ws, id)
if err != nil {
return 0, nil, fmt.Errorf("failed to read peering by id: %w", err)
}
@ -96,17 +96,35 @@ func (s *Store) PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeerin
return peering.ModifyIndex, peering, nil
}
func peeringReadByIDTxn(tx ReadTxn, ws memdb.WatchSet, id string) (*pbpeering.Peering, error) {
watchCh, peeringRaw, err := tx.FirstWatch(tablePeering, indexID, id)
if err != nil {
return nil, fmt.Errorf("failed peering lookup: %w", err)
}
ws.Add(watchCh)
peering, ok := peeringRaw.(*pbpeering.Peering)
if peeringRaw != nil && !ok {
return nil, fmt.Errorf("invalid type %T", peering)
}
return peering, nil
}
func (s *Store) PeeringRead(ws memdb.WatchSet, q Query) (uint64, *pbpeering.Peering, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return peeringReadTxn(tx, ws, q)
}
func peeringReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64, *pbpeering.Peering, error) {
watchCh, peeringRaw, err := tx.FirstWatch(tablePeering, indexName, q)
if err != nil {
return 0, nil, fmt.Errorf("failed peering lookup: %w", err)
}
peering, ok := peeringRaw.(*pbpeering.Peering)
if peering != nil && !ok {
if peeringRaw != nil && !ok {
return 0, nil, fmt.Errorf("invalid type %T", peering)
}
ws.Add(watchCh)
@ -115,23 +133,10 @@ func (s *Store) PeeringRead(ws memdb.WatchSet, q Query) (uint64, *pbpeering.Peer
// Return the tables index so caller can watch it for changes if the peering doesn't exist
return maxIndexWatchTxn(tx, ws, partitionedIndexEntryName(tablePeering, q.PartitionOrDefault())), nil, nil
}
return peering.ModifyIndex, peering, nil
}
func peeringReadByIDTxn(ws memdb.WatchSet, tx ReadTxn, id string) (*pbpeering.Peering, error) {
watchCh, peeringRaw, err := tx.FirstWatch(tablePeering, indexID, id)
if err != nil {
return nil, fmt.Errorf("failed peering lookup: %w", err)
}
ws.Add(watchCh)
peering, ok := peeringRaw.(*pbpeering.Peering)
if peering != nil && !ok {
return nil, fmt.Errorf("invalid type %T", peering)
}
return peering, nil
}
func (s *Store) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
@ -166,7 +171,7 @@ func generatePeeringUUID(tx ReadTxn) (string, error) {
if err != nil {
return "", fmt.Errorf("failed to generate UUID: %w", err)
}
existing, err := peeringReadByIDTxn(nil, tx, uuid)
existing, err := peeringReadByIDTxn(tx, nil, uuid)
if err != nil {
return "", fmt.Errorf("failed to read peering: %w", err)
}
@ -249,7 +254,7 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
existing, err := peeringReadByIDTxn(nil, tx, id)
existing, err := peeringReadByIDTxn(tx, nil, id)
if err != nil {
return fmt.Errorf("failed to read peering %q: %w", id, err)
}
@ -285,7 +290,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
tx := s.db.ReadTxn()
defer tx.Abort()
peering, err := peeringReadByIDTxn(ws, tx, peerID)
peering, err := peeringReadByIDTxn(tx, ws, peerID)
if err != nil {
return 0, nil, fmt.Errorf("failed to read peering: %w", err)
}
@ -369,6 +374,53 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
return maxIdx, resp, nil
}
// PeeringsForService returns the list of peerings that are associated with the service name provided in the query.
// This is used to configure connect proxies for a given service. The result is generated by querying for exported
// service config entries and filtering for those that match the given service.
// TODO(peering): this implementation does all of the work on read to materialize this list of peerings, we should explore
// writing to a separate index that has service peerings prepared ahead of time should this become a performance bottleneck.
func (s *Store) PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
// short-circuit if the service does not exist in the context of the query -- this prevents "leaking" services
// when there are wildcard rules in place.
if svcIdx, svcExists, err := serviceExists(tx, ws, serviceName, &entMeta, ""); err != nil {
return 0, nil, fmt.Errorf("failed to check if service exists: %w", err)
} else if !svcExists {
// if the service does not exist, return the max index for the services table so caller can watch for changes
return svcIdx, nil, nil
}
// config entries must be defined in the default namespace, so we only need the partition here
meta := structs.DefaultEnterpriseMetaInPartition(entMeta.PartitionOrDefault())
// return the idx of the config entry that was last modified so caller can watch for changes
idx, peeredServices, err := readPeeredServicesFromConfigEntriesTxn(tx, ws, serviceName, meta)
if err != nil {
return 0, nil, fmt.Errorf("failed to read peered services for service name: %w", err)
}
var peerings []*pbpeering.Peering
// lookup the peering for each matching peered service
for _, peeredService := range peeredServices {
readQuery := Query{
Value: peeredService.PeerName,
EnterpriseMeta: peeredService.Name.EnterpriseMeta,
}
_, peering, err := peeringReadTxn(tx, ws, readQuery)
if err != nil {
return 0, nil, fmt.Errorf("failed to read peering: %w", err)
}
if peering == nil {
continue
}
peerings = append(peerings, peering)
}
// see note above about idx
return idx, peerings, nil
}
// PeeringTrustBundleRead returns the peering trust bundle for the peer name given as the query value.
func (s *Store) PeeringTrustBundleRead(ws memdb.WatchSet, q Query) (uint64, *pbpeering.PeeringTrustBundle, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
@ -379,7 +431,7 @@ func (s *Store) PeeringTrustBundleRead(ws memdb.WatchSet, q Query) (uint64, *pbp
}
ptb, ok := ptbRaw.(*pbpeering.PeeringTrustBundle)
if ptb != nil && !ok {
if ptbRaw != nil && !ok {
return 0, nil, fmt.Errorf("invalid type %T", ptb)
}
ws.Add(watchCh)
@ -477,10 +529,57 @@ func (r *Restore) PeeringTrustBundle(ptb *pbpeering.PeeringTrustBundle) error {
if err := r.tx.Insert(tablePeeringTrustBundles, ptb); err != nil {
return fmt.Errorf("failed restoring peering trust bundle: %w", err)
}
if err := updatePeeringTrustBundlesTableIndexes(r.tx, ptb.ModifyIndex, ptb.PartitionOrDefault()); err != nil {
return err
}
return nil
}
// readPeeredServicesFromConfigEntriesTxn queries exported-service config entries to return peers for serviceName
// in the form of a []structs.PeeredService.
func readPeeredServicesFromConfigEntriesTxn(
tx ReadTxn,
ws memdb.WatchSet,
serviceName string,
entMeta *acl.EnterpriseMeta,
) (uint64, []structs.PeeredService, error) {
var results []structs.PeeredService
// Get all exported-service config entries for that have exports for serviceName. This assumes the result
// has exported services filtered to only those matching serviceName so no futher filtering is needed.
idx, exportedServicesEntries, err := getExportedServiceConfigEntriesTxn(tx, ws, serviceName, entMeta)
if err != nil {
return 0, nil, err
}
// dedupe results by peer name
resultSet := make(map[string]struct{})
// filter entries to only those that have a peer consumer defined
for _, entry := range exportedServicesEntries {
for _, service := range entry.Services {
// entries must have consumers
if service.Consumers == nil || len(service.Consumers) == 0 {
continue
}
for _, consumer := range service.Consumers {
// and consumers must have a peer
if consumer.PeerName == "" {
continue
}
// if we get here, we have a peer consumer, but we should dedupe peer names, so skip if it's already in the set
if _, ok := resultSet[consumer.PeerName]; ok {
continue
}
// if we got here, we can add to the result set
resultSet[consumer.PeerName] = struct{}{}
result := structs.PeeredService{
Name: structs.NewServiceName(serviceName, entry.GetEnterpriseMeta()),
PeerName: consumer.PeerName,
}
results = append(results, result)
}
}
}
return idx, results, nil
}

View File

@ -809,3 +809,196 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.Empty(t, exported)
})
}
func TestStateStore_PeeringsForService(t *testing.T) {
type testCase struct {
name string
services []structs.ServiceName
peerings []*pbpeering.Peering
entries []*structs.ExportedServicesConfigEntry
query []string
expect [][]*pbpeering.Peering
expectIdx uint64
}
run := func(t *testing.T, tc testCase) {
s := testStateStore(t)
var lastIdx uint64
// Create peerings
for _, peering := range tc.peerings {
lastIdx++
require.NoError(t, s.PeeringWrite(lastIdx, peering))
// make sure it got created
q := Query{Value: peering.Name}
_, p, err := s.PeeringRead(nil, q)
require.NoError(t, err)
require.NotNil(t, p)
}
// Create a Nodes for services
svcNode := &structs.Node{Node: "foo", Address: "127.0.0.1"}
lastIdx++
require.NoError(t, s.EnsureNode(lastIdx, svcNode))
// Create the test services
for _, svc := range tc.services {
lastIdx++
require.NoError(t, s.EnsureService(lastIdx, svcNode.Node, &structs.NodeService{
ID: svc.Name,
Service: svc.Name,
Port: 8080,
}))
}
// Write the config entries.
for _, entry := range tc.entries {
lastIdx++
require.NoError(t, s.EnsureConfigEntry(lastIdx, entry))
}
// Query for peers.
for resultIdx, q := range tc.query {
tx := s.db.ReadTxn()
defer tx.Abort()
idx, peers, err := s.PeeringsForService(nil, q, *acl.DefaultEnterpriseMeta())
require.NoError(t, err)
require.Equal(t, tc.expectIdx, idx)
// Verify the result, ignoring generated fields
require.Len(t, peers, len(tc.expect[resultIdx]))
for _, got := range peers {
got.ID = ""
got.ModifyIndex = 0
got.CreateIndex = 0
}
require.ElementsMatch(t, tc.expect[resultIdx], peers)
}
}
cases := []testCase{
{
name: "no exported services",
services: []structs.ServiceName{
{Name: "foo"},
},
peerings: []*pbpeering.Peering{},
entries: []*structs.ExportedServicesConfigEntry{},
query: []string{"foo"},
expect: [][]*pbpeering.Peering{{}},
},
{
name: "service does not exist",
services: []structs.ServiceName{
{Name: "foo"},
},
peerings: []*pbpeering.Peering{},
entries: []*structs.ExportedServicesConfigEntry{},
query: []string{"bar"},
expect: [][]*pbpeering.Peering{{}},
expectIdx: uint64(2), // catalog services max index
},
{
name: "config entry with exact service name",
services: []structs.ServiceName{
{Name: "foo"},
{Name: "bar"},
},
peerings: []*pbpeering.Peering{
{Name: "peer1", State: pbpeering.PeeringState_INITIAL},
{Name: "peer2", State: pbpeering.PeeringState_INITIAL},
},
entries: []*structs.ExportedServicesConfigEntry{
{
Name: "ce1",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "peer1",
},
},
},
{
Name: "bar",
Consumers: []structs.ServiceConsumer{
{
PeerName: "peer2",
},
},
},
},
},
},
query: []string{"foo", "bar"},
expect: [][]*pbpeering.Peering{
{
{Name: "peer1", State: pbpeering.PeeringState_INITIAL},
},
{
{Name: "peer2", State: pbpeering.PeeringState_INITIAL},
},
},
expectIdx: uint64(6), // config entries max index
},
{
name: "config entry with wildcard service name",
services: []structs.ServiceName{
{Name: "foo"},
{Name: "bar"},
},
peerings: []*pbpeering.Peering{
{Name: "peer1", State: pbpeering.PeeringState_INITIAL},
{Name: "peer2", State: pbpeering.PeeringState_INITIAL},
{Name: "peer3", State: pbpeering.PeeringState_INITIAL},
},
entries: []*structs.ExportedServicesConfigEntry{
{
Name: "ce1",
Services: []structs.ExportedService{
{
Name: "*",
Consumers: []structs.ServiceConsumer{
{
PeerName: "peer1",
},
{
PeerName: "peer2",
},
},
},
{
Name: "bar",
Consumers: []structs.ServiceConsumer{
{
PeerName: "peer3",
},
},
},
},
},
},
query: []string{"foo", "bar"},
expect: [][]*pbpeering.Peering{
{
{Name: "peer1", State: pbpeering.PeeringState_INITIAL},
{Name: "peer2", State: pbpeering.PeeringState_INITIAL},
},
{
{Name: "peer1", State: pbpeering.PeeringState_INITIAL},
{Name: "peer2", State: pbpeering.PeeringState_INITIAL},
{Name: "peer3", State: pbpeering.PeeringState_INITIAL},
},
},
expectIdx: uint64(7),
},
}
for _, tc := range cases {
runStep(t, tc.name, func(t *testing.T) {
run(t, tc)
})
}
}

View File

@ -7,3 +7,9 @@ type PeeringToken struct {
ServerName string
PeerID string
}
// PeeredService is a service that has been configured with an exported-service config entry to be exported to a peer.
type PeeredService struct {
Name ServiceName
PeerName string
}