mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 22:34:55 +00:00
consul/state: add CAS method for kv set
This commit is contained in:
parent
8a70ba2cc5
commit
a0dc2ded8d
@ -848,3 +848,34 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
|
|||||||
tx.Commit()
|
tx.Commit()
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVSSetCAS is used to do a check-and-set operation on a KV entry. The
|
||||||
|
// ModifyIndex in the provided entry is used to determine if we should
|
||||||
|
// write the entry to the state store or bail. Returns a bool indicating
|
||||||
|
// if a write happened and any error.
|
||||||
|
func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Retrieve the existing entry
|
||||||
|
existing, err := tx.First("kvs", "id", entry.Key)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||||
|
// we are doing a set-if-not-exists.
|
||||||
|
if entry.ModifyIndex == 0 && existing != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if entry.ModifyIndex != 0 && existing == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
e, ok := existing.(*structs.DirEntry)
|
||||||
|
if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we made it this far, we should perform the set.
|
||||||
|
return true, s.kvsSetTxn(idx, entry, tx)
|
||||||
|
}
|
||||||
|
@ -986,3 +986,78 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
|
|||||||
t.Fatalf("entry should be deleted")
|
t.Fatalf("entry should be deleted")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_KVSSetCAS(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Doing a CAS with ModifyIndex != 0 and no existing entry
|
||||||
|
// is a no-op.
|
||||||
|
entry := &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("foo"),
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 1,
|
||||||
|
ModifyIndex: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err := s.KVSSetCAS(2, entry)
|
||||||
|
if ok || err != nil {
|
||||||
|
t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that nothing was actually stored
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
if e, err := tx.First("kvs", "id", "foo"); e != nil || err != nil {
|
||||||
|
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", e, err)
|
||||||
|
}
|
||||||
|
tx.Abort()
|
||||||
|
|
||||||
|
// Doing a CAS with a ModifyIndex of zero when no entry exists
|
||||||
|
// performs the set and saves into the state store.
|
||||||
|
entry = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("foo"),
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 0,
|
||||||
|
ModifyIndex: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err = s.KVSSetCAS(2, entry)
|
||||||
|
if !ok || err != nil {
|
||||||
|
t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry was inserted
|
||||||
|
tx = s.db.Txn(false)
|
||||||
|
if e, err := tx.First("kvs", "id", "foo"); e == nil || err != nil {
|
||||||
|
t.Fatalf("expected kvs to exist, got: (%#v, %#v)", e, err)
|
||||||
|
}
|
||||||
|
tx.Abort()
|
||||||
|
|
||||||
|
// Doing a CAS with a ModifyIndex which does not match the current
|
||||||
|
// index does not do anything.
|
||||||
|
entry = &structs.DirEntry{
|
||||||
|
Key: "foo",
|
||||||
|
Value: []byte("bar"),
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 3,
|
||||||
|
ModifyIndex: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, err = s.KVSSetCAS(3, entry)
|
||||||
|
if ok || err != nil {
|
||||||
|
t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entry was not updated in the store
|
||||||
|
tx = s.db.Txn(false)
|
||||||
|
e, err := tx.First("kvs", "id", "foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
result, ok := e.(*structs.DirEntry)
|
||||||
|
if !ok || result.CreateIndex != 2 ||
|
||||||
|
result.ModifyIndex != 2 || string(result.Value) != "foo" {
|
||||||
|
t.Fatalf("bad: %#v", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user