Splits existing KVS operations into *Txn helpers for later reuse.

This commit is contained in:
James Phillips 2016-05-04 14:20:11 -07:00
parent 9edca28203
commit 6c2aeb25ab
1 changed files with 65 additions and 10 deletions

View File

@ -123,6 +123,12 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
return s.kvsGetTxn(tx, key)
}
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction.
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirEntry, error) {
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, "kvs", "tombstones") idx := maxIndexTxn(tx, "kvs", "tombstones")
@ -313,6 +319,18 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
set, err := s.kvsDeleteCASTxn(tx, idx, cidx, key)
if !set || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction.
func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
// Retrieve the existing kvs entry, if any exists. // Retrieve the existing kvs entry, if any exists.
entry, err := tx.First("kvs", "id", key) entry, err := tx.First("kvs", "id", key)
if err != nil { if err != nil {
@ -331,8 +349,6 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
if err := s.kvsDeleteTxn(tx, idx, key); err != nil { if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
return false, err return false, err
} }
tx.Commit()
return true, nil return true, nil
} }
@ -344,6 +360,18 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
set, err := s.kvsSetCASTxn(tx, idx, entry)
if !set || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
// transaction.
func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry. // Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key) existing, err := tx.First("kvs", "id", entry.Key)
if err != nil { if err != nil {
@ -367,8 +395,6 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil { if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
return false, err return false, err
} }
tx.Commit()
return true, nil return true, nil
} }
@ -379,6 +405,17 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
if err := s.kvsDeleteTreeTxn(tx, idx, prefix); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
// Get an iterator over all of the keys with the given prefix. // Get an iterator over all of the keys with the given prefix.
entries, err := tx.Get("kvs", "id_prefix", prefix) entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil { if err != nil {
@ -414,8 +451,6 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
} }
tx.Commit()
return nil return nil
} }
@ -431,6 +466,18 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error)
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
locked, err := s.kvsLockTxn(tx, idx, entry)
if !locked || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsLockTxn is the inner method that does a lock inside an existing
// transaction.
func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present. // Verify that a session is present.
if entry.Session == "" { if entry.Session == "" {
return false, fmt.Errorf("missing session") return false, fmt.Errorf("missing session")
@ -476,8 +523,6 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error)
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
return false, err return false, err
} }
tx.Commit()
return true, nil return true, nil
} }
@ -487,6 +532,18 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
unlocked, err := s.kvsUnlockTxn(tx, idx, entry)
if !unlocked || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsUnlockTxn is the inner method that does an unlock inside an existing
// transaction.
func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present. // Verify that a session is present.
if entry.Session == "" { if entry.Session == "" {
return false, fmt.Errorf("missing session") return false, fmt.Errorf("missing session")
@ -519,7 +576,5 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
return false, err return false, err
} }
tx.Commit()
return true, nil return true, nil
} }