diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 0825f86cc8..3824095c05 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -899,24 +899,32 @@ func maxIndexAndWatchChsForServiceNodes(tx ReadTxn, func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { tx := s.db.ReadTxn() defer tx.Abort() - return serviceNodesTxn(tx, ws, serviceName, true, entMeta) + + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + return serviceNodesTxn(tx, ws, indexConnect, q) } // ServiceNodes returns the nodes associated with a given service name. func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { tx := s.db.ReadTxn() defer tx.Abort() - return serviceNodesTxn(tx, ws, serviceName, false, entMeta) + + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + return serviceNodesTxn(tx, ws, indexService, q) } -func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - // Function for lookup - index := "service" - if connect { - index = "connect" - } - - services, err := catalogServiceNodeList(tx, serviceName, index, entMeta) +func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, index string, q Query) (uint64, structs.ServiceNodes, error) { + connect := index == indexConnect + serviceName := q.Value + services, err := tx.Get(tableServices, index, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -934,7 +942,7 @@ func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect var idx uint64 if connect { // Look up gateway nodes associated with the service - gwIdx, nodes, err := serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta) + gwIdx, nodes, err := serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, &q.EnterpriseMeta) if err != nil { return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err) } @@ -965,7 +973,7 @@ func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect // Get the table index. // TODO (gateways) (freddy) Why do we always consider the main service index here? // This doesn't seem to make sense for Connect when there's more than 1 result - svcIdx := maxIndexForService(tx, serviceName, len(results) > 0, false, entMeta) + svcIdx := maxIndexForService(tx, serviceName, len(results) > 0, false, &q.EnterpriseMeta) if idx < svcIdx { idx = svcIdx } @@ -979,8 +987,13 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string tx := s.db.Txn(false) defer tx.Abort() - // List all the services. - services, err := catalogServiceNodeList(tx, service, "service", entMeta) + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + + q := Query{Value: service, EnterpriseMeta: *entMeta} + services, err := tx.Get(tableServices, indexService, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -1328,8 +1341,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st } // Delete any checks associated with the service. This will invalidate // sessions as necessary. - q := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta} - checks, err := tx.Get(tableChecks, indexNodeService, q) + nsq := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta} + checks, err := tx.Get(tableChecks, indexNodeService, nsq) if err != nil { return fmt.Errorf("failed service check lookup: %s", err) } @@ -1351,7 +1364,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st } // Delete the service and update the index - if err := tx.Delete("services", service); err != nil { + if err := tx.Delete(tableServices, service); err != nil { return fmt.Errorf("failed deleting service: %s", err) } if err := catalogUpdateServicesIndexes(tx, idx, entMeta); err != nil { @@ -1368,7 +1381,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err) } - if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil { + q := Query{Value: svc.ServiceName, EnterpriseMeta: *entMeta} + if remainingService, err := tx.First(tableServices, indexService, q); err == nil { if remainingService != nil { // We have at least one remaining service, update the index if err := catalogUpdateServiceIndexes(tx, svc.ServiceName, idx, entMeta); err != nil { @@ -1951,14 +1965,18 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect } func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { - // Function for lookup - index := "service" + index := indexService if connect { - index = "connect" + index = indexConnect } - // Query the state store for the service. - iter, err := catalogServiceNodeList(tx, serviceName, index, entMeta) + // TODO: accept non-pointer + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + + q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + iter, err := tx.Get(tableServices, index, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -2081,8 +2099,13 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags tx := s.db.Txn(false) defer tx.Abort() - // Query the state store for the service. - iter, err := catalogServiceNodeList(tx, serviceName, "service", entMeta) + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + + q := Query{Value: serviceName, EnterpriseMeta: *entMeta} + iter, err := tx.Get(tableServices, indexService, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -2283,8 +2306,11 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, // entries idx := catalogServiceKindMaxIndex(tx, ws, kind, entMeta) - // Query the state store for the service. - services, err := catalogServiceListByKind(tx, kind, entMeta) + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + q := Query{Value: string(kind), EnterpriseMeta: *entMeta} + services, err := tx.Get(tableServices, indexKind, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -2527,7 +2553,11 @@ func terminatingConfigGatewayServices( // updateGatewayNamespace is used to target all services within a namespace func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error { - services, err := catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta) + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + q := Query{Value: string(structs.ServiceKindTypical), EnterpriseMeta: *entMeta} + services, err := tx.Get(tableServices, indexKind, q) if err != nil { return fmt.Errorf("failed querying services: %s", err) } @@ -2739,7 +2769,8 @@ func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind str maxIdx = lib.MaxUint64(maxIdx, mapping.ModifyIndex) // Look up nodes for gateway - gwServices, err := catalogServiceNodeList(tx, mapping.Gateway.Name, "service", &mapping.Gateway.EnterpriseMeta) + q := Query{Value: mapping.Gateway.Name, EnterpriseMeta: mapping.Gateway.EnterpriseMeta} + gwServices, err := tx.Get(tableServices, indexService, q) if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4e407baeb3..92f4745795 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -170,7 +170,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event n := changeObject(change).(*structs.Node) markNode(n.Node, changeTypeFromChange(change)) - case "services": + case tableServices: sn := changeObject(change).(*structs.ServiceNode) srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change} markService(newNodeServiceTupleFromServiceNode(sn), srvChange) @@ -303,7 +303,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event for serviceName, gsChange := range serviceChanges { gs := changeObject(gsChange.change).(*structs.GatewayService) - _, nodes, err := serviceNodesTxn(tx, nil, gs.Gateway.Name, false, &gatewayName.EnterpriseMeta) + q := Query{Value: gs.Gateway.Name, EnterpriseMeta: gatewayName.EnterpriseMeta} + _, nodes, err := serviceNodesTxn(tx, nil, indexService, q) if err != nil { return nil, err } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 7ec173d0e2..49b0f6d2df 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -28,7 +28,7 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { // overall services index - if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil { + if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -62,7 +62,7 @@ func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.Ent func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error { // Insert the service and update the index - if err := tx.Insert("services", svc); err != nil { + if err := tx.Insert(tableServices, svc); err != nil { return fmt.Errorf("failed inserting service: %s", err) } @@ -82,7 +82,7 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error { } func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 { - return maxIndexTxn(tx, "services") + return maxIndexTxn(tx, tableServices) } func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { @@ -97,34 +97,26 @@ func catalogServiceListNoWildcard(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb. return tx.Get(tableServices, indexID) } -func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("services", "kind", string(kind)) -} - func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) { return tx.Get(tableServices, indexNode, Query{Value: node}) } -func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("services", index, name) -} - func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) { return tx.First(tableIndex, "id", indexServiceExtinction) } func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { if checks { - return maxIndexTxn(tx, "nodes", "services", "checks") + return maxIndexTxn(tx, "nodes", tableServices, "checks") } - return maxIndexTxn(tx, "nodes", "services") + return maxIndexTxn(tx, "nodes", tableServices) } func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 { if checks { - return maxIndexWatchTxn(tx, ws, "nodes", "services", "checks") + return maxIndexWatchTxn(tx, ws, "nodes", tableServices, "checks") } - return maxIndexWatchTxn(tx, ws, "nodes", "services") + return maxIndexWatchTxn(tx, ws, "nodes", tableServices) } func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 925c130c4b..c33b12df9a 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -168,6 +168,12 @@ func testIndexerTableNodes() map[string]indexerTestCase { } func testIndexerTableServices() map[string]indexerTestCase { + obj := &structs.ServiceNode{ + Node: "NoDeId", + ServiceID: "SeRviCe", + ServiceName: "ServiceName", + } + return map[string]indexerTestCase{ indexID: { read: indexValue{ @@ -178,10 +184,7 @@ func testIndexerTableServices() map[string]indexerTestCase { expected: []byte("nodeid\x00service\x00"), }, write: indexValue{ - source: &structs.ServiceNode{ - Node: "NoDeId", - ServiceID: "SeRviCe", - }, + source: obj, expected: []byte("nodeid\x00service\x00"), }, prefix: []indexValue{ @@ -202,16 +205,48 @@ func testIndexerTableServices() map[string]indexerTestCase { indexNode: { read: indexValue{ source: Query{ - Value: "NoDe", + Value: "NoDeId", }, - expected: []byte("node\x00"), + expected: []byte("nodeid\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("nodeid\x00"), + }, + }, + indexService: { + read: indexValue{ + source: Query{Value: "ServiceName"}, + expected: []byte("servicename\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("servicename\x00"), + }, + }, + indexConnect: { + read: indexValue{ + source: Query{Value: "ConnectName"}, + expected: []byte("connectname\x00"), }, write: indexValue{ source: &structs.ServiceNode{ - Node: "NoDe", - ServiceID: "SeRvIcE", + ServiceName: "ConnectName", + ServiceConnect: structs.ServiceConnect{Native: true}, }, - expected: []byte("node\x00"), + expected: []byte("connectname\x00"), + }, + }, + indexKind: { + read: indexValue{ + source: Query{Value: "connect-proxy"}, + expected: []byte("connect-proxy\x00"), + }, + write: indexValue{ + source: &structs.ServiceNode{ + ServiceKind: structs.ServiceKindConnectProxy, + }, + expected: []byte("connect-proxy\x00"), }, }, } diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index aa0ee50177..e9915d9c9a 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -106,22 +106,28 @@ func servicesTableSchema() *memdb.TableSchema { Name: indexService, AllowMissing: true, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "ServiceName", - Lowercase: true, + Indexer: indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexServiceNameFromServiceNode, }, }, indexConnect: { Name: indexConnect, AllowMissing: true, Unique: false, - Indexer: &IndexConnectService{}, + Indexer: indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexConnectNameFromServiceNode, + }, }, indexKind: { Name: indexKind, AllowMissing: false, Unique: false, - Indexer: &IndexServiceKind{}, + Indexer: indexerSingle{ + readIndex: indexFromQuery, + writeIndex: indexKindFromServiceNode, + }, }, }, } @@ -173,6 +179,64 @@ func indexFromNodeIdentity(raw interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexServiceNameFromServiceNode(raw interface{}) ([]byte, error) { + n, ok := raw.(*structs.ServiceNode) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw) + } + + if n.Node == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(n.ServiceName)) + return b.Bytes(), nil +} + +func indexConnectNameFromServiceNode(raw interface{}) ([]byte, error) { + n, ok := raw.(*structs.ServiceNode) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw) + } + + name, ok := connectNameFromServiceNode(n) + if !ok { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(name)) + return b.Bytes(), nil +} + +func connectNameFromServiceNode(sn *structs.ServiceNode) (string, bool) { + switch { + case sn.ServiceKind == structs.ServiceKindConnectProxy: + // For proxies, this service supports Connect for the destination + return sn.ServiceProxy.DestinationServiceName, true + + case sn.ServiceConnect.Native: + // For native, this service supports Connect directly + return sn.ServiceName, true + + default: + // Doesn't support Connect at all + return "", false + } +} + +func indexKindFromServiceNode(raw interface{}) ([]byte, error) { + n, ok := raw.(*structs.ServiceNode) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw) + } + + var b indexBuilder + b.String(strings.ToLower(string(n.ServiceKind))) + return b.Bytes(), nil +} + // checksTableSchema returns a new table schema used for storing and indexing // health check information. Health checks have a number of different attributes // we want to filter by, so this table is a bit more complex. diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index b68a76aca2..ee3f8db14b 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1316,7 +1316,7 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Indexes were updated. - for _, tbl := range []string{"nodes", "services", "checks"} { + for _, tbl := range []string{"nodes", tableServices, "checks"} { if idx := s.maxIndex(tbl); idx != 3 { t.Fatalf("bad index: %d (%s)", idx, tbl) } @@ -1479,7 +1479,7 @@ func TestStateStore_EnsureService(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex("services"); idx != 30 { + if idx := s.maxIndex(tableServices); idx != 30 { t.Fatalf("bad index: %d", idx) } @@ -1510,7 +1510,7 @@ func TestStateStore_EnsureService(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex("services"); idx != 40 { + if idx := s.maxIndex(tableServices); idx != 40 { t.Fatalf("bad index: %d", idx) } } @@ -2073,7 +2073,7 @@ func TestStateStore_DeleteService(t *testing.T) { } // Index tables were updated. - if idx := s.maxIndex("services"); idx != 4 { + if idx := s.maxIndex(tableServices); idx != 4 { t.Fatalf("bad index: %d", idx) } if idx := s.maxIndex("checks"); idx != 4 { @@ -2085,7 +2085,7 @@ func TestStateStore_DeleteService(t *testing.T) { if err := s.DeleteService(5, "node1", "service1", nil); err != nil { t.Fatalf("err: %s", err) } - if idx := s.maxIndex("services"); idx != 4 { + if idx := s.maxIndex(tableServices); idx != 4 { t.Fatalf("bad index: %d", idx) } if watchFired(ws) { diff --git a/agent/consul/state/index_connect.go b/agent/consul/state/index_connect.go deleted file mode 100644 index 2cc455ad56..0000000000 --- a/agent/consul/state/index_connect.go +++ /dev/null @@ -1,54 +0,0 @@ -package state - -import ( - "fmt" - "strings" - - "github.com/hashicorp/consul/agent/structs" -) - -// IndexConnectService indexes a *struct.ServiceNode for querying by -// services that support Connect to some target service. This will -// properly index the proxy destination for proxies and the service name -// for native services. -type IndexConnectService struct{} - -func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error) { - sn, ok := obj.(*structs.ServiceNode) - if !ok { - return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj) - } - - var result []byte - switch { - case sn.ServiceKind == structs.ServiceKindConnectProxy: - // For proxies, this service supports Connect for the destination - result = []byte(strings.ToLower(sn.ServiceProxy.DestinationServiceName)) - - case sn.ServiceConnect.Native: - // For native, this service supports Connect directly - result = []byte(strings.ToLower(sn.ServiceName)) - - default: - // Doesn't support Connect at all - return false, nil, nil - } - - // Return the result with the null terminator appended so we can - // differentiate prefix vs. non-prefix matches. - return true, append(result, '\x00'), nil -} - -func (idx *IndexConnectService) FromArgs(args ...interface{}) ([]byte, error) { - if len(args) != 1 { - return nil, fmt.Errorf("must provide only a single argument") - } - - arg, ok := args[0].(string) - if !ok { - return nil, fmt.Errorf("argument must be a string: %#v", args[0]) - } - - // Add the null character as a terminator - return append([]byte(strings.ToLower(arg)), '\x00'), nil -} diff --git a/agent/consul/state/index_connect_test.go b/agent/consul/state/index_connect_test.go index 46371bfdfb..9331318e55 100644 --- a/agent/consul/state/index_connect_test.go +++ b/agent/consul/state/index_connect_test.go @@ -3,120 +3,53 @@ package state import ( "testing" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" ) -func TestIndexConnectService_FromObject(t *testing.T) { +func TestConnectNameFromServiceNode(t *testing.T) { cases := []struct { - Name string - Input interface{} - ExpectMatch bool - ExpectVal []byte - ExpectErr string + name string + input structs.ServiceNode + expected string + expectedOk bool }{ { - "not a ServiceNode", - 42, - false, - nil, - "ServiceNode", + name: "typical service, not native", + input: structs.ServiceNode{ServiceName: "db"}, + expectedOk: false, }, { - "typical service, not native", - &structs.ServiceNode{ - ServiceName: "db", - }, - false, - nil, - "", - }, - - { - "typical service, is native", - &structs.ServiceNode{ + name: "typical service, is native", + input: structs.ServiceNode{ ServiceName: "dB", ServiceConnect: structs.ServiceConnect{Native: true}, }, - true, - []byte("db\x00"), - "", + expectedOk: true, + expected: "dB", }, - { - "proxy service", - &structs.ServiceNode{ + name: "proxy service", + input: structs.ServiceNode{ ServiceKind: structs.ServiceKindConnectProxy, ServiceName: "db", ServiceProxy: structs.ConnectProxyConfig{DestinationServiceName: "fOo"}, }, - true, - []byte("foo\x00"), - "", + expectedOk: true, + expected: "fOo", }, } for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - require := require.New(t) - - var idx IndexConnectService - match, val, err := idx.FromObject(tc.Input) - if tc.ExpectErr != "" { - require.Error(err) - require.Contains(err.Error(), tc.ExpectErr) + t.Run(tc.name, func(t *testing.T) { + actual, ok := connectNameFromServiceNode(&tc.input) + if !tc.expectedOk { + require.False(t, ok, "expected no connect name") return } - require.NoError(err) - require.Equal(tc.ExpectMatch, match) - require.Equal(tc.ExpectVal, val) - }) - } -} - -func TestIndexConnectService_FromArgs(t *testing.T) { - cases := []struct { - Name string - Args []interface{} - ExpectVal []byte - ExpectErr string - }{ - { - "multiple arguments", - []interface{}{"foo", "bar"}, - nil, - "single", - }, - - { - "not a string", - []interface{}{42}, - nil, - "must be a string", - }, - - { - "string", - []interface{}{"fOO"}, - []byte("foo\x00"), - "", - }, - } - - for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - require := require.New(t) - - var idx IndexConnectService - val, err := idx.FromArgs(tc.Args...) - if tc.ExpectErr != "" { - require.Error(err) - require.Contains(err.Error(), tc.ExpectErr) - return - } - require.NoError(err) - require.Equal(tc.ExpectVal, val) + require.Equal(t, tc.expected, actual) }) } } diff --git a/agent/consul/state/index_service_kind.go b/agent/consul/state/index_service_kind.go deleted file mode 100644 index 4426aed30a..0000000000 --- a/agent/consul/state/index_service_kind.go +++ /dev/null @@ -1,38 +0,0 @@ -package state - -import ( - "fmt" - "strings" - - "github.com/hashicorp/consul/agent/structs" -) - -// IndexServiceKind indexes a *struct.ServiceNode for querying by -// the services kind. We need a custom indexer because of the default -// kind being the empty string. The StringFieldIndex in memdb seems to -// treate the empty string as missing and doesn't work correctly when we actually -// want to index "" -type IndexServiceKind struct{} - -func (idx *IndexServiceKind) FromObject(obj interface{}) (bool, []byte, error) { - sn, ok := obj.(*structs.ServiceNode) - if !ok { - return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj) - } - - return true, append([]byte(strings.ToLower(string(sn.ServiceKind))), '\x00'), nil -} - -func (idx *IndexServiceKind) FromArgs(args ...interface{}) ([]byte, error) { - if len(args) != 1 { - return nil, fmt.Errorf("must provide only a single argument") - } - - arg, ok := args[0].(string) - if !ok { - return nil, fmt.Errorf("argument must be a structs.ServiceKind: %#v", args[0]) - } - - // Add the null character as a terminator - return append([]byte(strings.ToLower(arg)), '\x00'), nil -} diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 985e7540ba..bcc9ad5933 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -283,7 +283,7 @@ func TestStateStore_maxIndex(t *testing.T) { testRegisterNode(t, s, 1, "bar") testRegisterService(t, s, 2, "foo", "consul") - if max := s.maxIndex("nodes", "services"); max != 2 { + if max := s.maxIndex("nodes", tableServices); max != 2 { t.Fatalf("bad max: %d", max) } } diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index f9720b3e0f..536c49a6ca 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -70,7 +70,7 @@ func updateUsage(tx WriteTxn, changes Changes) error { switch change.Table { case "nodes": usageDeltas[change.Table] += delta - case "services": + case tableServices: svc := changeObject(change).(*structs.ServiceNode) usageDeltas[change.Table] += delta addEnterpriseServiceInstanceUsage(usageDeltas, change) @@ -107,7 +107,8 @@ func updateUsage(tx WriteTxn, changes Changes) error { func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) { serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges)) for svc, delta := range serviceNameChanges { - serviceIter, err := getWithTxn(tx, tableServices, "service", svc.Name, &svc.EnterpriseMeta) + q := Query{Value: svc.Name, EnterpriseMeta: svc.EnterpriseMeta} + serviceIter, err := tx.Get(tableServices, indexService, q) if err != nil { return nil, err }