mirror of https://github.com/status-im/consul.git
Merge pull request #9948 from hashicorp/dnephin/state-index-service
state: convert remaining services table indexers to functions
This commit is contained in:
commit
909348e546
|
@ -899,24 +899,32 @@ func maxIndexAndWatchChsForServiceNodes(tx ReadTxn,
|
|||
func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
return serviceNodesTxn(tx, ws, serviceName, true, entMeta)
|
||||
|
||||
// TODO: accept non-pointer value
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||
return serviceNodesTxn(tx, ws, indexConnect, q)
|
||||
}
|
||||
|
||||
// ServiceNodes returns the nodes associated with a given service name.
|
||||
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
return serviceNodesTxn(tx, ws, serviceName, false, entMeta)
|
||||
|
||||
// TODO: accept non-pointer value
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||
return serviceNodesTxn(tx, ws, indexService, q)
|
||||
}
|
||||
|
||||
func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
|
||||
// Function for lookup
|
||||
index := "service"
|
||||
if connect {
|
||||
index = "connect"
|
||||
}
|
||||
|
||||
services, err := catalogServiceNodeList(tx, serviceName, index, entMeta)
|
||||
func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, index string, q Query) (uint64, structs.ServiceNodes, error) {
|
||||
connect := index == indexConnect
|
||||
serviceName := q.Value
|
||||
services, err := tx.Get(tableServices, index, q)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -934,7 +942,7 @@ func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect
|
|||
var idx uint64
|
||||
if connect {
|
||||
// Look up gateway nodes associated with the service
|
||||
gwIdx, nodes, err := serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
|
||||
gwIdx, nodes, err := serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, &q.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
|
||||
}
|
||||
|
@ -965,7 +973,7 @@ func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect
|
|||
// Get the table index.
|
||||
// TODO (gateways) (freddy) Why do we always consider the main service index here?
|
||||
// This doesn't seem to make sense for Connect when there's more than 1 result
|
||||
svcIdx := maxIndexForService(tx, serviceName, len(results) > 0, false, entMeta)
|
||||
svcIdx := maxIndexForService(tx, serviceName, len(results) > 0, false, &q.EnterpriseMeta)
|
||||
if idx < svcIdx {
|
||||
idx = svcIdx
|
||||
}
|
||||
|
@ -979,8 +987,13 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
|
|||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// List all the services.
|
||||
services, err := catalogServiceNodeList(tx, service, "service", entMeta)
|
||||
// TODO: accept non-pointer value
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
q := Query{Value: service, EnterpriseMeta: *entMeta}
|
||||
services, err := tx.Get(tableServices, indexService, q)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -1328,8 +1341,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
}
|
||||
// Delete any checks associated with the service. This will invalidate
|
||||
// sessions as necessary.
|
||||
q := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
|
||||
checks, err := tx.Get(tableChecks, indexNodeService, q)
|
||||
nsq := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
|
||||
checks, err := tx.Get(tableChecks, indexNodeService, nsq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service check lookup: %s", err)
|
||||
}
|
||||
|
@ -1351,7 +1364,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
}
|
||||
|
||||
// Delete the service and update the index
|
||||
if err := tx.Delete("services", service); err != nil {
|
||||
if err := tx.Delete(tableServices, service); err != nil {
|
||||
return fmt.Errorf("failed deleting service: %s", err)
|
||||
}
|
||||
if err := catalogUpdateServicesIndexes(tx, idx, entMeta); err != nil {
|
||||
|
@ -1368,7 +1381,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err)
|
||||
}
|
||||
|
||||
if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil {
|
||||
q := Query{Value: svc.ServiceName, EnterpriseMeta: *entMeta}
|
||||
if remainingService, err := tx.First(tableServices, indexService, q); err == nil {
|
||||
if remainingService != nil {
|
||||
// We have at least one remaining service, update the index
|
||||
if err := catalogUpdateServiceIndexes(tx, svc.ServiceName, idx, entMeta); err != nil {
|
||||
|
@ -1951,14 +1965,18 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
|||
}
|
||||
|
||||
func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
|
||||
// Function for lookup
|
||||
index := "service"
|
||||
index := indexService
|
||||
if connect {
|
||||
index = "connect"
|
||||
index = indexConnect
|
||||
}
|
||||
|
||||
// Query the state store for the service.
|
||||
iter, err := catalogServiceNodeList(tx, serviceName, index, entMeta)
|
||||
// TODO: accept non-pointer
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||
iter, err := tx.Get(tableServices, index, q)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -2081,8 +2099,13 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
|
|||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Query the state store for the service.
|
||||
iter, err := catalogServiceNodeList(tx, serviceName, "service", entMeta)
|
||||
// TODO: accept non-pointer value
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
|
||||
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||
iter, err := tx.Get(tableServices, indexService, q)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -2283,8 +2306,11 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind,
|
|||
// entries
|
||||
idx := catalogServiceKindMaxIndex(tx, ws, kind, entMeta)
|
||||
|
||||
// Query the state store for the service.
|
||||
services, err := catalogServiceListByKind(tx, kind, entMeta)
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
q := Query{Value: string(kind), EnterpriseMeta: *entMeta}
|
||||
services, err := tx.Get(tableServices, indexKind, q)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
@ -2527,7 +2553,11 @@ func terminatingConfigGatewayServices(
|
|||
|
||||
// updateGatewayNamespace is used to target all services within a namespace
|
||||
func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewayService, entMeta *structs.EnterpriseMeta) error {
|
||||
services, err := catalogServiceListByKind(tx, structs.ServiceKindTypical, entMeta)
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
q := Query{Value: string(structs.ServiceKindTypical), EnterpriseMeta: *entMeta}
|
||||
services, err := tx.Get(tableServices, indexKind, q)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed querying services: %s", err)
|
||||
}
|
||||
|
@ -2739,7 +2769,8 @@ func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind str
|
|||
maxIdx = lib.MaxUint64(maxIdx, mapping.ModifyIndex)
|
||||
|
||||
// Look up nodes for gateway
|
||||
gwServices, err := catalogServiceNodeList(tx, mapping.Gateway.Name, "service", &mapping.Gateway.EnterpriseMeta)
|
||||
q := Query{Value: mapping.Gateway.Name, EnterpriseMeta: mapping.Gateway.EnterpriseMeta}
|
||||
gwServices, err := tx.Get(tableServices, indexService, q)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
n := changeObject(change).(*structs.Node)
|
||||
markNode(n.Node, changeTypeFromChange(change))
|
||||
|
||||
case "services":
|
||||
case tableServices:
|
||||
sn := changeObject(change).(*structs.ServiceNode)
|
||||
srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
||||
markService(newNodeServiceTupleFromServiceNode(sn), srvChange)
|
||||
|
@ -303,7 +303,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
for serviceName, gsChange := range serviceChanges {
|
||||
gs := changeObject(gsChange.change).(*structs.GatewayService)
|
||||
|
||||
_, nodes, err := serviceNodesTxn(tx, nil, gs.Gateway.Name, false, &gatewayName.EnterpriseMeta)
|
||||
q := Query{Value: gs.Gateway.Name, EnterpriseMeta: gatewayName.EnterpriseMeta}
|
||||
_, nodes, err := serviceNodesTxn(tx, nil, indexService, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *structs.EnterpriseMeta) s
|
|||
|
||||
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
||||
// overall services index
|
||||
if err := indexUpdateMaxTxn(tx, idx, "services"); err != nil {
|
||||
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,7 @@ func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.Ent
|
|||
|
||||
func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
|
||||
// Insert the service and update the index
|
||||
if err := tx.Insert("services", svc); err != nil {
|
||||
if err := tx.Insert(tableServices, svc); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,7 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
|
|||
}
|
||||
|
||||
func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
|
||||
return maxIndexTxn(tx, "services")
|
||||
return maxIndexTxn(tx, tableServices)
|
||||
}
|
||||
|
||||
func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||
|
@ -97,34 +97,26 @@ func catalogServiceListNoWildcard(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.
|
|||
return tx.Get(tableServices, indexID)
|
||||
}
|
||||
|
||||
func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
return tx.Get("services", "kind", string(kind))
|
||||
}
|
||||
|
||||
func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
||||
return tx.Get(tableServices, indexNode, Query{Value: node})
|
||||
}
|
||||
|
||||
func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
return tx.Get("services", index, name)
|
||||
}
|
||||
|
||||
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
||||
return tx.First(tableIndex, "id", indexServiceExtinction)
|
||||
}
|
||||
|
||||
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
||||
if checks {
|
||||
return maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
return maxIndexTxn(tx, "nodes", tableServices, "checks")
|
||||
}
|
||||
return maxIndexTxn(tx, "nodes", "services")
|
||||
return maxIndexTxn(tx, "nodes", tableServices)
|
||||
}
|
||||
|
||||
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
||||
if checks {
|
||||
return maxIndexWatchTxn(tx, ws, "nodes", "services", "checks")
|
||||
return maxIndexWatchTxn(tx, ws, "nodes", tableServices, "checks")
|
||||
}
|
||||
return maxIndexWatchTxn(tx, ws, "nodes", "services")
|
||||
return maxIndexWatchTxn(tx, ws, "nodes", tableServices)
|
||||
}
|
||||
|
||||
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
||||
|
|
|
@ -168,6 +168,12 @@ func testIndexerTableNodes() map[string]indexerTestCase {
|
|||
}
|
||||
|
||||
func testIndexerTableServices() map[string]indexerTestCase {
|
||||
obj := &structs.ServiceNode{
|
||||
Node: "NoDeId",
|
||||
ServiceID: "SeRviCe",
|
||||
ServiceName: "ServiceName",
|
||||
}
|
||||
|
||||
return map[string]indexerTestCase{
|
||||
indexID: {
|
||||
read: indexValue{
|
||||
|
@ -178,10 +184,7 @@ func testIndexerTableServices() map[string]indexerTestCase {
|
|||
expected: []byte("nodeid\x00service\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.ServiceNode{
|
||||
Node: "NoDeId",
|
||||
ServiceID: "SeRviCe",
|
||||
},
|
||||
source: obj,
|
||||
expected: []byte("nodeid\x00service\x00"),
|
||||
},
|
||||
prefix: []indexValue{
|
||||
|
@ -202,16 +205,48 @@ func testIndexerTableServices() map[string]indexerTestCase {
|
|||
indexNode: {
|
||||
read: indexValue{
|
||||
source: Query{
|
||||
Value: "NoDe",
|
||||
Value: "NoDeId",
|
||||
},
|
||||
expected: []byte("node\x00"),
|
||||
expected: []byte("nodeid\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: obj,
|
||||
expected: []byte("nodeid\x00"),
|
||||
},
|
||||
},
|
||||
indexService: {
|
||||
read: indexValue{
|
||||
source: Query{Value: "ServiceName"},
|
||||
expected: []byte("servicename\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: obj,
|
||||
expected: []byte("servicename\x00"),
|
||||
},
|
||||
},
|
||||
indexConnect: {
|
||||
read: indexValue{
|
||||
source: Query{Value: "ConnectName"},
|
||||
expected: []byte("connectname\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.ServiceNode{
|
||||
Node: "NoDe",
|
||||
ServiceID: "SeRvIcE",
|
||||
ServiceName: "ConnectName",
|
||||
ServiceConnect: structs.ServiceConnect{Native: true},
|
||||
},
|
||||
expected: []byte("node\x00"),
|
||||
expected: []byte("connectname\x00"),
|
||||
},
|
||||
},
|
||||
indexKind: {
|
||||
read: indexValue{
|
||||
source: Query{Value: "connect-proxy"},
|
||||
expected: []byte("connect-proxy\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.ServiceNode{
|
||||
ServiceKind: structs.ServiceKindConnectProxy,
|
||||
},
|
||||
expected: []byte("connect-proxy\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -106,22 +106,28 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
Name: indexService,
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "ServiceName",
|
||||
Lowercase: true,
|
||||
Indexer: indexerSingle{
|
||||
readIndex: indexFromQuery,
|
||||
writeIndex: indexServiceNameFromServiceNode,
|
||||
},
|
||||
},
|
||||
indexConnect: {
|
||||
Name: indexConnect,
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &IndexConnectService{},
|
||||
Indexer: indexerSingle{
|
||||
readIndex: indexFromQuery,
|
||||
writeIndex: indexConnectNameFromServiceNode,
|
||||
},
|
||||
},
|
||||
indexKind: {
|
||||
Name: indexKind,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &IndexServiceKind{},
|
||||
Indexer: indexerSingle{
|
||||
readIndex: indexFromQuery,
|
||||
writeIndex: indexKindFromServiceNode,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -173,6 +179,64 @@ func indexFromNodeIdentity(raw interface{}) ([]byte, error) {
|
|||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func indexServiceNameFromServiceNode(raw interface{}) ([]byte, error) {
|
||||
n, ok := raw.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
|
||||
}
|
||||
|
||||
if n.Node == "" {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(n.ServiceName))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func indexConnectNameFromServiceNode(raw interface{}) ([]byte, error) {
|
||||
n, ok := raw.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
|
||||
}
|
||||
|
||||
name, ok := connectNameFromServiceNode(n)
|
||||
if !ok {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(name))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func connectNameFromServiceNode(sn *structs.ServiceNode) (string, bool) {
|
||||
switch {
|
||||
case sn.ServiceKind == structs.ServiceKindConnectProxy:
|
||||
// For proxies, this service supports Connect for the destination
|
||||
return sn.ServiceProxy.DestinationServiceName, true
|
||||
|
||||
case sn.ServiceConnect.Native:
|
||||
// For native, this service supports Connect directly
|
||||
return sn.ServiceName, true
|
||||
|
||||
default:
|
||||
// Doesn't support Connect at all
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
|
||||
func indexKindFromServiceNode(raw interface{}) ([]byte, error) {
|
||||
n, ok := raw.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(string(n.ServiceKind)))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
// checksTableSchema returns a new table schema used for storing and indexing
|
||||
// health check information. Health checks have a number of different attributes
|
||||
// we want to filter by, so this table is a bit more complex.
|
||||
|
|
|
@ -1316,7 +1316,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
|
|||
}
|
||||
|
||||
// Indexes were updated.
|
||||
for _, tbl := range []string{"nodes", "services", "checks"} {
|
||||
for _, tbl := range []string{"nodes", tableServices, "checks"} {
|
||||
if idx := s.maxIndex(tbl); idx != 3 {
|
||||
t.Fatalf("bad index: %d (%s)", idx, tbl)
|
||||
}
|
||||
|
@ -1479,7 +1479,7 @@ func TestStateStore_EnsureService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Index tables were updated.
|
||||
if idx := s.maxIndex("services"); idx != 30 {
|
||||
if idx := s.maxIndex(tableServices); idx != 30 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -1510,7 +1510,7 @@ func TestStateStore_EnsureService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Index tables were updated.
|
||||
if idx := s.maxIndex("services"); idx != 40 {
|
||||
if idx := s.maxIndex(tableServices); idx != 40 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
@ -2073,7 +2073,7 @@ func TestStateStore_DeleteService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Index tables were updated.
|
||||
if idx := s.maxIndex("services"); idx != 4 {
|
||||
if idx := s.maxIndex(tableServices); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if idx := s.maxIndex("checks"); idx != 4 {
|
||||
|
@ -2085,7 +2085,7 @@ func TestStateStore_DeleteService(t *testing.T) {
|
|||
if err := s.DeleteService(5, "node1", "service1", nil); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx := s.maxIndex("services"); idx != 4 {
|
||||
if idx := s.maxIndex(tableServices); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// IndexConnectService indexes a *struct.ServiceNode for querying by
|
||||
// services that support Connect to some target service. This will
|
||||
// properly index the proxy destination for proxies and the service name
|
||||
// for native services.
|
||||
type IndexConnectService struct{}
|
||||
|
||||
func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error) {
|
||||
sn, ok := obj.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj)
|
||||
}
|
||||
|
||||
var result []byte
|
||||
switch {
|
||||
case sn.ServiceKind == structs.ServiceKindConnectProxy:
|
||||
// For proxies, this service supports Connect for the destination
|
||||
result = []byte(strings.ToLower(sn.ServiceProxy.DestinationServiceName))
|
||||
|
||||
case sn.ServiceConnect.Native:
|
||||
// For native, this service supports Connect directly
|
||||
result = []byte(strings.ToLower(sn.ServiceName))
|
||||
|
||||
default:
|
||||
// Doesn't support Connect at all
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
// Return the result with the null terminator appended so we can
|
||||
// differentiate prefix vs. non-prefix matches.
|
||||
return true, append(result, '\x00'), nil
|
||||
}
|
||||
|
||||
func (idx *IndexConnectService) FromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("must provide only a single argument")
|
||||
}
|
||||
|
||||
arg, ok := args[0].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
|
||||
}
|
||||
|
||||
// Add the null character as a terminator
|
||||
return append([]byte(strings.ToLower(arg)), '\x00'), nil
|
||||
}
|
|
@ -3,120 +3,53 @@ package state
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func TestIndexConnectService_FromObject(t *testing.T) {
|
||||
func TestConnectNameFromServiceNode(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Input interface{}
|
||||
ExpectMatch bool
|
||||
ExpectVal []byte
|
||||
ExpectErr string
|
||||
name string
|
||||
input structs.ServiceNode
|
||||
expected string
|
||||
expectedOk bool
|
||||
}{
|
||||
{
|
||||
"not a ServiceNode",
|
||||
42,
|
||||
false,
|
||||
nil,
|
||||
"ServiceNode",
|
||||
name: "typical service, not native",
|
||||
input: structs.ServiceNode{ServiceName: "db"},
|
||||
expectedOk: false,
|
||||
},
|
||||
|
||||
{
|
||||
"typical service, not native",
|
||||
&structs.ServiceNode{
|
||||
ServiceName: "db",
|
||||
},
|
||||
false,
|
||||
nil,
|
||||
"",
|
||||
},
|
||||
|
||||
{
|
||||
"typical service, is native",
|
||||
&structs.ServiceNode{
|
||||
name: "typical service, is native",
|
||||
input: structs.ServiceNode{
|
||||
ServiceName: "dB",
|
||||
ServiceConnect: structs.ServiceConnect{Native: true},
|
||||
},
|
||||
true,
|
||||
[]byte("db\x00"),
|
||||
"",
|
||||
expectedOk: true,
|
||||
expected: "dB",
|
||||
},
|
||||
|
||||
{
|
||||
"proxy service",
|
||||
&structs.ServiceNode{
|
||||
name: "proxy service",
|
||||
input: structs.ServiceNode{
|
||||
ServiceKind: structs.ServiceKindConnectProxy,
|
||||
ServiceName: "db",
|
||||
ServiceProxy: structs.ConnectProxyConfig{DestinationServiceName: "fOo"},
|
||||
},
|
||||
true,
|
||||
[]byte("foo\x00"),
|
||||
"",
|
||||
expectedOk: true,
|
||||
expected: "fOo",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
var idx IndexConnectService
|
||||
match, val, err := idx.FromObject(tc.Input)
|
||||
if tc.ExpectErr != "" {
|
||||
require.Error(err)
|
||||
require.Contains(err.Error(), tc.ExpectErr)
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actual, ok := connectNameFromServiceNode(&tc.input)
|
||||
if !tc.expectedOk {
|
||||
require.False(t, ok, "expected no connect name")
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
require.Equal(tc.ExpectMatch, match)
|
||||
require.Equal(tc.ExpectVal, val)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexConnectService_FromArgs(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Args []interface{}
|
||||
ExpectVal []byte
|
||||
ExpectErr string
|
||||
}{
|
||||
{
|
||||
"multiple arguments",
|
||||
[]interface{}{"foo", "bar"},
|
||||
nil,
|
||||
"single",
|
||||
},
|
||||
|
||||
{
|
||||
"not a string",
|
||||
[]interface{}{42},
|
||||
nil,
|
||||
"must be a string",
|
||||
},
|
||||
|
||||
{
|
||||
"string",
|
||||
[]interface{}{"fOO"},
|
||||
[]byte("foo\x00"),
|
||||
"",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
var idx IndexConnectService
|
||||
val, err := idx.FromArgs(tc.Args...)
|
||||
if tc.ExpectErr != "" {
|
||||
require.Error(err)
|
||||
require.Contains(err.Error(), tc.ExpectErr)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
require.Equal(tc.ExpectVal, val)
|
||||
require.Equal(t, tc.expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// IndexServiceKind indexes a *struct.ServiceNode for querying by
|
||||
// the services kind. We need a custom indexer because of the default
|
||||
// kind being the empty string. The StringFieldIndex in memdb seems to
|
||||
// treate the empty string as missing and doesn't work correctly when we actually
|
||||
// want to index ""
|
||||
type IndexServiceKind struct{}
|
||||
|
||||
func (idx *IndexServiceKind) FromObject(obj interface{}) (bool, []byte, error) {
|
||||
sn, ok := obj.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj)
|
||||
}
|
||||
|
||||
return true, append([]byte(strings.ToLower(string(sn.ServiceKind))), '\x00'), nil
|
||||
}
|
||||
|
||||
func (idx *IndexServiceKind) FromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("must provide only a single argument")
|
||||
}
|
||||
|
||||
arg, ok := args[0].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("argument must be a structs.ServiceKind: %#v", args[0])
|
||||
}
|
||||
|
||||
// Add the null character as a terminator
|
||||
return append([]byte(strings.ToLower(arg)), '\x00'), nil
|
||||
}
|
|
@ -283,7 +283,7 @@ func TestStateStore_maxIndex(t *testing.T) {
|
|||
testRegisterNode(t, s, 1, "bar")
|
||||
testRegisterService(t, s, 2, "foo", "consul")
|
||||
|
||||
if max := s.maxIndex("nodes", "services"); max != 2 {
|
||||
if max := s.maxIndex("nodes", tableServices); max != 2 {
|
||||
t.Fatalf("bad max: %d", max)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||
switch change.Table {
|
||||
case "nodes":
|
||||
usageDeltas[change.Table] += delta
|
||||
case "services":
|
||||
case tableServices:
|
||||
svc := changeObject(change).(*structs.ServiceNode)
|
||||
usageDeltas[change.Table] += delta
|
||||
addEnterpriseServiceInstanceUsage(usageDeltas, change)
|
||||
|
@ -107,7 +107,8 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
|
||||
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
|
||||
for svc, delta := range serviceNameChanges {
|
||||
serviceIter, err := getWithTxn(tx, tableServices, "service", svc.Name, &svc.EnterpriseMeta)
|
||||
q := Query{Value: svc.Name, EnterpriseMeta: svc.EnterpriseMeta}
|
||||
serviceIter, err := tx.Get(tableServices, indexService, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue