mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 06:16:08 +00:00
Merge pull request #9916 from hashicorp/dnephin/state-index-checks-id
state: convert checks.ID index to the functional indexer pattern
This commit is contained in:
commit
2e917e3f9c
@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
|
|||||||
// node info above to make sure we actually need to update the service
|
// node info above to make sure we actually need to update the service
|
||||||
// definition in order to prevent useless churn if nothing has changed.
|
// definition in order to prevent useless churn if nothing has changed.
|
||||||
if req.Service != nil {
|
if req.Service != nil {
|
||||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
|
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi
|
|||||||
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
|
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
|
||||||
// Look up the existing node's Serf health check to see if it's failed.
|
// Look up the existing node's Serf health check to see if it's failed.
|
||||||
// If it is, the node can be renamed.
|
// If it is, the node can be renamed.
|
||||||
_, enodeCheck, err := firstWatchCompoundWithTxn(tx, "checks", "id", structs.DefaultEnterpriseMeta(), enode.Node, string(structs.SerfCheckID))
|
enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
|
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
|
||||||
}
|
}
|
||||||
@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed")
|
|||||||
// Returns an error if the write didn't happen and nil if write was successful.
|
// Returns an error if the write didn't happen and nil if write was successful.
|
||||||
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
|
func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error {
|
||||||
// Retrieve the existing service.
|
// Retrieve the existing service.
|
||||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node
|
|||||||
// existing memdb transaction.
|
// existing memdb transaction.
|
||||||
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
|
func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error {
|
||||||
// Check for existing service
|
// Check for existing service
|
||||||
_, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1148,7 +1148,7 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Query the service
|
// Query the service
|
||||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||||
}
|
}
|
||||||
@ -1321,8 +1321,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||||||
entMeta = structs.DefaultEnterpriseMeta()
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look up the service.
|
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
||||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1471,7 +1470,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe
|
|||||||
// checks with no matching node or service.
|
// checks with no matching node or service.
|
||||||
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
|
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
|
||||||
// Check if we have an existing health check
|
// Check if we have an existing health check
|
||||||
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
|
existing, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed health check lookup: %s", err)
|
return fmt.Errorf("failed health check lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1503,7 +1502,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc
|
|||||||
// If the check is associated with a service, check that we have
|
// If the check is associated with a service, check that we have
|
||||||
// a registration for the service.
|
// a registration for the service.
|
||||||
if hc.ServiceID != "" {
|
if hc.ServiceID != "" {
|
||||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
|
service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1578,8 +1577,13 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta
|
|||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := catalogChecksMaxIndex(tx, entMeta)
|
idx := catalogChecksMaxIndex(tx, entMeta)
|
||||||
|
|
||||||
|
// TODO: accept non-pointer value
|
||||||
|
if entMeta == nil {
|
||||||
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
|
}
|
||||||
|
|
||||||
// Return the check.
|
// Return the check.
|
||||||
_, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, nodeName, string(checkID))
|
check, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -1702,7 +1706,7 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru
|
|||||||
var iter memdb.ResultIterator
|
var iter memdb.ResultIterator
|
||||||
var err error
|
var err error
|
||||||
if state == api.HealthAny {
|
if state == api.HealthAny {
|
||||||
iter, err = catalogListChecks(tx, entMeta)
|
iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta)
|
||||||
} else {
|
} else {
|
||||||
iter, err = catalogListChecksInState(tx, state, entMeta)
|
iter, err = catalogListChecksInState(tx, state, entMeta)
|
||||||
}
|
}
|
||||||
@ -1800,8 +1804,12 @@ type NodeServiceQuery struct {
|
|||||||
// deleteCheckTxn is the inner method used to call a health
|
// deleteCheckTxn is the inner method used to call a health
|
||||||
// check deletion within an existing transaction.
|
// check deletion within an existing transaction.
|
||||||
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
|
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
|
||||||
|
if entMeta == nil {
|
||||||
|
entMeta = structs.DefaultEnterpriseMeta()
|
||||||
|
}
|
||||||
|
|
||||||
// Try to retrieve the existing health check.
|
// Try to retrieve the existing health check.
|
||||||
_, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID))
|
hc, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("check lookup failed: %s", err)
|
return fmt.Errorf("check lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -1816,7 +1824,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
|
svcRaw, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed retrieving service from state store: %v", err)
|
return fmt.Errorf("failed retrieving service from state store: %v", err)
|
||||||
}
|
}
|
||||||
@ -2149,7 +2157,7 @@ func parseCheckServiceNodes(
|
|||||||
// We need a similar fallback for checks. Since services need the
|
// We need a similar fallback for checks. Since services need the
|
||||||
// status of node + service-specific checks, we pull in a top-level
|
// status of node + service-specific checks, we pull in a top-level
|
||||||
// watch over all checks.
|
// watch over all checks.
|
||||||
allChecks, err := tx.Get("checks", "id")
|
allChecks, err := tx.Get(tableChecks, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
|
||||||
}
|
}
|
||||||
@ -2310,7 +2318,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
|
|||||||
allServicesCh := allServices.WatchCh()
|
allServicesCh := allServices.WatchCh()
|
||||||
|
|
||||||
// We need a similar fallback for checks.
|
// We need a similar fallback for checks.
|
||||||
allChecks, err := tx.Get("checks", "id")
|
allChecks, err := tx.Get(tableChecks, indexID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -103,6 +103,38 @@ func indexFromServiceNode(raw interface{}) ([]byte, error) {
|
|||||||
return b.Bytes(), nil
|
return b.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func indexFromHealthCheck(raw interface{}) ([]byte, error) {
|
||||||
|
hc, ok := raw.(*structs.HealthCheck)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
if hc.Node == "" || hc.CheckID == "" {
|
||||||
|
return nil, errMissingValueForIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
var b indexBuilder
|
||||||
|
b.String(strings.ToLower(hc.Node))
|
||||||
|
b.String(strings.ToLower(string(hc.CheckID)))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func indexFromNodeCheckID(raw interface{}) ([]byte, error) {
|
||||||
|
hc, ok := raw.(NodeCheckQuery)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unexpected type %T for NodeCheckQuery index", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
if hc.Node == "" || hc.CheckID == "" {
|
||||||
|
return nil, errMissingValueForIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
var b indexBuilder
|
||||||
|
b.String(strings.ToLower(hc.Node))
|
||||||
|
b.String(strings.ToLower(hc.CheckID))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
||||||
return fmt.Sprintf("service.%s", name)
|
return fmt.Sprintf("service.%s", name)
|
||||||
}
|
}
|
||||||
@ -243,10 +275,6 @@ func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMet
|
|||||||
return tx.Get("checks", "status", state)
|
return tx.Get("checks", "status", state)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
|
||||||
return tx.Get("checks", "id")
|
|
||||||
}
|
|
||||||
|
|
||||||
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
|
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
|
||||||
// Insert the check
|
// Insert the check
|
||||||
if err := tx.Insert("checks", chk); err != nil {
|
if err := tx.Insert("checks", chk); err != nil {
|
||||||
|
@ -2,10 +2,40 @@
|
|||||||
|
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import "github.com/hashicorp/consul/agent/structs"
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
func testIndexerTableChecks() map[string]indexerTestCase {
|
func testIndexerTableChecks() map[string]indexerTestCase {
|
||||||
|
obj := &structs.HealthCheck{
|
||||||
|
Node: "NoDe",
|
||||||
|
ServiceID: "SeRvIcE",
|
||||||
|
CheckID: "CheckID",
|
||||||
|
}
|
||||||
return map[string]indexerTestCase{
|
return map[string]indexerTestCase{
|
||||||
|
indexID: {
|
||||||
|
read: indexValue{
|
||||||
|
source: NodeCheckQuery{
|
||||||
|
Node: "NoDe",
|
||||||
|
CheckID: "CheckId",
|
||||||
|
},
|
||||||
|
expected: []byte("node\x00checkid\x00"),
|
||||||
|
},
|
||||||
|
write: indexValue{
|
||||||
|
source: obj,
|
||||||
|
expected: []byte("node\x00checkid\x00"),
|
||||||
|
},
|
||||||
|
prefix: []indexValue{
|
||||||
|
{
|
||||||
|
source: structs.EnterpriseMeta{},
|
||||||
|
expected: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
source: Query{Value: "nOdE"},
|
||||||
|
expected: []byte("node\x00"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
indexNodeService: {
|
indexNodeService: {
|
||||||
read: indexValue{
|
read: indexValue{
|
||||||
source: NodeServiceQuery{
|
source: NodeServiceQuery{
|
||||||
@ -15,10 +45,7 @@ func testIndexerTableChecks() map[string]indexerTestCase {
|
|||||||
expected: []byte("node\x00service\x00"),
|
expected: []byte("node\x00service\x00"),
|
||||||
},
|
},
|
||||||
write: indexValue{
|
write: indexValue{
|
||||||
source: &structs.HealthCheck{
|
source: obj,
|
||||||
Node: "NoDe",
|
|
||||||
ServiceID: "SeRvIcE",
|
|
||||||
},
|
|
||||||
expected: []byte("node\x00service\x00"),
|
expected: []byte("node\x00service\x00"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -30,10 +57,7 @@ func testIndexerTableChecks() map[string]indexerTestCase {
|
|||||||
expected: []byte("node\x00"),
|
expected: []byte("node\x00"),
|
||||||
},
|
},
|
||||||
write: indexValue{
|
write: indexValue{
|
||||||
source: &structs.HealthCheck{
|
source: obj,
|
||||||
Node: "NoDe",
|
|
||||||
ServiceID: "SeRvIcE",
|
|
||||||
},
|
|
||||||
expected: []byte("node\x00"),
|
expected: []byte("node\x00"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -123,17 +123,10 @@ func checksTableSchema() *memdb.TableSchema {
|
|||||||
Name: indexID,
|
Name: indexID,
|
||||||
AllowMissing: false,
|
AllowMissing: false,
|
||||||
Unique: true,
|
Unique: true,
|
||||||
Indexer: &memdb.CompoundIndex{
|
Indexer: indexerSingleWithPrefix{
|
||||||
Indexes: []memdb.Indexer{
|
readIndex: readIndex(indexFromNodeCheckID),
|
||||||
&memdb.StringFieldIndex{
|
prefixIndex: prefixIndex(prefixIndexFromQuery),
|
||||||
Field: "Node",
|
writeIndex: writeIndex(indexFromHealthCheck),
|
||||||
Lowercase: true,
|
|
||||||
},
|
|
||||||
&memdb.StringFieldIndex{
|
|
||||||
Field: "CheckID",
|
|
||||||
Lowercase: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
indexStatus: {
|
indexStatus: {
|
||||||
@ -331,3 +324,10 @@ type upstreamDownstream struct {
|
|||||||
|
|
||||||
structs.RaftIndex
|
structs.RaftIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeCheckQuery is used to query the ID index of the checks table.
|
||||||
|
type NodeCheckQuery struct {
|
||||||
|
Node string
|
||||||
|
CheckID string
|
||||||
|
structs.EnterpriseMeta
|
||||||
|
}
|
||||||
|
@ -1307,7 +1307,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Associated health check was removed.
|
// Associated health check was removed.
|
||||||
checks, err := getCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1")
|
checks, err := tx.Get(tableChecks, indexID, NodeCheckQuery{Node: "node1", CheckID: "check1"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) {
|
|||||||
// that it actually is removed in the state store.
|
// that it actually is removed in the state store.
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
_, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1")
|
check, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: "node1", CheckID: "check1"})
|
||||||
if err != nil || check != nil {
|
if err != nil || check != nil {
|
||||||
t.Fatalf("bad: %#v (err: %s)", check, err)
|
t.Fatalf("bad: %#v (err: %s)", check, err)
|
||||||
}
|
}
|
||||||
|
@ -3,8 +3,9 @@
|
|||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func firstWithTxn(tx ReadTxn,
|
func firstWithTxn(tx ReadTxn,
|
||||||
@ -19,11 +20,6 @@ func firstWatchWithTxn(tx ReadTxn,
|
|||||||
return tx.FirstWatch(table, index, idxVal)
|
return tx.FirstWatch(table, index, idxVal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func firstWatchCompoundWithTxn(tx ReadTxn,
|
|
||||||
table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) {
|
|
||||||
return tx.FirstWatch(table, index, idxVals...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getWithTxn(tx ReadTxn,
|
func getWithTxn(tx ReadTxn,
|
||||||
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
|
|
||||||
|
@ -5,9 +5,10 @@ package state
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func sessionIndexer() *memdb.UUIDFieldIndex {
|
func sessionIndexer() *memdb.UUIDFieldIndex {
|
||||||
@ -107,7 +108,7 @@ func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
|
|||||||
func validateSessionChecksTxn(tx *txn, session *structs.Session) error {
|
func validateSessionChecksTxn(tx *txn, session *structs.Session) error {
|
||||||
// Go over the session checks and ensure they exist.
|
// Go over the session checks and ensure they exist.
|
||||||
for _, checkID := range session.CheckIDs() {
|
for _, checkID := range session.CheckIDs() {
|
||||||
check, err := tx.First("checks", "id", session.Node, string(checkID))
|
check, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: session.Node, CheckID: string(checkID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed check lookup: %s", err)
|
return fmt.Errorf("failed check lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s
|
|||||||
|
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv
|
|||||||
|
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
_, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
|
|||||||
|
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
_, c, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, nodeID, string(checkID))
|
c, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: nodeID, CheckID: string(checkID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,7 @@ table=autopilot-config
|
|||||||
|
|
||||||
table=checks
|
table=checks
|
||||||
index=id unique
|
index=id unique
|
||||||
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false
|
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingleWithPrefix readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeCheckID writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromHealthCheck prefixIndex=github.com/hashicorp/consul/agent/consul/state.prefixIndexFromQuery
|
||||||
index=node allow-missing
|
index=node allow-missing
|
||||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
|
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
|
||||||
index=node_service allow-missing
|
index=node_service allow-missing
|
||||||
|
Loading…
x
Reference in New Issue
Block a user