From ab58986ac31f4d21477f0b9a3131bb7b2423c3f0 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Sun, 2 Dec 2018 23:51:18 -0800 Subject: [PATCH] txn: add node operations --- agent/consul/state/catalog.go | 59 +++++++++++++++++++++++++++++++++++ agent/consul/state/txn.go | 53 +++++++++++++++++++++++++++++++ agent/structs/txn.go | 30 ++++++++++++++++-- api/txn.go | 22 +++++++++++++ 4 files changed, 162 insertions(+), 2 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 89d21a267d..66e034366e 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -373,6 +373,36 @@ func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *memdb.Txn, node *structs.Node return nil } +// ensureNodeCASTxn updates a node only if the existing index matches the given index. +// Returns a bool indicating if a write happened and any error. +func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node) (bool, error) { + // Retrieve the existing entry. + existing, err := tx.First("nodes", "id", node.Node) + if err != nil { + return false, fmt.Errorf("node lookup failed: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if node.ModifyIndex == 0 && existing != nil { + return false, nil + } + if node.ModifyIndex != 0 && existing == nil { + return false, nil + } + e, ok := existing.(*structs.Node) + if ok && node.ModifyIndex != 0 && node.ModifyIndex != e.ModifyIndex { + return false, nil + } + + // Perform the update. + if err := s.ensureNodeTxn(tx, idx, node); err != nil { + return false, err + } + + return true, nil +} + // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. @@ -569,6 +599,35 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error { return nil } +// deleteNodeCASTxn is used to try doing a node delete operation with a given +// raft index. If the CAS index specified is not equal to the last observed index for +// the given check, then the call is a noop, otherwise a normal check delete is invoked. +func (s *Store) deleteNodeCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName string) (bool, error) { + // Look up the node. + node, err := tx.First("nodes", "id", nodeName) + if err != nil { + return false, fmt.Errorf("check lookup failed: %s", err) + } + if node == nil { + return false, nil + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + existing, ok := node.(*structs.Node) + if !ok || existing.ModifyIndex != cidx { + return existing == nil, nil + } + + // Call the actual deletion if the above passed. + if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil { + return false, err + } + + return true, nil +} + // deleteNodeTxn is the inner method used for removing a node from // the store within a given transaction. func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error { diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index 61bcf042dd..082271cff1 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -122,6 +122,59 @@ func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntention } } +// txnNode handles all Node-related operations. +func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) { + var entry *structs.Node + var err error + + switch op.Verb { + case api.NodeGet: + entry, err = getNodeIDTxn(tx, op.Node.ID) + + case api.NodeSet: + err = s.ensureNodeTxn(tx, idx, &op.Node) + + case api.NodeCAS: + var ok bool + ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node) + if !ok && err == nil { + err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node) + } + + case api.NodeDelete: + err = s.deleteNodeTxn(tx, idx, op.Node.Node) + + case api.NodeDeleteCAS: + var ok bool + ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node) + if !ok && err == nil { + err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node) + } + + default: + err = fmt.Errorf("unknown Node verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == api.NodeGet { + result := structs.TxnResult{Node: entry} + return structs.TxnResults{&result}, nil + } + + clone := *entry + result := structs.TxnResult{Node: &clone} + return structs.TxnResults{&result}, nil + } + + return nil, nil +} + // txnCheck handles all Check-related operations. func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { var entry *structs.HealthCheck diff --git a/agent/structs/txn.go b/agent/structs/txn.go index cdd97b05a7..12453bb3ad 100644 --- a/agent/structs/txn.go +++ b/agent/structs/txn.go @@ -19,6 +19,28 @@ type TxnKVOp struct { // inside a transaction. type TxnKVResult *DirEntry +// TxnNodeOp is used to define a single operation on a node in the catalog inside +// a transaction. +type TxnNodeOp struct { + Verb api.NodeOp + Node Node +} + +// TxnNodeResult is used to define the result of a single operation on a node +// in the catalog inside a transaction. +type TxnNodeResult *Node + +// TxnServiceOp is used to define a single operation on a service in the catalog inside +// a transaction. +type TxnServiceOp struct { + Verb api.ServiceOp + Service NodeService +} + +// TxnServiceResult is used to define the result of a single operation on a service +// in the catalog inside a transaction. +type TxnServiceResult *NodeService + // TxnCheckOp is used to define a single operation on a health check inside a // transaction. type TxnCheckOp struct { @@ -39,6 +61,8 @@ type TxnIntentionOp IntentionRequest type TxnOp struct { KV *TxnKVOp Intention *TxnIntentionOp + Node *TxnNodeOp + Service *TxnServiceOp Check *TxnCheckOp } @@ -87,8 +111,10 @@ type TxnErrors []*TxnError // TxnResult is used to define the result of a given operation inside a // transaction. Only one of the types should be filled out per entry. type TxnResult struct { - KV TxnKVResult - Check TxnCheckResult + KV TxnKVResult + Node TxnNodeResult + Service TxnServiceResult + Check TxnCheckResult } // TxnResults is a list of TxnResult entries. diff --git a/api/txn.go b/api/txn.go index 017b726723..59864045e6 100644 --- a/api/txn.go +++ b/api/txn.go @@ -89,6 +89,28 @@ type KVTxnResponse struct { Errors TxnErrors } +// NodeOp constants give possible operations available in a transaction. +type NodeOp string + +const ( + NodeGet NodeOp = "get" + NodeSet NodeOp = "set" + NodeCAS NodeOp = "cas" + NodeDelete NodeOp = "delete" + NodeDeleteCAS NodeOp = "delete-cas" +) + +// ServiceOp constants give possible operations available in a transaction. +type ServiceOp string + +const ( + ServiceGet ServiceOp = "get" + ServiceSet ServiceOp = "set" + ServiceCAS ServiceOp = "cas" + ServiceDelete ServiceOp = "delete" + ServiceDeleteCAS ServiceOp = "delete-cas" +) + // CheckOp constants give possible operations available in a transaction. type CheckOp string