mirror of https://github.com/status-im/consul.git
txn: add service operations
This commit is contained in:
parent
ab58986ac3
commit
7759e9ea8b
|
@ -735,6 +735,36 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService)
|
|||
return nil
|
||||
}
|
||||
|
||||
// ensureServiceCASTxn updates a service only if the existing index matches the given index.
|
||||
// Returns a bool indicating if a write happened and any error.
|
||||
func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) (bool, error) {
|
||||
// Retrieve the existing entry.
|
||||
existing, err := tx.First("nodes", "id", node)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
|
||||
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||
// we are doing a set-if-not-exists.
|
||||
if svc.ModifyIndex == 0 && existing != nil {
|
||||
return false, nil
|
||||
}
|
||||
if svc.ModifyIndex != 0 && existing == nil {
|
||||
return false, nil
|
||||
}
|
||||
e, ok := existing.(*structs.Node)
|
||||
if ok && svc.ModifyIndex != 0 && svc.ModifyIndex != e.ModifyIndex {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Perform the update.
|
||||
if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// ensureServiceTxn is used to upsert a service registration within an
|
||||
// existing memdb transaction.
|
||||
func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
|
||||
|
@ -1111,15 +1141,26 @@ func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs
|
|||
idx := maxIndexTxn(tx, "services")
|
||||
|
||||
// Query the service
|
||||
service, err := tx.First("services", "id", nodeName, serviceID)
|
||||
service, err := s.nodeServiceTxn(tx, nodeName, serviceID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||
}
|
||||
|
||||
if service != nil {
|
||||
return idx, service.(*structs.ServiceNode).ToNodeService(), nil
|
||||
return idx, service, nil
|
||||
}
|
||||
|
||||
func (s *Store) nodeServiceTxn(tx *memdb.Txn, nodeName, serviceID string) (*structs.NodeService, error) {
|
||||
// Query the service
|
||||
service, err := tx.First("services", "id", nodeName, serviceID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||
}
|
||||
return idx, nil, nil
|
||||
|
||||
if service != nil {
|
||||
return service.(*structs.ServiceNode).ToNodeService(), nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// NodeServices is used to query service registrations by node name or UUID.
|
||||
|
@ -1214,6 +1255,35 @@ func serviceIndexName(name string) string {
|
|||
return fmt.Sprintf("service.%s", name)
|
||||
}
|
||||
|
||||
// deleteServiceCASTxn is used to try doing a service delete operation with a given
|
||||
// raft index. If the CAS index specified is not equal to the last observed index for
|
||||
// the given service, then the call is a noop, otherwise a normal delete is invoked.
|
||||
func (s *Store) deleteServiceCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName, serviceID string) (bool, error) {
|
||||
// Look up the service.
|
||||
service, err := tx.First("services", "id", nodeName, serviceID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("check lookup failed: %s", err)
|
||||
}
|
||||
if service == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// If the existing index does not match the provided CAS
|
||||
// index arg, then we shouldn't update anything and can safely
|
||||
// return early here.
|
||||
existing, ok := service.(*structs.ServiceNode)
|
||||
if !ok || existing.ModifyIndex != cidx {
|
||||
return existing == nil, nil
|
||||
}
|
||||
|
||||
// Call the actual deletion if the above passed.
|
||||
if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// deleteServiceTxn is the inner method called to remove a service
|
||||
// registration within an existing transaction.
|
||||
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
||||
|
|
|
@ -175,6 +175,59 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// txnService handles all Service-related operations.
|
||||
func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
|
||||
var entry *structs.NodeService
|
||||
var err error
|
||||
|
||||
switch op.Verb {
|
||||
case api.ServiceGet:
|
||||
entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID)
|
||||
|
||||
case api.ServiceSet:
|
||||
err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service)
|
||||
|
||||
case api.ServiceCAS:
|
||||
var ok bool
|
||||
ok, err = s.ensureServiceCASTxn(tx, idx, op.Node, &op.Service)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node)
|
||||
}
|
||||
|
||||
case api.ServiceDelete:
|
||||
err = s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID)
|
||||
|
||||
case api.ServiceDeleteCAS:
|
||||
var ok bool
|
||||
ok, err = s.deleteServiceCASTxn(tx, idx, op.Service.ModifyIndex, op.Node, op.Service.ID)
|
||||
if !ok && err == nil {
|
||||
err = fmt.Errorf("failed to delete service %q on node %q, index is stale", op.Service.ID, op.Node)
|
||||
}
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown Service verb %q", op.Verb)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For a GET we keep the value, otherwise we clone and blank out the
|
||||
// value (we have to clone so we don't modify the entry being used by
|
||||
// the state store).
|
||||
if entry != nil {
|
||||
if op.Verb == api.ServiceGet {
|
||||
result := structs.TxnResult{Service: entry}
|
||||
return structs.TxnResults{&result}, nil
|
||||
}
|
||||
|
||||
clone := *entry
|
||||
result := structs.TxnResult{Service: &clone}
|
||||
return structs.TxnResults{&result}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// txnCheck handles all Check-related operations.
|
||||
func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
|
||||
var entry *structs.HealthCheck
|
||||
|
@ -247,6 +300,12 @@ func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (stru
|
|||
ret, err = s.txnKVS(tx, idx, op.KV)
|
||||
case op.Intention != nil:
|
||||
err = s.txnIntention(tx, idx, op.Intention)
|
||||
case op.Node != nil:
|
||||
ret, err = s.txnNode(tx, idx, op.Node)
|
||||
case op.Service != nil:
|
||||
ret, err = s.txnService(tx, idx, op.Service)
|
||||
case op.Check != nil:
|
||||
ret, err = s.txnCheck(tx, idx, op.Check)
|
||||
default:
|
||||
err = fmt.Errorf("no operation specified")
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ type TxnNodeResult *Node
|
|||
// a transaction.
|
||||
type TxnServiceOp struct {
|
||||
Verb api.ServiceOp
|
||||
Node string
|
||||
Service NodeService
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue