diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index e04f7ebc9c..c91070ea0d 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2092,7 +2092,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru tx := s.db.Txn(false) defer tx.Abort() - iter, err := gatewayServices(tx, gateway, entMeta) + iter, err := tx.Get(tableGatewayServices, indexGateway, structs.NewServiceName(gateway, entMeta)) if err != nil { return 0, nil, fmt.Errorf("failed gateway services lookup: %s", err) } @@ -2386,7 +2386,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en // Delete all associated with gateway first, to avoid keeping mappings that were removed sn := structs.NewServiceName(conf.GetName(), entMeta) - if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil { + if _, err := tx.DeleteAll(tableGatewayServices, indexGateway, sn); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil { @@ -2524,7 +2524,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } - existing, err := tx.First(tableGatewayServices, "id", service.Gateway, sn.CompoundServiceName(), service.Port) + existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) } @@ -2559,7 +2559,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { // Check if mapping already exists in table if it's already in the table // Avoid insert if nothing changed - existing, err := tx.First(tableGatewayServices, "id", mapping.Gateway, mapping.Service, mapping.Port) + existing, err := tx.First(tableGatewayServices, indexID, mapping.Gateway, mapping.Service, mapping.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) } @@ -2597,7 +2597,8 @@ func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeSe return nil } - svcGateways, err := serviceGateways(tx, structs.WildcardSpecifier, &svc.EnterpriseMeta) + sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta} + svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) if err != nil { return fmt.Errorf("failed gateway lookup for %q: %s", svc.Service, err) } @@ -2620,7 +2621,8 @@ func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeSe func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error { // Clean up association between service name and gateways if needed - gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta) + sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta} + gateways, err := tx.Get(tableGatewayServices, indexService, sn) if err != nil { return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err) } @@ -2652,21 +2654,11 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) return nil } -// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up -// all the gateways mapped to this service. -func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get(tableGatewayServices, "service", structs.NewServiceName(name, entMeta)) -} - -func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get(tableGatewayServices, "gateway", structs.NewServiceName(name, entMeta)) -} - func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) { tx := s.db.ReadTxn() defer tx.Abort() - iter, err := tx.Get(tableGatewayServices, "id") + iter, err := tx.Get(tableGatewayServices, indexID) if err != nil { return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err) } @@ -2709,7 +2701,7 @@ func (s *Store) collectGatewayServices(tx ReadTxn, ws memdb.WatchSet, iter memdb // We might need something like the service_last_extinction index? func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { // Look up gateway name associated with the service - gws, err := serviceGateways(tx, service, entMeta) + gws, err := tx.Get(tableGatewayServices, indexService, structs.NewServiceName(service, entMeta)) if err != nil { return 0, nil, fmt.Errorf("failed gateway lookup: %s", err) } @@ -2992,9 +2984,9 @@ func downstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.Se func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { // To fetch upstreams we query services that have the input listed as a downstream // To fetch downstreams we query services that have the input listed as an upstream - index := "downstream" + index := indexDownstream if downstreams { - index = "upstream" + index = indexUpstream } iter, err := tx.Get(tableMeshTopology, index, service) @@ -3008,7 +3000,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se resp []structs.ServiceName ) for raw := iter.Next(); raw != nil; raw = iter.Next() { - entry := raw.(*structs.UpstreamDownstream) + entry := raw.(*upstreamDownstream) if entry.ModifyIndex > idx { idx = entry.ModifyIndex } @@ -3053,20 +3045,20 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS upstreamMeta := structs.NewEnterpriseMeta(u.DestinationNamespace) upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta) - obj, err := tx.First(tableMeshTopology, "id", upstream, downstream) + obj, err := tx.First(tableMeshTopology, indexID, upstream, downstream) if err != nil { return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err) } sid := svc.CompoundServiceID() uid := structs.UniqueID(node, sid.String()) - var mapping *structs.UpstreamDownstream - if existing, ok := obj.(*structs.UpstreamDownstream); ok { + var mapping *upstreamDownstream + if existing, ok := obj.(*upstreamDownstream); ok { rawCopy, err := copystructure.Copy(existing) if err != nil { return fmt.Errorf("failed to copy existing topology mapping: %v", err) } - mapping, ok = rawCopy.(*structs.UpstreamDownstream) + mapping, ok = rawCopy.(*upstreamDownstream) if !ok { return fmt.Errorf("unexpected topology type %T", rawCopy) } @@ -3076,7 +3068,7 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS inserted[upstream] = true } if mapping == nil { - mapping = &structs.UpstreamDownstream{ + mapping = &upstreamDownstream{ Upstream: upstream, Downstream: downstream, Refs: map[string]struct{}{uid: {}}, @@ -3097,7 +3089,7 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS for u := range oldUpstreams { if !inserted[u] { - if _, err := tx.DeleteAll(tableMeshTopology, "id", u, downstream); err != nil { + if _, err := tx.DeleteAll(tableMeshTopology, indexID, u, downstream); err != nil { return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { @@ -3119,14 +3111,14 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) sid := service.CompoundServiceID() uid := structs.UniqueID(service.Node, sid.String()) - iter, err := tx.Get(tableMeshTopology, "downstream", sn) + iter, err := tx.Get(tableMeshTopology, indexDownstream, sn) if err != nil { return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err) } - mappings := make([]*structs.UpstreamDownstream, 0) + mappings := make([]*upstreamDownstream, 0) for raw := iter.Next(); raw != nil; raw = iter.Next() { - mappings = append(mappings, raw.(*structs.UpstreamDownstream)) + mappings = append(mappings, raw.(*upstreamDownstream)) } // Do the updates in a separate loop so we don't trash the iterator. @@ -3135,7 +3127,7 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) if err != nil { return fmt.Errorf("failed to copy existing topology mapping: %v", err) } - copy, ok := rawCopy.(*structs.UpstreamDownstream) + copy, ok := rawCopy.(*upstreamDownstream) if !ok { return fmt.Errorf("unexpected topology type %T", rawCopy) } @@ -3169,7 +3161,7 @@ func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga return nil } - mapping := structs.UpstreamDownstream{ + mapping := upstreamDownstream{ Upstream: gs.Service, Downstream: gs.Gateway, RaftIndex: gs.RaftIndex, @@ -3190,7 +3182,7 @@ func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga return nil } - if _, err := tx.DeleteAll(tableMeshTopology, "id", gs.Service, gs.Gateway); err != nil { + if _, err := tx.DeleteAll(tableMeshTopology, indexID, gs.Service, gs.Gateway); err != nil { return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { @@ -3206,7 +3198,7 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str return nil } - if _, err := tx.DeleteAll(tableMeshTopology, "downstream", gateway); err != nil { + if _, err := tx.DeleteAll(tableMeshTopology, indexDownstream, gateway); err != nil { return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 065aa518e5..9c123c1f50 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -450,7 +450,12 @@ func connectEventsByServiceKind(tx ReadTxn, origEvent stream.Event) ([]stream.Ev case structs.ServiceKindTerminatingGateway: var result []stream.Event - iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta) + + sn := structs.ServiceName{ + Name: node.Service.Service, + EnterpriseMeta: node.Service.EnterpriseMeta, + } + iter, err := tx.Get(tableGatewayServices, indexGateway, sn) if err != nil { return nil, err } diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 63cbff3cc3..cbaa1bf1f2 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -40,6 +40,94 @@ func testIndexerTableChecks() map[string]indexerTestCase { } } +func testIndexerTableMeshTopology() map[string]indexerTestCase { + obj := upstreamDownstream{ + Upstream: structs.ServiceName{Name: "UpStReAm"}, + Downstream: structs.ServiceName{Name: "DownStream"}, + } + + return map[string]indexerTestCase{ + indexID: { + read: indexValue{ + source: []interface{}{ + structs.ServiceName{Name: "UpStReAm"}, + structs.ServiceName{Name: "DownStream"}, + }, + expected: []byte("upstream\x00downstream\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("upstream\x00downstream\x00"), + }, + }, + indexUpstream: { + read: indexValue{ + source: structs.ServiceName{Name: "UpStReAm"}, + expected: []byte("upstream\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("upstream\x00"), + }, + }, + indexDownstream: { + read: indexValue{ + source: structs.ServiceName{Name: "DownStream"}, + expected: []byte("downstream\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("downstream\x00"), + }, + }, + } +} + +func testIndexerTableGatewayServices() map[string]indexerTestCase { + obj := &structs.GatewayService{ + Gateway: structs.ServiceName{Name: "GateWay"}, + Service: structs.ServiceName{Name: "SerVice"}, + Port: 50123, + } + encodedPort := string([]byte{0x96, 0x8f, 0x06, 0, 0, 0, 0, 0, 0, 0}) + return map[string]indexerTestCase{ + indexID: { + read: indexValue{ + source: []interface{}{ + structs.ServiceName{Name: "GateWay"}, + structs.ServiceName{Name: "SerVice"}, + 50123, + }, + expected: []byte("gateway\x00service\x00" + encodedPort), + }, + write: indexValue{ + source: obj, + expected: []byte("gateway\x00service\x00" + encodedPort), + }, + }, + indexGateway: { + read: indexValue{ + source: structs.ServiceName{Name: "GateWay"}, + expected: []byte("gateway\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("gateway\x00"), + }, + }, + indexService: { + read: indexValue{ + source: structs.ServiceName{Name: "SerVice"}, + expected: []byte("service\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("service\x00"), + }, + }, + } +} + func testIndexerTableNodes() map[string]indexerTestCase { return map[string]indexerTestCase{ indexID: { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 8769d1e75b..35bd5405a4 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -18,12 +18,15 @@ const ( tableMeshTopology = "mesh-topology" indexID = "id" - indexServiceName = "service" + indexService = "service" indexConnect = "connect" indexKind = "kind" indexStatus = "status" indexNodeService = "node_service" indexNode = "node" + indexUpstream = "upstream" + indexDownstream = "downstream" + indexGateway = "gateway" ) // nodesTableSchema returns a new table schema used for storing struct.Node. @@ -91,8 +94,8 @@ func servicesTableSchema() *memdb.TableSchema { writeIndex: writeIndex(indexFromNodeIdentity), }, }, - indexServiceName: { - Name: indexServiceName, + indexService: { + Name: indexService, AllowMissing: true, Unique: false, Indexer: &memdb.StringFieldIndex{ @@ -149,8 +152,8 @@ func checksTableSchema() *memdb.TableSchema { Lowercase: false, }, }, - indexServiceName: { - Name: indexServiceName, + indexService: { + Name: indexService, AllowMissing: true, Unique: false, Indexer: &memdb.StringFieldIndex{ @@ -204,16 +207,16 @@ func gatewayServicesTableSchema() *memdb.TableSchema { }, }, }, - "gateway": { - Name: "gateway", + indexGateway: { + Name: indexGateway, AllowMissing: false, Unique: false, Indexer: &ServiceNameIndex{ Field: "Gateway", }, }, - "service": { - Name: "service", + indexService: { + Name: indexService, AllowMissing: true, Unique: false, Indexer: &ServiceNameIndex{ @@ -245,16 +248,16 @@ func meshTopologyTableSchema() *memdb.TableSchema { }, }, }, - "upstream": { - Name: "upstream", + indexUpstream: { + Name: indexUpstream, AllowMissing: true, Unique: false, Indexer: &ServiceNameIndex{ Field: "Upstream", }, }, - "downstream": { - Name: "downstream", + indexDownstream: { + Name: indexDownstream, AllowMissing: false, Unique: false, Indexer: &ServiceNameIndex{ @@ -320,3 +323,18 @@ func (index *ServiceNameIndex) PrefixFromArgs(args ...interface{}) ([]byte, erro } return val, nil } + +// upstreamDownstream pairs come from individual proxy registrations, which can be updated independently. +type upstreamDownstream struct { + Upstream structs.ServiceName + Downstream structs.ServiceName + + // Refs stores the registrations that contain this pairing. + // When there are no remaining Refs, the upstreamDownstream can be deleted. + // + // Note: This map must be treated as immutable when accessed in MemDB. + // The entire upstreamDownstream structure must be deep copied on updates. + Refs map[string]struct{} + + structs.RaftIndex +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 41fe9f7520..318178a2f0 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -269,7 +269,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *s sn := structs.NewServiceName(name, entMeta) if kind == structs.TerminatingGateway || kind == structs.IngressGateway { - if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil { + if _, err := tx.DeleteAll(tableGatewayServices, indexGateway, sn); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil { @@ -278,7 +278,7 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *s } // Also clean up associations in the mesh topology table for ingress gateways if kind == structs.IngressGateway { - if _, err := tx.DeleteAll(tableMeshTopology, "downstream", sn); err != nil { + if _, err := tx.DeleteAll(tableMeshTopology, indexDownstream, sn); err != nil { return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index 52738051e5..4851a314d8 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -128,10 +128,12 @@ func TestNewDBSchema_Indexers(t *testing.T) { require.NoError(t, schema.Validate()) var testcases = map[string]func() map[string]indexerTestCase{ - tableChecks: testIndexerTableChecks, - tableServices: testIndexerTableServices, - tableNodes: testIndexerTableNodes, - tableConfigEntries: testIndexerTableConfigEntries, + tableChecks: testIndexerTableChecks, + tableServices: testIndexerTableServices, + tableNodes: testIndexerTableNodes, + tableConfigEntries: testIndexerTableConfigEntries, + tableMeshTopology: testIndexerTableMeshTopology, + tableGatewayServices: testIndexerTableGatewayServices, } for _, table := range schema.Tables { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e4e111e140..91dfd0b4bc 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -2474,18 +2474,3 @@ func (m MessageType) String() string { return "Unknown(" + strconv.Itoa(int(m)) + ")" } - -// UpstreamDownstream pairs come from individual proxy registrations, which can be updated independently. -type UpstreamDownstream struct { - Upstream ServiceName - Downstream ServiceName - - // Refs stores the registrations that contain this pairing. - // When there are no remaining Refs, the UpstreamDownstream can be deleted. - // - // Note: This map must be treated as immutable when accessed in MemDB. - // The entire UpstreamDownstream structure must be deep copied on updates. - Refs map[string]struct{} - - RaftIndex -}