consul: Support KVSLock and KVSUnlock

This commit is contained in:
Armon Dadgar 2014-05-15 14:56:58 -07:00
parent 1c484e991d
commit dc1955526c
3 changed files with 216 additions and 35 deletions

View File

@ -23,6 +23,17 @@ const (
dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size
)
// kvMode is used internally to control which type of set
// operation we are performing
type kvMode int
const (
kvSet kvMode = iota
kvCAS
kvLock
kvUnlock
)
// The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
@ -919,35 +930,8 @@ func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) str
// KVSSet is used to create or update a KV entry
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
// Get the existing node
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
if err != nil {
return err
}
// Set the create and modify times
if len(res) == 0 {
d.CreateIndex = index
} else {
d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex
}
d.ModifyIndex = index
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
return err
}
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
return tx.Commit()
_, err := s.kvsSet(index, d, kvSet)
return err
}
// KVSRestore is used to restore a DirEntry. It should only be used when
@ -1075,8 +1059,26 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts .
// KVSCheckAndSet is used to perform an atomic check-and-set
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvCAS)
}
// KVSLock works like KVSSet but only writes if the lock can be acquired
func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvLock)
}
// KVSUnlock works like KVSSet but only writes if the lock can be unlocked
func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvUnlock)
}
// kvsSet is the internal setter
func (s *StateStore) kvsSet(
index uint64,
d *structs.DirEntry,
mode kvMode) (bool, error) {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
tx, err := s.tables.StartTxn(false)
if err != nil {
return false, err
}
@ -1097,10 +1099,51 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
// Use the ModifyIndex as the constraint. A modify of time of 0
// means we are doing a set-if-not-exists, while any other value
// means we expect that modify time.
if d.ModifyIndex == 0 && exist != nil {
return false, nil
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
return false, nil
if mode == kvCAS {
if d.ModifyIndex == 0 && exist != nil {
return false, nil
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
return false, nil
}
}
// If attempting to lock, check this is possible
if mode == kvLock {
// Verify we have a session
if d.Session == "" {
return false, fmt.Errorf("Missing session")
}
// Bail if it is already locked
if exist != nil && exist.Session != "" {
return false, nil
}
// Verify the session exists
res, err := s.sessionTable.GetTxn(tx, "id", d.Session)
if err != nil {
return false, err
}
if len(res) == 0 {
return false, fmt.Errorf("Invalid session")
}
// Update the lock index
if exist != nil {
exist.LockIndex++
exist.Session = d.Session
} else {
d.LockIndex = 1
}
}
// If attempting to unlock, verify the key exists and is held
if mode == kvUnlock {
if exist == nil || exist.Session != d.Session {
return false, nil
}
// Clear the session to unlock
exist.Session = ""
}
// Set the create and modify times
@ -1108,6 +1151,9 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
d.CreateIndex = index
} else {
d.CreateIndex = exist.CreateIndex
d.LockIndex = exist.LockIndex
d.Session = exist.Session
}
d.ModifyIndex = index

View File

@ -1884,3 +1884,134 @@ func TestSessionInvalidate_DeleteNodeService(t *testing.T) {
t.Fatalf("session should be invalidated")
}
}
func TestKVSLock(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock with a non-existing keys should work
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", d)
}
// Re-locking should fail
ok, err = store.KVSLock(6, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("expected fail")
}
// Set a normal key
k1 := &structs.DirEntry{
Key: "/bar",
Flags: 0,
Value: []byte("asdf"),
}
if err := store.KVSSet(7, k1); err != nil {
t.Fatalf("err: %v", err)
}
// Should acquire the lock
k1.Session = session.ID
ok, err = store.KVSLock(8, k1)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Re-acquire should fail
ok, err = store.KVSLock(9, k1)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("expected fail")
}
}
func TestKVSUnlock(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Unlock with a non-existing keys should fail
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSUnlock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("expected fail")
}
// Lock should work
d.Session = session.ID
if ok, _ := store.KVSLock(6, d); !ok {
t.Fatalf("expected lock")
}
// Unlock should work
d.Session = session.ID
ok, err = store.KVSUnlock(7, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Re-lock should work
d.Session = session.ID
if ok, err := store.KVSLock(8, d); err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("expected lock")
}
if d.LockIndex != 2 {
t.Fatalf("bad: %v", d)
}
}

View File

@ -275,9 +275,11 @@ type IndexedNodeDump struct {
type DirEntry struct {
CreateIndex uint64
ModifyIndex uint64
LockIndex uint64
Key string
Flags uint64
Value []byte
Session string `json:",omitempty"`
}
type DirEntries []*DirEntry
@ -287,7 +289,9 @@ const (
KVSSet KVSOp = "set"
KVSDelete = "delete"
KVSDeleteTree = "delete-tree"
KVSCAS = "cas" // Check-and-set
KVSCAS = "cas" // Check-and-set
KVSLock = "lock" // Lock a key
KVSUnlock = "unlock" // Unlock a key
)
// KVSRequest is used to operate on the Key-Value store