mirror of
https://github.com/status-im/consul.git
synced 2025-01-29 23:15:08 +00:00
state: convert services table service and connect indexer
To the new functional indexer pattern
This commit is contained in:
parent
395ebce510
commit
0c61abcc31
@ -911,12 +911,18 @@ func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *str
|
|||||||
|
|
||||||
func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
|
func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
|
||||||
// Function for lookup
|
// Function for lookup
|
||||||
index := "service"
|
index := indexService
|
||||||
if connect {
|
if connect {
|
||||||
index = "connect"
|
index = indexConnect
|
||||||
}
|
}
|
||||||
|
|
||||||
services, err := catalogServiceNodeList(tx, serviceName, index, entMeta)
|
// TODO: accept non-pointer
|
||||||
|
if entMeta == nil {
|
||||||
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
|
}
|
||||||
|
|
||||||
|
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||||
|
services, err := tx.Get(tableServices, index, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -979,8 +985,13 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
|
|||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// List all the services.
|
// TODO: accept non-pointer value
|
||||||
services, err := catalogServiceNodeList(tx, service, "service", entMeta)
|
if entMeta == nil {
|
||||||
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
|
}
|
||||||
|
|
||||||
|
q := Query{Value: service, EnterpriseMeta: *entMeta}
|
||||||
|
services, err := tx.Get(tableServices, indexService, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1328,8 +1339,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||||||
}
|
}
|
||||||
// Delete any checks associated with the service. This will invalidate
|
// Delete any checks associated with the service. This will invalidate
|
||||||
// sessions as necessary.
|
// sessions as necessary.
|
||||||
q := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
|
nsq := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
|
||||||
checks, err := tx.Get(tableChecks, indexNodeService, q)
|
checks, err := tx.Get(tableChecks, indexNodeService, nsq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service check lookup: %s", err)
|
return fmt.Errorf("failed service check lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1368,7 +1379,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||||||
return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err)
|
return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, remainingService, err := firstWatchWithTxn(tx, tableServices, indexService, svc.ServiceName, entMeta); err == nil {
|
q := Query{Value: svc.ServiceName, EnterpriseMeta: *entMeta}
|
||||||
|
if remainingService, err := tx.First(tableServices, indexService, q); err == nil {
|
||||||
if remainingService != nil {
|
if remainingService != nil {
|
||||||
// We have at least one remaining service, update the index
|
// We have at least one remaining service, update the index
|
||||||
if err := catalogUpdateServiceIndexes(tx, svc.ServiceName, idx, entMeta); err != nil {
|
if err := catalogUpdateServiceIndexes(tx, svc.ServiceName, idx, entMeta); err != nil {
|
||||||
@ -1951,14 +1963,18 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
|||||||
}
|
}
|
||||||
|
|
||||||
func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
|
func checkServiceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
|
||||||
// Function for lookup
|
index := indexService
|
||||||
index := "service"
|
|
||||||
if connect {
|
if connect {
|
||||||
index = "connect"
|
index = indexConnect
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query the state store for the service.
|
// TODO: accept non-pointer
|
||||||
iter, err := catalogServiceNodeList(tx, serviceName, index, entMeta)
|
if entMeta == nil {
|
||||||
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
|
}
|
||||||
|
|
||||||
|
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||||
|
iter, err := tx.Get(tableServices, index, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2081,8 +2097,13 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
|
|||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Query the state store for the service.
|
// TODO: accept non-pointer value
|
||||||
iter, err := catalogServiceNodeList(tx, serviceName, "service", entMeta)
|
if entMeta == nil {
|
||||||
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
|
}
|
||||||
|
|
||||||
|
q := Query{Value: serviceName, EnterpriseMeta: *entMeta}
|
||||||
|
iter, err := tx.Get(tableServices, indexService, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2739,7 +2760,8 @@ func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind str
|
|||||||
maxIdx = lib.MaxUint64(maxIdx, mapping.ModifyIndex)
|
maxIdx = lib.MaxUint64(maxIdx, mapping.ModifyIndex)
|
||||||
|
|
||||||
// Look up nodes for gateway
|
// Look up nodes for gateway
|
||||||
gwServices, err := catalogServiceNodeList(tx, mapping.Gateway.Name, "service", &mapping.Gateway.EnterpriseMeta)
|
q := Query{Value: mapping.Gateway.Name, EnterpriseMeta: mapping.Gateway.EnterpriseMeta}
|
||||||
|
gwServices, err := tx.Get(tableServices, indexService, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -105,10 +105,6 @@ func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta
|
|||||||
return tx.Get(tableServices, indexNode, Query{Value: node})
|
return tx.Get(tableServices, indexNode, Query{Value: node})
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
|
||||||
return tx.Get(tableServices, index, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
||||||
return tx.First(tableIndex, "id", indexServiceExtinction)
|
return tx.First(tableIndex, "id", indexServiceExtinction)
|
||||||
}
|
}
|
||||||
|
@ -216,7 +216,7 @@ func testIndexerTableServices() map[string]indexerTestCase {
|
|||||||
},
|
},
|
||||||
indexService: {
|
indexService: {
|
||||||
read: indexValue{
|
read: indexValue{
|
||||||
source: "ServiceName",
|
source: Query{Value: "ServiceName"},
|
||||||
expected: []byte("servicename\x00"),
|
expected: []byte("servicename\x00"),
|
||||||
},
|
},
|
||||||
write: indexValue{
|
write: indexValue{
|
||||||
|
@ -106,16 +106,19 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||||||
Name: indexService,
|
Name: indexService,
|
||||||
AllowMissing: true,
|
AllowMissing: true,
|
||||||
Unique: false,
|
Unique: false,
|
||||||
Indexer: &memdb.StringFieldIndex{
|
Indexer: indexerSingle{
|
||||||
Field: "ServiceName",
|
readIndex: indexFromQuery,
|
||||||
Lowercase: true,
|
writeIndex: indexServiceNameFromServiceNode,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
indexConnect: {
|
indexConnect: {
|
||||||
Name: indexConnect,
|
Name: indexConnect,
|
||||||
AllowMissing: true,
|
AllowMissing: true,
|
||||||
Unique: false,
|
Unique: false,
|
||||||
Indexer: &IndexConnectService{},
|
Indexer: indexerSingle{
|
||||||
|
readIndex: indexFromQuery,
|
||||||
|
writeIndex: indexConnectNameFromServiceNode,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
indexKind: {
|
indexKind: {
|
||||||
Name: indexKind,
|
Name: indexKind,
|
||||||
@ -173,6 +176,53 @@ func indexFromNodeIdentity(raw interface{}) ([]byte, error) {
|
|||||||
return b.Bytes(), nil
|
return b.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func indexServiceNameFromServiceNode(raw interface{}) ([]byte, error) {
|
||||||
|
n, ok := raw.(*structs.ServiceNode)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.Node == "" {
|
||||||
|
return nil, errMissingValueForIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
var b indexBuilder
|
||||||
|
b.String(strings.ToLower(n.ServiceName))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func indexConnectNameFromServiceNode(raw interface{}) ([]byte, error) {
|
||||||
|
n, ok := raw.(*structs.ServiceNode)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected type %T for structs.ServiceNode index", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
name, ok := connectNameFromServiceNode(n)
|
||||||
|
if !ok {
|
||||||
|
return nil, errMissingValueForIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
var b indexBuilder
|
||||||
|
b.String(strings.ToLower(name))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func connectNameFromServiceNode(sn *structs.ServiceNode) (string, bool) {
|
||||||
|
switch {
|
||||||
|
case sn.ServiceKind == structs.ServiceKindConnectProxy:
|
||||||
|
// For proxies, this service supports Connect for the destination
|
||||||
|
return sn.ServiceProxy.DestinationServiceName, true
|
||||||
|
|
||||||
|
case sn.ServiceConnect.Native:
|
||||||
|
// For native, this service supports Connect directly
|
||||||
|
return sn.ServiceName, true
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Doesn't support Connect at all
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// checksTableSchema returns a new table schema used for storing and indexing
|
// checksTableSchema returns a new table schema used for storing and indexing
|
||||||
// health check information. Health checks have a number of different attributes
|
// health check information. Health checks have a number of different attributes
|
||||||
// we want to filter by, so this table is a bit more complex.
|
// we want to filter by, so this table is a bit more complex.
|
||||||
|
@ -1,54 +0,0 @@
|
|||||||
package state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
)
|
|
||||||
|
|
||||||
// IndexConnectService indexes a *struct.ServiceNode for querying by
|
|
||||||
// services that support Connect to some target service. This will
|
|
||||||
// properly index the proxy destination for proxies and the service name
|
|
||||||
// for native services.
|
|
||||||
type IndexConnectService struct{}
|
|
||||||
|
|
||||||
func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error) {
|
|
||||||
sn, ok := obj.(*structs.ServiceNode)
|
|
||||||
if !ok {
|
|
||||||
return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
var result []byte
|
|
||||||
switch {
|
|
||||||
case sn.ServiceKind == structs.ServiceKindConnectProxy:
|
|
||||||
// For proxies, this service supports Connect for the destination
|
|
||||||
result = []byte(strings.ToLower(sn.ServiceProxy.DestinationServiceName))
|
|
||||||
|
|
||||||
case sn.ServiceConnect.Native:
|
|
||||||
// For native, this service supports Connect directly
|
|
||||||
result = []byte(strings.ToLower(sn.ServiceName))
|
|
||||||
|
|
||||||
default:
|
|
||||||
// Doesn't support Connect at all
|
|
||||||
return false, nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the result with the null terminator appended so we can
|
|
||||||
// differentiate prefix vs. non-prefix matches.
|
|
||||||
return true, append(result, '\x00'), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (idx *IndexConnectService) FromArgs(args ...interface{}) ([]byte, error) {
|
|
||||||
if len(args) != 1 {
|
|
||||||
return nil, fmt.Errorf("must provide only a single argument")
|
|
||||||
}
|
|
||||||
|
|
||||||
arg, ok := args[0].(string)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the null character as a terminator
|
|
||||||
return append([]byte(strings.ToLower(arg)), '\x00'), nil
|
|
||||||
}
|
|
@ -3,120 +3,53 @@ package state
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestIndexConnectService_FromObject(t *testing.T) {
|
func TestConnectNameFromServiceNode(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
Name string
|
name string
|
||||||
Input interface{}
|
input structs.ServiceNode
|
||||||
ExpectMatch bool
|
expected string
|
||||||
ExpectVal []byte
|
expectedOk bool
|
||||||
ExpectErr string
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
"not a ServiceNode",
|
name: "typical service, not native",
|
||||||
42,
|
input: structs.ServiceNode{ServiceName: "db"},
|
||||||
false,
|
expectedOk: false,
|
||||||
nil,
|
|
||||||
"ServiceNode",
|
|
||||||
},
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
"typical service, not native",
|
name: "typical service, is native",
|
||||||
&structs.ServiceNode{
|
input: structs.ServiceNode{
|
||||||
ServiceName: "db",
|
|
||||||
},
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
"",
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
"typical service, is native",
|
|
||||||
&structs.ServiceNode{
|
|
||||||
ServiceName: "dB",
|
ServiceName: "dB",
|
||||||
ServiceConnect: structs.ServiceConnect{Native: true},
|
ServiceConnect: structs.ServiceConnect{Native: true},
|
||||||
},
|
},
|
||||||
true,
|
expectedOk: true,
|
||||||
[]byte("db\x00"),
|
expected: "dB",
|
||||||
"",
|
|
||||||
},
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
"proxy service",
|
name: "proxy service",
|
||||||
&structs.ServiceNode{
|
input: structs.ServiceNode{
|
||||||
ServiceKind: structs.ServiceKindConnectProxy,
|
ServiceKind: structs.ServiceKindConnectProxy,
|
||||||
ServiceName: "db",
|
ServiceName: "db",
|
||||||
ServiceProxy: structs.ConnectProxyConfig{DestinationServiceName: "fOo"},
|
ServiceProxy: structs.ConnectProxyConfig{DestinationServiceName: "fOo"},
|
||||||
},
|
},
|
||||||
true,
|
expectedOk: true,
|
||||||
[]byte("foo\x00"),
|
expected: "fOo",
|
||||||
"",
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
t.Run(tc.Name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
require := require.New(t)
|
actual, ok := connectNameFromServiceNode(&tc.input)
|
||||||
|
if !tc.expectedOk {
|
||||||
var idx IndexConnectService
|
require.False(t, ok, "expected no connect name")
|
||||||
match, val, err := idx.FromObject(tc.Input)
|
|
||||||
if tc.ExpectErr != "" {
|
|
||||||
require.Error(err)
|
|
||||||
require.Contains(err.Error(), tc.ExpectErr)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
require.NoError(err)
|
require.Equal(t, tc.expected, actual)
|
||||||
require.Equal(tc.ExpectMatch, match)
|
|
||||||
require.Equal(tc.ExpectVal, val)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIndexConnectService_FromArgs(t *testing.T) {
|
|
||||||
cases := []struct {
|
|
||||||
Name string
|
|
||||||
Args []interface{}
|
|
||||||
ExpectVal []byte
|
|
||||||
ExpectErr string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
"multiple arguments",
|
|
||||||
[]interface{}{"foo", "bar"},
|
|
||||||
nil,
|
|
||||||
"single",
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
"not a string",
|
|
||||||
[]interface{}{42},
|
|
||||||
nil,
|
|
||||||
"must be a string",
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
"string",
|
|
||||||
[]interface{}{"fOO"},
|
|
||||||
[]byte("foo\x00"),
|
|
||||||
"",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range cases {
|
|
||||||
t.Run(tc.Name, func(t *testing.T) {
|
|
||||||
require := require.New(t)
|
|
||||||
|
|
||||||
var idx IndexConnectService
|
|
||||||
val, err := idx.FromArgs(tc.Args...)
|
|
||||||
if tc.ExpectErr != "" {
|
|
||||||
require.Error(err)
|
|
||||||
require.Contains(err.Error(), tc.ExpectErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
require.NoError(err)
|
|
||||||
require.Equal(tc.ExpectVal, val)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,8 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||||||
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
|
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
|
||||||
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
|
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
|
||||||
for svc, delta := range serviceNameChanges {
|
for svc, delta := range serviceNameChanges {
|
||||||
serviceIter, err := getWithTxn(tx, tableServices, indexService, svc.Name, &svc.EnterpriseMeta)
|
q := Query{Value: svc.Name, EnterpriseMeta: svc.EnterpriseMeta}
|
||||||
|
serviceIter, err := tx.Get(tableServices, indexService, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user