mirror of https://github.com/status-im/consul.git
consul: Implementing Check-And-Set
This commit is contained in:
parent
f87c47424c
commit
8d7c5fc9cd
|
@ -782,8 +782,51 @@ func (s *StateStore) KVSDeleteTree() error {
|
|||
}
|
||||
|
||||
// KVSCheckAndSet is used to perform an atomic check-and-set
|
||||
func (s *StateStore) KVSCheckAndSet() error {
|
||||
return nil
|
||||
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the existing node
|
||||
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Get the existing node if any
|
||||
var exist *structs.DirEntry
|
||||
if len(res) > 0 {
|
||||
exist = res[0].(*structs.DirEntry)
|
||||
}
|
||||
|
||||
// Use the ModifyIndex as the constraint. A modify of time of 0
|
||||
// means we are doing a set-if-not-exists, while any other value
|
||||
// means we expect that modify time.
|
||||
if d.ModifyIndex == 0 && exist != nil {
|
||||
return false, nil
|
||||
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Set the create and modify times
|
||||
if exist == nil {
|
||||
d.CreateIndex = index
|
||||
} else {
|
||||
d.CreateIndex = exist.CreateIndex
|
||||
}
|
||||
d.ModifyIndex = index
|
||||
|
||||
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer s.watch[s.kvsTable].Notify()
|
||||
return true, tx.Commit()
|
||||
}
|
||||
|
||||
// Snapshot is used to create a point in time snapshot
|
||||
|
|
|
@ -1044,3 +1044,66 @@ func TestKVSDelete(t *testing.T) {
|
|||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSCheckAndSet(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// CAS should fail, no entry
|
||||
d := &structs.DirEntry{
|
||||
ModifyIndex: 100,
|
||||
Key: "/foo",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
}
|
||||
ok, err := store.KVSCheckAndSet(1000, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on not-exist, should work
|
||||
d.ModifyIndex = 0
|
||||
ok, err = store.KVSCheckAndSet(1001, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("expected commit")
|
||||
}
|
||||
|
||||
// Constrain on not-exist, should fail
|
||||
d.ModifyIndex = 0
|
||||
ok, err = store.KVSCheckAndSet(1002, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on a wrong modify time
|
||||
d.ModifyIndex = 1000
|
||||
ok, err = store.KVSCheckAndSet(1003, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on a correct modify time
|
||||
d.ModifyIndex = 1001
|
||||
ok, err = store.KVSCheckAndSet(1004, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("expected commit")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue