diff --git a/consul/state/kvs.go b/consul/state/kvs.go new file mode 100644 index 0000000000..577dbb89b6 --- /dev/null +++ b/consul/state/kvs.go @@ -0,0 +1,525 @@ +package state + +import ( + "fmt" + "strings" + "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" +) + +// KVs is used to pull the full list of KVS entries for use during snapshots. +func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { + iter, err := s.tx.Get("kvs", "id_prefix") + if err != nil { + return nil, err + } + return iter, nil +} + +// Tombstones is used to pull all the tombstones from the graveyard. +func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) { + return s.store.kvsGraveyard.DumpTxn(s.tx) +} + +// KVS is used when restoring from a snapshot. Use KVSSet for general inserts. +func (s *StateRestore) KVS(entry *structs.DirEntry) error { + if err := s.tx.Insert("kvs", entry); err != nil { + return fmt.Errorf("failed inserting kvs entry: %s", err) + } + + if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + // We have a single top-level KVS watch trigger instead of doing + // tons of prefix watches. + return nil +} + +// Tombstone is used when restoring from a snapshot. For general inserts, use +// Graveyard.InsertTxn. +func (s *StateRestore) Tombstone(stone *Tombstone) error { + if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil { + return fmt.Errorf("failed restoring tombstone: %s", err) + } + return nil +} + +// ReapTombstones is used to delete all the tombstones with an index +// less than or equal to the given index. This is used to prevent +// unbounded storage growth of the tombstones. +func (s *StateStore) ReapTombstones(index uint64) error { + tx := s.db.Txn(true) + defer tx.Abort() + + if err := s.kvsGraveyard.ReapTxn(tx, index); err != nil { + return fmt.Errorf("failed to reap kvs tombstones: %s", err) + } + + tx.Commit() + return nil +} + +// KVSSet is used to store a key/value pair. +func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Perform the actual set. + if err := s.kvsSetTxn(tx, idx, entry, false); err != nil { + return err + } + + tx.Commit() + return nil +} + +// kvsSetTxn is used to insert or update a key/value pair in the state +// store. It is the inner method used and handles only the actual storage. +// If updateSession is true, then the incoming entry will set the new +// session (should be validated before calling this). Otherwise, we will keep +// whatever the existing session is. +func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { + // Retrieve an existing KV pair + existing, err := tx.First("kvs", "id", entry.Key) + if err != nil { + return fmt.Errorf("failed kvs lookup: %s", err) + } + + // Set the indexes. + if existing != nil { + entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex + } else { + entry.CreateIndex = idx + } + entry.ModifyIndex = idx + + // Preserve the existing session unless told otherwise. The "existing" + // session for a new entry is "no session". + if !updateSession { + if existing != nil { + entry.Session = existing.(*structs.DirEntry).Session + } else { + entry.Session = "" + } + } + + // Store the kv pair in the state store and update the index. + if err := tx.Insert("kvs", entry); err != nil { + return fmt.Errorf("failed inserting kvs entry: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) }) + return nil +} + +// KVSGet is used to retrieve a key/value pair from the state store. +func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, "kvs", "tombstones") + + // Retrieve the key. + entry, err := tx.First("kvs", "id", key) + if err != nil { + return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) + } + if entry != nil { + return idx, entry.(*structs.DirEntry), nil + } + return idx, nil, nil +} + +// KVSList is used to list out all keys under a given prefix. If the +// prefix is left empty, all keys in the KVS will be returned. The returned +// is the max index of the returned kvs entries or applicable tombstones, or +// else it's the full table indexes for kvs and tombstones. +func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table indexes. + idx := maxIndexTxn(tx, "kvs", "tombstones") + + // Query the prefix and list the available keys + entries, err := tx.Get("kvs", "id_prefix", prefix) + if err != nil { + return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) + } + + // Gather all of the keys found in the store + var ents structs.DirEntries + var lindex uint64 + for entry := entries.Next(); entry != nil; entry = entries.Next() { + e := entry.(*structs.DirEntry) + ents = append(ents, e) + if e.ModifyIndex > lindex { + lindex = e.ModifyIndex + } + } + + // Check for the highest index in the graveyard. If the prefix is empty + // then just use the full table indexes since we are listing everything. + if prefix != "" { + gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix) + if err != nil { + return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err) + } + if gindex > lindex { + lindex = gindex + } + } else { + lindex = idx + } + + // Use the sub index if it was set and there are entries, otherwise use + // the full table index from above. + if lindex != 0 { + idx = lindex + } + return idx, ents, nil +} + +// KVSListKeys is used to query the KV store for keys matching the given prefix. +// An optional separator may be specified, which can be used to slice off a part +// of the response so that only a subset of the prefix is returned. In this +// mode, the keys which are omitted are still counted in the returned index. +func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table indexes. + idx := maxIndexTxn(tx, "kvs", "tombstones") + + // Fetch keys using the specified prefix + entries, err := tx.Get("kvs", "id_prefix", prefix) + if err != nil { + return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) + } + + prefixLen := len(prefix) + sepLen := len(sep) + + var keys []string + var lindex uint64 + var last string + for entry := entries.Next(); entry != nil; entry = entries.Next() { + e := entry.(*structs.DirEntry) + + // Accumulate the high index + if e.ModifyIndex > lindex { + lindex = e.ModifyIndex + } + + // Always accumulate if no separator provided + if sepLen == 0 { + keys = append(keys, e.Key) + continue + } + + // Parse and de-duplicate the returned keys based on the + // key separator, if provided. + after := e.Key[prefixLen:] + sepIdx := strings.Index(after, sep) + if sepIdx > -1 { + key := e.Key[:prefixLen+sepIdx+sepLen] + if key != last { + keys = append(keys, key) + last = key + } + } else { + keys = append(keys, e.Key) + } + } + + // Check for the highest index in the graveyard. If the prefix is empty + // then just use the full table indexes since we are listing everything. + if prefix != "" { + gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix) + if err != nil { + return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err) + } + if gindex > lindex { + lindex = gindex + } + } else { + lindex = idx + } + + // Use the sub index if it was set and there are entries, otherwise use + // the full table index from above. + if lindex != 0 { + idx = lindex + } + return idx, keys, nil +} + +// KVSDelete is used to perform a shallow delete on a single key in the +// the state store. +func (s *StateStore) KVSDelete(idx uint64, key string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Perform the actual delete + if err := s.kvsDeleteTxn(tx, idx, key); err != nil { + return err + } + + tx.Commit() + return nil +} + +// kvsDeleteTxn is the inner method used to perform the actual deletion +// of a key/value pair within an existing transaction. +func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error { + // Look up the entry in the state store. + entry, err := tx.First("kvs", "id", key) + if err != nil { + return fmt.Errorf("failed kvs lookup: %s", err) + } + if entry == nil { + return nil + } + + // Create a tombstone. + if err := s.kvsGraveyard.InsertTxn(tx, key, idx); err != nil { + return fmt.Errorf("failed adding to graveyard: %s", err) + } + + // Delete the entry and update the index. + if err := tx.Delete("kvs", entry); err != nil { + return fmt.Errorf("failed deleting kvs entry: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Defer(func() { s.kvsWatch.Notify(key, false) }) + return nil +} + +// KVSDeleteCAS is used to try doing a KV delete operation with a given +// raft index. If the CAS index specified is not equal to the last +// observed index for the given key, then the call is a noop, otherwise +// a normal KV delete is invoked. +func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Retrieve the existing kvs entry, if any exists. + entry, err := tx.First("kvs", "id", key) + if err != nil { + return false, fmt.Errorf("failed kvs lookup: %s", err) + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + e, ok := entry.(*structs.DirEntry) + if !ok || e.ModifyIndex != cidx { + return entry == nil, nil + } + + // Call the actual deletion if the above passed. + if err := s.kvsDeleteTxn(tx, idx, key); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// KVSSetCAS is used to do a check-and-set operation on a KV entry. The +// ModifyIndex in the provided entry is used to determine if we should +// write the entry to the state store or bail. Returns a bool indicating +// if a write happened and any error. +func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Retrieve the existing entry. + existing, err := tx.First("kvs", "id", entry.Key) + if err != nil { + return false, fmt.Errorf("failed kvs lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if entry.ModifyIndex == 0 && existing != nil { + return false, nil + } + if entry.ModifyIndex != 0 && existing == nil { + return false, nil + } + e, ok := existing.(*structs.DirEntry) + if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex { + return false, nil + } + + // If we made it this far, we should perform the set. + if err := s.kvsSetTxn(tx, idx, entry, false); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// KVSDeleteTree is used to do a recursive delete on a key prefix +// in the state store. If any keys are modified, the last index is +// set, otherwise this is a no-op. +func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get an iterator over all of the keys with the given prefix. + entries, err := tx.Get("kvs", "id_prefix", prefix) + if err != nil { + return fmt.Errorf("failed kvs lookup: %s", err) + } + + // Go over all of the keys and remove them. We call the delete + // directly so that we only update the index once. We also add + // tombstones as we go. + var modified bool + var objs []interface{} + for entry := entries.Next(); entry != nil; entry = entries.Next() { + e := entry.(*structs.DirEntry) + if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil { + return fmt.Errorf("failed adding to graveyard: %s", err) + } + objs = append(objs, entry) + modified = true + } + + // Do the actual deletes in a separate loop so we don't trash the + // iterator as we go. + for _, obj := range objs { + if err := tx.Delete("kvs", obj); err != nil { + return fmt.Errorf("failed deleting kvs entry: %s", err) + } + } + + // Update the index + if modified { + tx.Defer(func() { s.kvsWatch.Notify(prefix, true) }) + if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } + + tx.Commit() + return nil +} + +// KVSLockDelay returns the expiration time for any lock delay associated with +// the given key. +func (s *StateStore) KVSLockDelay(key string) time.Time { + return s.lockDelay.GetExpiration(key) +} + +// KVSLock is similar to KVSSet but only performs the set if the lock can be +// acquired. +func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Verify that a session is present. + if entry.Session == "" { + return false, fmt.Errorf("missing session") + } + + // Verify that the session exists. + sess, err := tx.First("sessions", "id", entry.Session) + if err != nil { + return false, fmt.Errorf("failed session lookup: %s", err) + } + if sess == nil { + return false, fmt.Errorf("invalid session %#v", entry.Session) + } + + // Retrieve the existing entry. + existing, err := tx.First("kvs", "id", entry.Key) + if err != nil { + return false, fmt.Errorf("failed kvs lookup: %s", err) + } + + // Set up the entry, using the existing entry if present. + if existing != nil { + e := existing.(*structs.DirEntry) + if e.Session == entry.Session { + // We already hold this lock, good to go. + entry.CreateIndex = e.CreateIndex + entry.LockIndex = e.LockIndex + } else if e.Session != "" { + // Bail out, someone else holds this lock. + return false, nil + } else { + // Set up a new lock with this session. + entry.CreateIndex = e.CreateIndex + entry.LockIndex = e.LockIndex + 1 + } + } else { + entry.CreateIndex = idx + entry.LockIndex = 1 + } + entry.ModifyIndex = idx + + // If we made it this far, we should perform the set. + if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} + +// KVSUnlock is similar to KVSSet but only performs the set if the lock can be +// unlocked (the key must already exist and be locked). +func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Verify that a session is present. + if entry.Session == "" { + return false, fmt.Errorf("missing session") + } + + // Retrieve the existing entry. + existing, err := tx.First("kvs", "id", entry.Key) + if err != nil { + return false, fmt.Errorf("failed kvs lookup: %s", err) + } + + // Bail if there's no existing key. + if existing == nil { + return false, nil + } + + // Make sure the given session is the lock holder. + e := existing.(*structs.DirEntry) + if e.Session != entry.Session { + return false, nil + } + + // Clear the lock and update the entry. + entry.Session = "" + entry.LockIndex = e.LockIndex + entry.CreateIndex = e.CreateIndex + entry.ModifyIndex = idx + + // If we made it this far, we should perform the set. + if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { + return false, err + } + + tx.Commit() + return true, nil +} diff --git a/consul/state/kvs_test.go b/consul/state/kvs_test.go new file mode 100644 index 0000000000..7da1fec89a --- /dev/null +++ b/consul/state/kvs_test.go @@ -0,0 +1,1540 @@ +package state + +import ( + "reflect" + "strings" + "testing" + "time" + + "github.com/hashicorp/consul/consul/structs" +) + +func TestStateStore_GC(t *testing.T) { + // Build up a fast GC. + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Enable it and attach it to the state store. + gc.SetEnabled(true) + s, err := NewStateStore(gc) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Create some KV pairs. + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/baz", "bar") + testSetKey(t, s, 4, "foo/moo", "bar") + testSetKey(t, s, 5, "foo/zoo", "bar") + + // Delete a key and make sure the GC sees it. + if err := s.KVSDelete(6, "foo/zoo"); err != nil { + t.Fatalf("err: %s", err) + } + select { + case idx := <-gc.ExpireCh(): + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + case <-time.After(2 * ttl): + t.Fatalf("GC never fired") + } + + // Check for the same behavior with a tree delete. + if err := s.KVSDeleteTree(7, "foo/moo"); err != nil { + t.Fatalf("err: %s", err) + } + select { + case idx := <-gc.ExpireCh(): + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + case <-time.After(2 * ttl): + t.Fatalf("GC never fired") + } + + // Check for the same behavior with a CAS delete. + if ok, err := s.KVSDeleteCAS(8, 3, "foo/baz"); !ok || err != nil { + t.Fatalf("err: %s", err) + } + select { + case idx := <-gc.ExpireCh(): + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + case <-time.After(2 * ttl): + t.Fatalf("GC never fired") + } + + // Finally, try it with an expiring session. + testRegisterNode(t, s, 9, "node1") + session := &structs.Session{ + ID: testUUID(), + Node: "node1", + Behavior: structs.SessionKeysDelete, + } + if err := s.SessionCreate(10, session); err != nil { + t.Fatalf("err: %s", err) + } + d := &structs.DirEntry{ + Key: "lock", + Session: session.ID, + } + if ok, err := s.KVSLock(11, d); !ok || err != nil { + t.Fatalf("err: %v", err) + } + if err := s.SessionDestroy(12, session.ID); err != nil { + t.Fatalf("err: %s", err) + } + select { + case idx := <-gc.ExpireCh(): + if idx != 12 { + t.Fatalf("bad index: %d", idx) + } + case <-time.After(2 * ttl): + t.Fatalf("GC never fired") + } +} + +func TestStateStore_ReapTombstones(t *testing.T) { + s := testStateStore(t) + + // Create some KV pairs. + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/baz", "bar") + testSetKey(t, s, 4, "foo/moo", "bar") + testSetKey(t, s, 5, "foo/zoo", "bar") + + // Call a delete on some specific keys. + if err := s.KVSDelete(6, "foo/baz"); err != nil { + t.Fatalf("err: %s", err) + } + if err := s.KVSDelete(7, "foo/moo"); err != nil { + t.Fatalf("err: %s", err) + } + + // Pull out the list and check the index, which should come from the + // tombstones. + idx, _, err := s.KVSList("foo/") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Reap the tombstones <= 6. + if err := s.ReapTombstones(6); err != nil { + t.Fatalf("err: %s", err) + } + + // Should still be good because 7 is in there. + idx, _, err = s.KVSList("foo/") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap them all. + if err := s.ReapTombstones(7); err != nil { + t.Fatalf("err: %s", err) + } + + // At this point the sub index will slide backwards. + idx, _, err = s.KVSList("foo/") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure the tombstones are actually gone. + snap := s.Snapshot() + defer snap.Close() + stones, err := snap.Tombstones() + if err != nil { + t.Fatalf("err: %s", err) + } + if stones.Next() != nil { + t.Fatalf("unexpected extra tombstones") + } +} + +func TestStateStore_KVSSet_KVSGet(t *testing.T) { + s := testStateStore(t) + + // Get on an nonexistent key returns nil. + idx, result, err := s.KVSGet("foo") + if result != nil || err != nil || idx != 0 { + t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) + } + + // Write a new K/V entry to the store. + entry := &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + } + if err := s.KVSSet(1, entry); err != nil { + t.Fatalf("err: %s", err) + } + + // Retrieve the K/V entry again. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result == nil { + t.Fatalf("expected k/v pair, got nothing") + } + if idx != 1 { + t.Fatalf("bad index: %d", idx) + } + + // Check that the index was injected into the result. + if result.CreateIndex != 1 || result.ModifyIndex != 1 { + t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) + } + + // Check that the value matches. + if v := string(result.Value); v != "bar" { + t.Fatalf("expected 'bar', got: '%s'", v) + } + + // Updating the entry works and changes the index. + update := &structs.DirEntry{ + Key: "foo", + Value: []byte("baz"), + } + if err := s.KVSSet(2, update); err != nil { + t.Fatalf("err: %s", err) + } + + // Fetch the kv pair and check. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.CreateIndex != 1 || result.ModifyIndex != 2 { + t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) + } + if v := string(result.Value); v != "baz" { + t.Fatalf("expected 'baz', got '%s'", v) + } + if idx != 2 { + t.Fatalf("bad index: %d", idx) + } + + // Attempt to set the session during an update. + update = &structs.DirEntry{ + Key: "foo", + Value: []byte("zoo"), + Session: "nope", + } + if err := s.KVSSet(3, update); err != nil { + t.Fatalf("err: %s", err) + } + + // Fetch the kv pair and check. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.CreateIndex != 1 || result.ModifyIndex != 3 { + t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) + } + if v := string(result.Value); v != "zoo" { + t.Fatalf("expected 'zoo', got '%s'", v) + } + if result.Session != "" { + t.Fatalf("expected empty session, got '%s", result.Session) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + + // Make a real session and then lock the key to set the session. + testRegisterNode(t, s, 4, "node1") + session := testUUID() + if err := s.SessionCreate(5, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + update = &structs.DirEntry{ + Key: "foo", + Value: []byte("locked"), + Session: session, + } + ok, err := s.KVSLock(6, update) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // Fetch the kv pair and check. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.CreateIndex != 1 || result.ModifyIndex != 6 { + t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) + } + if v := string(result.Value); v != "locked" { + t.Fatalf("expected 'zoo', got '%s'", v) + } + if result.Session != session { + t.Fatalf("expected session, got '%s", result.Session) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Now make an update without the session and make sure it gets applied + // and doesn't take away the session (it is allowed to change the value). + update = &structs.DirEntry{ + Key: "foo", + Value: []byte("stoleit"), + } + if err := s.KVSSet(7, update); err != nil { + t.Fatalf("err: %s", err) + } + + // Fetch the kv pair and check. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.CreateIndex != 1 || result.ModifyIndex != 7 { + t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) + } + if v := string(result.Value); v != "stoleit" { + t.Fatalf("expected 'zoo', got '%s'", v) + } + if result.Session != session { + t.Fatalf("expected session, got '%s", result.Session) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Fetch a key that doesn't exist and make sure we get the right + // response. + idx, result, err = s.KVSGet("nope") + if result != nil || err != nil || idx != 7 { + t.Fatalf("expected (7, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) + } +} + +func TestStateStore_KVSList(t *testing.T) { + s := testStateStore(t) + + // Listing an empty KVS returns nothing + idx, entries, err := s.KVSList("") + if idx != 0 || entries != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err) + } + + // Create some KVS entries + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp") + testSetKey(t, s, 5, "foo/bar/baz", "baz") + + // List out all of the keys + idx, entries, err = s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // Check that all of the keys were returned + if n := len(entries); n != 5 { + t.Fatalf("expected 5 kvs entries, got: %d", n) + } + + // Try listing with a provided prefix + idx, entries, err = s.KVSList("foo/bar/zip") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Check that only the keys in the prefix were returned + if n := len(entries); n != 2 { + t.Fatalf("expected 2 kvs entries, got: %d", n) + } + if entries[0].Key != "foo/bar/zip" || entries[1].Key != "foo/bar/zip/zorp" { + t.Fatalf("bad: %#v", entries) + } + + // Delete a key and make sure the index comes from the tombstone. + if err := s.KVSDelete(6, "foo/bar/baz"); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList("foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Set a different key to bump the index. + testSetKey(t, s, 7, "some/other/key", "") + + // Make sure we get the right index from the tombstone. + idx, _, err = s.KVSList("foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstones and make sure we get the latest index + // since there are no matching keys. + if err := s.ReapTombstones(6); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList("foo/bar/baz") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // List all the keys to make sure the index is also correct. + idx, _, err = s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSListKeys(t *testing.T) { + s := testStateStore(t) + + // Listing keys with no results returns nil. + idx, keys, err := s.KVSListKeys("", "") + if idx != 0 || keys != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err) + } + + // Create some keys. + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + testSetKey(t, s, 3, "foo/bar/baz", "baz") + testSetKey(t, s, 4, "foo/bar/zip", "zip") + testSetKey(t, s, 5, "foo/bar/zip/zam", "zam") + testSetKey(t, s, 6, "foo/bar/zip/zorp", "zorp") + testSetKey(t, s, 7, "some/other/prefix", "nack") + + // List all the keys. + idx, keys, err = s.KVSListKeys("", "") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(keys) != 7 { + t.Fatalf("bad keys: %#v", keys) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Query using a prefix and pass a separator. + idx, keys, err = s.KVSListKeys("foo/bar/", "/") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(keys) != 3 { + t.Fatalf("bad keys: %#v", keys) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Subset of the keys was returned. + expect := []string{"foo/bar/baz", "foo/bar/zip", "foo/bar/zip/"} + if !reflect.DeepEqual(keys, expect) { + t.Fatalf("bad keys: %#v", keys) + } + + // Listing keys with no separator returns everything. + idx, keys, err = s.KVSListKeys("foo", "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + expect = []string{"foo", "foo/bar", "foo/bar/baz", "foo/bar/zip", + "foo/bar/zip/zam", "foo/bar/zip/zorp"} + if !reflect.DeepEqual(keys, expect) { + t.Fatalf("bad keys: %#v", keys) + } + + // Delete a key and make sure the index comes from the tombstone. + if err := s.KVSDelete(8, "foo/bar/baz"); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSListKeys("foo/bar/baz", "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Set a different key to bump the index. + testSetKey(t, s, 9, "some/other/key", "") + + // Make sure the index still comes from the tombstone. + idx, _, err = s.KVSListKeys("foo/bar/baz", "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstones and make sure we get the latest index + // since there are no matching keys. + if err := s.ReapTombstones(8); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSListKeys("foo/bar/baz", "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } + + // List all the keys to make sure the index is also correct. + idx, _, err = s.KVSListKeys("", "") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSDelete(t *testing.T) { + s := testStateStore(t) + + // Create some KV pairs + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "foo/bar", "bar") + + // Call a delete on a specific key + if err := s.KVSDelete(3, "foo"); err != nil { + t.Fatalf("err: %s", err) + } + + // The entry was removed from the state store + tx := s.db.Txn(false) + defer tx.Abort() + e, err := tx.First("kvs", "id", "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if e != nil { + t.Fatalf("expected kvs entry to be deleted, got: %#v", e) + } + + // Try fetching the other keys to ensure they still exist + e, err = tx.First("kvs", "id", "foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if e == nil || string(e.(*structs.DirEntry).Value) != "bar" { + t.Fatalf("bad kvs entry: %#v", e) + } + + // Check that the index table was updated + if idx := s.maxIndex("kvs"); idx != 3 { + t.Fatalf("bad index: %d", idx) + } + + // Check that the tombstone was created and that prevents the index + // from sliding backwards. + idx, _, err := s.KVSList("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstone and watch the index revert to the remaining + // foo/bar key's index. + if err := s.ReapTombstones(3); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 2 { + t.Fatalf("bad index: %d", idx) + } + + // Deleting a nonexistent key should be idempotent and not return an + // error + if err := s.KVSDelete(4, "foo"); err != nil { + t.Fatalf("err: %s", err) + } + if idx := s.maxIndex("kvs"); idx != 3 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSDeleteCAS(t *testing.T) { + s := testStateStore(t) + + // Create some KV entries + testSetKey(t, s, 1, "foo", "foo") + testSetKey(t, s, 2, "bar", "bar") + testSetKey(t, s, 3, "baz", "baz") + + // Do a CAS delete with an index lower than the entry + ok, err := s.KVSDeleteCAS(4, 1, "bar") + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err) + } + + // Check that the index is untouched and the entry + // has not been deleted. + idx, e, err := s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if e == nil { + t.Fatalf("expected a kvs entry, got nil") + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + + // Do another CAS delete, this time with the correct index + // which should cause the delete to take place. + ok, err = s.KVSDeleteCAS(4, 2, "bar") + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err) + } + + // Entry was deleted and index was updated + idx, e, err = s.KVSGet("bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if e != nil { + t.Fatalf("entry should be deleted") + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Add another key to bump the index. + testSetKey(t, s, 5, "some/other/key", "baz") + + // Check that the tombstone was created and that prevents the index + // from sliding backwards. + idx, _, err = s.KVSList("bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstone and watch the index move up to the table + // index since there are no matching keys. + if err := s.ReapTombstones(4); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList("bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // A delete on a nonexistent key should be idempotent and not return an + // error + ok, err = s.KVSDeleteCAS(6, 2, "bar") + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err) + } + if idx := s.maxIndex("kvs"); idx != 5 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSSetCAS(t *testing.T) { + s := testStateStore(t) + + // Doing a CAS with ModifyIndex != 0 and no existing entry + // is a no-op. + entry := &structs.DirEntry{ + Key: "foo", + Value: []byte("foo"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + } + ok, err := s.KVSSetCAS(2, entry) + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) + } + + // Check that nothing was actually stored + tx := s.db.Txn(false) + if e, err := tx.First("kvs", "id", "foo"); e != nil || err != nil { + t.Fatalf("expected (nil, nil), got: (%#v, %#v)", e, err) + } + tx.Abort() + + // Index was not updated + if idx := s.maxIndex("kvs"); idx != 0 { + t.Fatalf("bad index: %d", idx) + } + + // Doing a CAS with a ModifyIndex of zero when no entry exists + // performs the set and saves into the state store. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("foo"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 0, + ModifyIndex: 0, + }, + } + ok, err = s.KVSSetCAS(2, entry) + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was inserted + idx, entry, err := s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { + t.Fatalf("bad entry: %#v", entry) + } + if idx != 2 { + t.Fatalf("bad index: %d", idx) + } + + // Doing a CAS with a ModifyIndex of zero when an entry exists does + // not do anything. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("foo"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 0, + ModifyIndex: 0, + }, + } + ok, err = s.KVSSetCAS(3, entry) + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) + } + + // Doing a CAS with a ModifyIndex which does not match the current + // index does not do anything. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 3, + }, + } + ok, err = s.KVSSetCAS(3, entry) + if ok || err != nil { + t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was not updated in the store + idx, entry, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { + t.Fatalf("bad entry: %#v", entry) + } + if idx != 2 { + t.Fatalf("bad index: %d", idx) + } + + // Doing a CAS with the proper current index should make the + // modification. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + } + ok, err = s.KVSSetCAS(3, entry) + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was updated + idx, entry, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 { + t.Fatalf("bad entry: %#v", entry) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + + // Attempt to update the session during the CAS. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("zoo"), + Session: "nope", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 3, + }, + } + ok, err = s.KVSSetCAS(4, entry) + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was updated, but the session should have been ignored. + idx, entry, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 || + entry.Session != "" { + t.Fatalf("bad entry: %#v", entry) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Now lock it and try the update, which should keep the session. + testRegisterNode(t, s, 5, "node1") + session := testUUID() + if err := s.SessionCreate(6, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("locked"), + Session: session, + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 4, + }, + } + ok, err = s.KVSLock(6, entry) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("locked"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 6, + }, + } + ok, err = s.KVSSetCAS(7, entry) + if !ok || err != nil { + t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) + } + + // Entry was updated, and the lock status should have stayed the same. + idx, entry, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 || + entry.Session != session { + t.Fatalf("bad entry: %#v", entry) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSDeleteTree(t *testing.T) { + s := testStateStore(t) + + // Create kvs entries in the state store + testSetKey(t, s, 1, "foo/bar", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "baz") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/zorp", "zorp") + + // Calling tree deletion which affects nothing does not + // modify the table index. + if err := s.KVSDeleteTree(9, "bar"); err != nil { + t.Fatalf("err: %s", err) + } + if idx := s.maxIndex("kvs"); idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Call tree deletion with a nested prefix. + if err := s.KVSDeleteTree(5, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + + // Check that all the matching keys were deleted + tx := s.db.Txn(false) + defer tx.Abort() + + entries, err := tx.Get("kvs", "id") + if err != nil { + t.Fatalf("err: %s", err) + } + + num := 0 + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if entry.(*structs.DirEntry).Key != "foo/zorp" { + t.Fatalf("unexpected kvs entry: %#v", entry) + } + num++ + } + + if num != 1 { + t.Fatalf("expected 1 key, got: %d", num) + } + + // Index should be updated if modifications are made + if idx := s.maxIndex("kvs"); idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // Check that the tombstones ware created and that prevents the index + // from sliding backwards. + idx, _, err := s.KVSList("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // Now reap the tombstones and watch the index revert to the remaining + // foo/zorp key's index. + if err := s.ReapTombstones(5); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSLockDelay(t *testing.T) { + s := testStateStore(t) + + // KVSLockDelay is exercised in the lock/unlock and session invalidation + // cases below, so we just do a basic check on a nonexistent key here. + expires := s.KVSLockDelay("/not/there") + if expires.After(time.Now()) { + t.Fatalf("bad: %v", expires) + } +} + +func TestStateStore_KVSLock(t *testing.T) { + s := testStateStore(t) + + // Lock with no session should fail. + ok, err := s.KVSLock(0, &structs.DirEntry{Key: "foo", Value: []byte("foo")}) + if ok || err == nil || !strings.Contains(err.Error(), "missing session") { + t.Fatalf("didn't detect missing session: %v %s", ok, err) + } + + // Now try with a bogus session. + ok, err = s.KVSLock(1, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: testUUID()}) + if ok || err == nil || !strings.Contains(err.Error(), "invalid session") { + t.Fatalf("didn't detect invalid session: %v %s", ok, err) + } + + // Make a real session. + testRegisterNode(t, s, 2, "node1") + session1 := testUUID() + if err := s.SessionCreate(3, &structs.Session{ID: session1, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // Lock and make the key at the same time. + ok, err = s.KVSLock(4, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // Make sure the indexes got set properly. + idx, result, err := s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 4 || + string(result.Value) != "foo" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Re-locking with the same session should update the value and report + // success. + ok, err = s.KVSLock(5, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err) + } + + // Make sure the indexes got set properly, note that the lock index + // won't go up since we didn't lock it again. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 5 || + string(result.Value) != "bar" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 5 { + t.Fatalf("bad index: %d", idx) + } + + // Unlock and the re-lock. + ok, err = s.KVSUnlock(6, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't handle unlocking a locked key: %v %s", ok, err) + } + ok, err = s.KVSLock(7, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // Make sure the indexes got set properly. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 2 || result.CreateIndex != 4 || result.ModifyIndex != 7 || + string(result.Value) != "zoo" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + + // Lock an existing key. + testSetKey(t, s, 8, "bar", "bar") + ok, err = s.KVSLock(9, &structs.DirEntry{Key: "bar", Value: []byte("xxx"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // Make sure the indexes got set properly. + idx, result, err = s.KVSGet("bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 || + string(result.Value) != "xxx" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } + + // Attempting a re-lock with a different session should also fail. + session2 := testUUID() + if err := s.SessionCreate(10, &structs.Session{ID: session2, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // Re-locking should not return an error, but will report that it didn't + // get the lock. + ok, err = s.KVSLock(11, &structs.DirEntry{Key: "bar", Value: []byte("nope"), Session: session2}) + if ok || err != nil { + t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err) + } + + // Make sure the indexes didn't update. + idx, result, err = s.KVSGet("bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 || + string(result.Value) != "xxx" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVSUnlock(t *testing.T) { + s := testStateStore(t) + + // Unlock with no session should fail. + ok, err := s.KVSUnlock(0, &structs.DirEntry{Key: "foo", Value: []byte("bar")}) + if ok || err == nil || !strings.Contains(err.Error(), "missing session") { + t.Fatalf("didn't detect missing session: %v %s", ok, err) + } + + // Make a real session. + testRegisterNode(t, s, 1, "node1") + session1 := testUUID() + if err := s.SessionCreate(2, &structs.Session{ID: session1, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // Unlock with a real session but no key should not return an error, but + // will report it didn't unlock anything. + ok, err = s.KVSUnlock(3, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1}) + if ok || err != nil { + t.Fatalf("didn't handle unlocking a missing key: %v %s", ok, err) + } + + // Make a key and unlock it, without it being locked. + testSetKey(t, s, 4, "foo", "bar") + ok, err = s.KVSUnlock(5, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1}) + if ok || err != nil { + t.Fatalf("didn't handle unlocking a non-locked key: %v %s", ok, err) + } + + // Make sure the indexes didn't update. + idx, result, err := s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 0 || result.CreateIndex != 4 || result.ModifyIndex != 4 || + string(result.Value) != "bar" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Lock it with the first session. + ok, err = s.KVSLock(6, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // Attempt an unlock with another session. + session2 := testUUID() + if err := s.SessionCreate(7, &structs.Session{ID: session2, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + ok, err = s.KVSUnlock(8, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session2}) + if ok || err != nil { + t.Fatalf("didn't handle unlocking with the wrong session: %v %s", ok, err) + } + + // Make sure the indexes didn't update. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 6 || + string(result.Value) != "bar" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Now do the unlock with the correct session. + ok, err = s.KVSUnlock(9, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session1}) + if !ok || err != nil { + t.Fatalf("didn't handle unlocking with the correct session: %v %s", ok, err) + } + + // Make sure the indexes got set properly. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 || + string(result.Value) != "zoo" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } + + // Unlocking again should fail and not change anything. + ok, err = s.KVSUnlock(10, &structs.DirEntry{Key: "foo", Value: []byte("nope"), Session: session1}) + if ok || err != nil { + t.Fatalf("didn't handle unlocking with the previous session: %v %s", ok, err) + } + + // Make sure the indexes didn't update. + idx, result, err = s.KVSGet("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 || + string(result.Value) != "zoo" { + t.Fatalf("bad entry: %#v", result) + } + if idx != 9 { + t.Fatalf("bad index: %d", idx) + } +} + +func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { + s := testStateStore(t) + + // Build up some entries to seed. + entries := structs.DirEntries{ + &structs.DirEntry{ + Key: "aaa", + Flags: 23, + Value: []byte("hello"), + }, + &structs.DirEntry{ + Key: "bar/a", + Value: []byte("one"), + }, + &structs.DirEntry{ + Key: "bar/b", + Value: []byte("two"), + }, + &structs.DirEntry{ + Key: "bar/c", + Value: []byte("three"), + }, + } + for i, entry := range entries { + if err := s.KVSSet(uint64(i+1), entry); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Make a node and session so we can test a locked key. + testRegisterNode(t, s, 5, "node1") + session := testUUID() + if err := s.SessionCreate(6, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + entries[3].Session = session + if ok, err := s.KVSLock(7, entries[3]); !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + // This is required for the compare later. + entries[3].LockIndex = 1 + + // Snapshot the keys. + snap := s.Snapshot() + defer snap.Close() + + // Alter the real state store. + if err := s.KVSSet(8, &structs.DirEntry{Key: "aaa", Value: []byte("nope")}); err != nil { + t.Fatalf("err: %s", err) + } + + // Verify the snapshot. + if idx := snap.LastIndex(); idx != 7 { + t.Fatalf("bad index: %d", idx) + } + iter, err := snap.KVs() + if err != nil { + t.Fatalf("err: %s", err) + } + var dump structs.DirEntries + for entry := iter.Next(); entry != nil; entry = iter.Next() { + dump = append(dump, entry.(*structs.DirEntry)) + } + if !reflect.DeepEqual(dump, entries) { + t.Fatalf("bad: %#v", dump) + } + + // Restore the values into a new state store. + func() { + s := testStateStore(t) + restore := s.Restore() + for _, entry := range dump { + if err := restore.KVS(entry); err != nil { + t.Fatalf("err: %s", err) + } + } + restore.Commit() + + // Read the restored keys back out and verify they match. + idx, res, err := s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 7 { + t.Fatalf("bad index: %d", idx) + } + if !reflect.DeepEqual(res, entries) { + t.Fatalf("bad: %#v", res) + } + + // Check that the index was updated. + if idx := s.maxIndex("kvs"); idx != 7 { + t.Fatalf("bad index: %d", idx) + } + }() +} + +func TestStateStore_KVS_Watches(t *testing.T) { + s := testStateStore(t) + + // This is used when locking down below. + testRegisterNode(t, s, 1, "node1") + session := testUUID() + if err := s.SessionCreate(2, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // An empty prefix watch should hit on all KVS ops, and some other + // prefix should not be affected ever. We also add a positive prefix + // match. + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // Restore just fires off a top-level watch, so we should get hits on + // any prefix, including ones for keys that aren't in there. + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("b"), func() { + verifyWatch(t, s.GetKVSWatch("/nope"), func() { + restore := s.Restore() + if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + }) + }) + }) + + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSDelete(3, "aaa"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil { + t.Fatalf("ok: %v err: %s", ok, err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil { + t.Fatalf("ok: %v err: %s", ok, err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil { + t.Fatalf("ok: %v err: %s", ok, err) + } + }) + }) + }) + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSDeleteTree(7, "aaa"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // A delete tree operation at the top level will notify all the watches. + verifyWatch(t, s.GetKVSWatch(""), func() { + verifyWatch(t, s.GetKVSWatch("a"), func() { + verifyWatch(t, s.GetKVSWatch("/nope"), func() { + if err := s.KVSDeleteTree(8, ""); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // Create a more interesting tree. + testSetKey(t, s, 9, "foo/bar", "bar") + testSetKey(t, s, 10, "foo/bar/baz", "baz") + testSetKey(t, s, 11, "foo/bar/zip", "zip") + testSetKey(t, s, 12, "foo/zorp", "zorp") + + // Deleting just the foo/bar key should not trigger watches on the + // children. + verifyWatch(t, s.GetKVSWatch("foo/bar"), func() { + verifyNoWatch(t, s.GetKVSWatch("foo/bar/baz"), func() { + verifyNoWatch(t, s.GetKVSWatch("foo/bar/zip"), func() { + if err := s.KVSDelete(13, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + + // But a delete tree from that point should notify the whole subtree, + // even for keys that don't exist. + verifyWatch(t, s.GetKVSWatch("foo/bar"), func() { + verifyWatch(t, s.GetKVSWatch("foo/bar/baz"), func() { + verifyWatch(t, s.GetKVSWatch("foo/bar/zip"), func() { + verifyWatch(t, s.GetKVSWatch("foo/bar/uh/nope"), func() { + if err := s.KVSDeleteTree(14, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + }) + }) + }) + }) +} + +func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { + s := testStateStore(t) + + // Insert a key and then delete it to create a tombstone. + testSetKey(t, s, 1, "foo/bar", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "bar") + testSetKey(t, s, 3, "foo/bar/zoo", "bar") + if err := s.KVSDelete(4, "foo/bar"); err != nil { + t.Fatalf("err: %s", err) + } + + // Snapshot the Tombstones. + snap := s.Snapshot() + defer snap.Close() + + // Alter the real state store. + if err := s.ReapTombstones(4); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err := s.KVSList("foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + + // Verify the snapshot. + stones, err := snap.Tombstones() + if err != nil { + t.Fatalf("err: %s", err) + } + var dump []*Tombstone + for stone := stones.Next(); stone != nil; stone = stones.Next() { + dump = append(dump, stone.(*Tombstone)) + } + if len(dump) != 1 { + t.Fatalf("bad %#v", dump) + } + stone := dump[0] + if stone.Key != "foo/bar" || stone.Index != 4 { + t.Fatalf("bad: %#v", stone) + } + + // Restore the values into a new state store. + func() { + s := testStateStore(t) + restore := s.Restore() + for _, stone := range dump { + if err := restore.Tombstone(stone); err != nil { + t.Fatalf("err: %s", err) + } + } + restore.Commit() + + // See if the stone works properly in a list query. + idx, _, err := s.KVSList("foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it reaps correctly. We should still get a 4 for + // the index here because it will be using the last index from + // the tombstone table. + if err := s.ReapTombstones(4); err != nil { + t.Fatalf("err: %s", err) + } + idx, _, err = s.KVSList("foo/bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + + // But make sure the tombstone is actually gone. + snap := s.Snapshot() + defer snap.Close() + stones, err := snap.Tombstones() + if err != nil { + t.Fatalf("err: %s", err) + } + if stones.Next() != nil { + t.Fatalf("unexpected extra tombstones") + } + }() +} diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 7ba3ae3355..a5b4b2f837 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -169,20 +169,6 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) { return iter, nil } -// KVs is used to pull the full list of KVS entries for use during snapshots. -func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { - iter, err := s.tx.Get("kvs", "id_prefix") - if err != nil { - return nil, err - } - return iter, nil -} - -// Tombstones is used to pull all the tombstones from the graveyard. -func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) { - return s.store.kvsGraveyard.DumpTxn(s.tx) -} - // Sessions is used to pull the full list of sessions for use during snapshots. func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) { iter, err := s.tx.Get("sessions", "id") @@ -246,30 +232,6 @@ func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) er return nil } -// KVS is used when restoring from a snapshot. Use KVSSet for general inserts. -func (s *StateRestore) KVS(entry *structs.DirEntry) error { - if err := s.tx.Insert("kvs", entry); err != nil { - return fmt.Errorf("failed inserting kvs entry: %s", err) - } - - if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - // We have a single top-level KVS watch trigger instead of doing - // tons of prefix watches. - return nil -} - -// Tombstone is used when restoring from a snapshot. For general inserts, use -// Graveyard.InsertTxn. -func (s *StateRestore) Tombstone(stone *Tombstone) error { - if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil { - return fmt.Errorf("failed restoring tombstone: %s", err) - } - return nil -} - // Session is used when restoring from a snapshot. For general inserts, use // SessionCreate. func (s *StateRestore) Session(sess *structs.Session) error { @@ -377,21 +339,6 @@ func indexUpdateMaxTxn(tx *memdb.Txn, idx uint64, table string) error { return nil } -// ReapTombstones is used to delete all the tombstones with an index -// less than or equal to the given index. This is used to prevent -// unbounded storage growth of the tombstones. -func (s *StateStore) ReapTombstones(index uint64) error { - tx := s.db.Txn(true) - defer tx.Abort() - - if err := s.kvsGraveyard.ReapTxn(tx, index); err != nil { - return fmt.Errorf("failed to reap kvs tombstones: %s", err) - } - - tx.Commit() - return nil -} - // getWatchTables returns the list of tables that should be watched and used for // max index calculations for the given query method. This is used for all // methods except for KVS. This will panic if the method is unknown. @@ -1408,468 +1355,6 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64, return idx, results, nil } -// KVSSet is used to store a key/value pair. -func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error { - tx := s.db.Txn(true) - defer tx.Abort() - - // Perform the actual set. - if err := s.kvsSetTxn(tx, idx, entry, false); err != nil { - return err - } - - tx.Commit() - return nil -} - -// kvsSetTxn is used to insert or update a key/value pair in the state -// store. It is the inner method used and handles only the actual storage. -// If updateSession is true, then the incoming entry will set the new -// session (should be validated before calling this). Otherwise, we will keep -// whatever the existing session is. -func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { - // Retrieve an existing KV pair - existing, err := tx.First("kvs", "id", entry.Key) - if err != nil { - return fmt.Errorf("failed kvs lookup: %s", err) - } - - // Set the indexes. - if existing != nil { - entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex - } else { - entry.CreateIndex = idx - } - entry.ModifyIndex = idx - - // Preserve the existing session unless told otherwise. The "existing" - // session for a new entry is "no session". - if !updateSession { - if existing != nil { - entry.Session = existing.(*structs.DirEntry).Session - } else { - entry.Session = "" - } - } - - // Store the kv pair in the state store and update the index. - if err := tx.Insert("kvs", entry); err != nil { - return fmt.Errorf("failed inserting kvs entry: %s", err) - } - if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) }) - return nil -} - -// KVSGet is used to retrieve a key/value pair from the state store. -func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := maxIndexTxn(tx, "kvs", "tombstones") - - // Retrieve the key. - entry, err := tx.First("kvs", "id", key) - if err != nil { - return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) - } - if entry != nil { - return idx, entry.(*structs.DirEntry), nil - } - return idx, nil, nil -} - -// KVSList is used to list out all keys under a given prefix. If the -// prefix is left empty, all keys in the KVS will be returned. The returned -// is the max index of the returned kvs entries or applicable tombstones, or -// else it's the full table indexes for kvs and tombstones. -func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table indexes. - idx := maxIndexTxn(tx, "kvs", "tombstones") - - // Query the prefix and list the available keys - entries, err := tx.Get("kvs", "id_prefix", prefix) - if err != nil { - return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) - } - - // Gather all of the keys found in the store - var ents structs.DirEntries - var lindex uint64 - for entry := entries.Next(); entry != nil; entry = entries.Next() { - e := entry.(*structs.DirEntry) - ents = append(ents, e) - if e.ModifyIndex > lindex { - lindex = e.ModifyIndex - } - } - - // Check for the highest index in the graveyard. If the prefix is empty - // then just use the full table indexes since we are listing everything. - if prefix != "" { - gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix) - if err != nil { - return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err) - } - if gindex > lindex { - lindex = gindex - } - } else { - lindex = idx - } - - // Use the sub index if it was set and there are entries, otherwise use - // the full table index from above. - if lindex != 0 { - idx = lindex - } - return idx, ents, nil -} - -// KVSListKeys is used to query the KV store for keys matching the given prefix. -// An optional separator may be specified, which can be used to slice off a part -// of the response so that only a subset of the prefix is returned. In this -// mode, the keys which are omitted are still counted in the returned index. -func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table indexes. - idx := maxIndexTxn(tx, "kvs", "tombstones") - - // Fetch keys using the specified prefix - entries, err := tx.Get("kvs", "id_prefix", prefix) - if err != nil { - return 0, nil, fmt.Errorf("failed kvs lookup: %s", err) - } - - prefixLen := len(prefix) - sepLen := len(sep) - - var keys []string - var lindex uint64 - var last string - for entry := entries.Next(); entry != nil; entry = entries.Next() { - e := entry.(*structs.DirEntry) - - // Accumulate the high index - if e.ModifyIndex > lindex { - lindex = e.ModifyIndex - } - - // Always accumulate if no separator provided - if sepLen == 0 { - keys = append(keys, e.Key) - continue - } - - // Parse and de-duplicate the returned keys based on the - // key separator, if provided. - after := e.Key[prefixLen:] - sepIdx := strings.Index(after, sep) - if sepIdx > -1 { - key := e.Key[:prefixLen+sepIdx+sepLen] - if key != last { - keys = append(keys, key) - last = key - } - } else { - keys = append(keys, e.Key) - } - } - - // Check for the highest index in the graveyard. If the prefix is empty - // then just use the full table indexes since we are listing everything. - if prefix != "" { - gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix) - if err != nil { - return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err) - } - if gindex > lindex { - lindex = gindex - } - } else { - lindex = idx - } - - // Use the sub index if it was set and there are entries, otherwise use - // the full table index from above. - if lindex != 0 { - idx = lindex - } - return idx, keys, nil -} - -// KVSDelete is used to perform a shallow delete on a single key in the -// the state store. -func (s *StateStore) KVSDelete(idx uint64, key string) error { - tx := s.db.Txn(true) - defer tx.Abort() - - // Perform the actual delete - if err := s.kvsDeleteTxn(tx, idx, key); err != nil { - return err - } - - tx.Commit() - return nil -} - -// kvsDeleteTxn is the inner method used to perform the actual deletion -// of a key/value pair within an existing transaction. -func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error { - // Look up the entry in the state store. - entry, err := tx.First("kvs", "id", key) - if err != nil { - return fmt.Errorf("failed kvs lookup: %s", err) - } - if entry == nil { - return nil - } - - // Create a tombstone. - if err := s.kvsGraveyard.InsertTxn(tx, key, idx); err != nil { - return fmt.Errorf("failed adding to graveyard: %s", err) - } - - // Delete the entry and update the index. - if err := tx.Delete("kvs", entry); err != nil { - return fmt.Errorf("failed deleting kvs entry: %s", err) - } - if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Defer(func() { s.kvsWatch.Notify(key, false) }) - return nil -} - -// KVSDeleteCAS is used to try doing a KV delete operation with a given -// raft index. If the CAS index specified is not equal to the last -// observed index for the given key, then the call is a noop, otherwise -// a normal KV delete is invoked. -func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Retrieve the existing kvs entry, if any exists. - entry, err := tx.First("kvs", "id", key) - if err != nil { - return false, fmt.Errorf("failed kvs lookup: %s", err) - } - - // If the existing index does not match the provided CAS - // index arg, then we shouldn't update anything and can safely - // return early here. - e, ok := entry.(*structs.DirEntry) - if !ok || e.ModifyIndex != cidx { - return entry == nil, nil - } - - // Call the actual deletion if the above passed. - if err := s.kvsDeleteTxn(tx, idx, key); err != nil { - return false, err - } - - tx.Commit() - return true, nil -} - -// KVSSetCAS is used to do a check-and-set operation on a KV entry. The -// ModifyIndex in the provided entry is used to determine if we should -// write the entry to the state store or bail. Returns a bool indicating -// if a write happened and any error. -func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Retrieve the existing entry. - existing, err := tx.First("kvs", "id", entry.Key) - if err != nil { - return false, fmt.Errorf("failed kvs lookup: %s", err) - } - - // Check if the we should do the set. A ModifyIndex of 0 means that - // we are doing a set-if-not-exists. - if entry.ModifyIndex == 0 && existing != nil { - return false, nil - } - if entry.ModifyIndex != 0 && existing == nil { - return false, nil - } - e, ok := existing.(*structs.DirEntry) - if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex { - return false, nil - } - - // If we made it this far, we should perform the set. - if err := s.kvsSetTxn(tx, idx, entry, false); err != nil { - return false, err - } - - tx.Commit() - return true, nil -} - -// KVSDeleteTree is used to do a recursive delete on a key prefix -// in the state store. If any keys are modified, the last index is -// set, otherwise this is a no-op. -func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { - tx := s.db.Txn(true) - defer tx.Abort() - - // Get an iterator over all of the keys with the given prefix. - entries, err := tx.Get("kvs", "id_prefix", prefix) - if err != nil { - return fmt.Errorf("failed kvs lookup: %s", err) - } - - // Go over all of the keys and remove them. We call the delete - // directly so that we only update the index once. We also add - // tombstones as we go. - var modified bool - var objs []interface{} - for entry := entries.Next(); entry != nil; entry = entries.Next() { - e := entry.(*structs.DirEntry) - if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil { - return fmt.Errorf("failed adding to graveyard: %s", err) - } - objs = append(objs, entry) - modified = true - } - - // Do the actual deletes in a separate loop so we don't trash the - // iterator as we go. - for _, obj := range objs { - if err := tx.Delete("kvs", obj); err != nil { - return fmt.Errorf("failed deleting kvs entry: %s", err) - } - } - - // Update the index - if modified { - tx.Defer(func() { s.kvsWatch.Notify(prefix, true) }) - if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - } - - tx.Commit() - return nil -} - -// KVSLockDelay returns the expiration time for any lock delay associated with -// the given key. -func (s *StateStore) KVSLockDelay(key string) time.Time { - return s.lockDelay.GetExpiration(key) -} - -// KVSLock is similar to KVSSet but only performs the set if the lock can be -// acquired. -func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Verify that a session is present. - if entry.Session == "" { - return false, fmt.Errorf("missing session") - } - - // Verify that the session exists. - sess, err := tx.First("sessions", "id", entry.Session) - if err != nil { - return false, fmt.Errorf("failed session lookup: %s", err) - } - if sess == nil { - return false, fmt.Errorf("invalid session %#v", entry.Session) - } - - // Retrieve the existing entry. - existing, err := tx.First("kvs", "id", entry.Key) - if err != nil { - return false, fmt.Errorf("failed kvs lookup: %s", err) - } - - // Set up the entry, using the existing entry if present. - if existing != nil { - e := existing.(*structs.DirEntry) - if e.Session == entry.Session { - // We already hold this lock, good to go. - entry.CreateIndex = e.CreateIndex - entry.LockIndex = e.LockIndex - } else if e.Session != "" { - // Bail out, someone else holds this lock. - return false, nil - } else { - // Set up a new lock with this session. - entry.CreateIndex = e.CreateIndex - entry.LockIndex = e.LockIndex + 1 - } - } else { - entry.CreateIndex = idx - entry.LockIndex = 1 - } - entry.ModifyIndex = idx - - // If we made it this far, we should perform the set. - if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { - return false, err - } - - tx.Commit() - return true, nil -} - -// KVSUnlock is similar to KVSSet but only performs the set if the lock can be -// unlocked (the key must already exist and be locked). -func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Verify that a session is present. - if entry.Session == "" { - return false, fmt.Errorf("missing session") - } - - // Retrieve the existing entry. - existing, err := tx.First("kvs", "id", entry.Key) - if err != nil { - return false, fmt.Errorf("failed kvs lookup: %s", err) - } - - // Bail if there's no existing key. - if existing == nil { - return false, nil - } - - // Make sure the given session is the lock holder. - e := existing.(*structs.DirEntry) - if e.Session != entry.Session { - return false, nil - } - - // Clear the lock and update the entry. - entry.Session = "" - entry.LockIndex = e.LockIndex - entry.CreateIndex = e.CreateIndex - entry.ModifyIndex = idx - - // If we made it this far, we should perform the set. - if err := s.kvsSetTxn(tx, idx, entry, true); err != nil { - return false, err - } - - tx.Commit() - return true, nil -} - // SessionCreate is used to register a new session in the state store. func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error { tx := s.db.Txn(true) diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 114745ca94..d88275642f 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -183,166 +183,6 @@ func TestStateStore_indexUpdateMaxTxn(t *testing.T) { } } -func TestStateStore_GC(t *testing.T) { - // Build up a fast GC. - ttl := 10 * time.Millisecond - gran := 5 * time.Millisecond - gc, err := NewTombstoneGC(ttl, gran) - if err != nil { - t.Fatalf("err: %s", err) - } - - // Enable it and attach it to the state store. - gc.SetEnabled(true) - s, err := NewStateStore(gc) - if err != nil { - t.Fatalf("err: %s", err) - } - - // Create some KV pairs. - testSetKey(t, s, 1, "foo", "foo") - testSetKey(t, s, 2, "foo/bar", "bar") - testSetKey(t, s, 3, "foo/baz", "bar") - testSetKey(t, s, 4, "foo/moo", "bar") - testSetKey(t, s, 5, "foo/zoo", "bar") - - // Delete a key and make sure the GC sees it. - if err := s.KVSDelete(6, "foo/zoo"); err != nil { - t.Fatalf("err: %s", err) - } - select { - case idx := <-gc.ExpireCh(): - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - case <-time.After(2 * ttl): - t.Fatalf("GC never fired") - } - - // Check for the same behavior with a tree delete. - if err := s.KVSDeleteTree(7, "foo/moo"); err != nil { - t.Fatalf("err: %s", err) - } - select { - case idx := <-gc.ExpireCh(): - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - case <-time.After(2 * ttl): - t.Fatalf("GC never fired") - } - - // Check for the same behavior with a CAS delete. - if ok, err := s.KVSDeleteCAS(8, 3, "foo/baz"); !ok || err != nil { - t.Fatalf("err: %s", err) - } - select { - case idx := <-gc.ExpireCh(): - if idx != 8 { - t.Fatalf("bad index: %d", idx) - } - case <-time.After(2 * ttl): - t.Fatalf("GC never fired") - } - - // Finally, try it with an expiring session. - testRegisterNode(t, s, 9, "node1") - session := &structs.Session{ - ID: testUUID(), - Node: "node1", - Behavior: structs.SessionKeysDelete, - } - if err := s.SessionCreate(10, session); err != nil { - t.Fatalf("err: %s", err) - } - d := &structs.DirEntry{ - Key: "lock", - Session: session.ID, - } - if ok, err := s.KVSLock(11, d); !ok || err != nil { - t.Fatalf("err: %v", err) - } - if err := s.SessionDestroy(12, session.ID); err != nil { - t.Fatalf("err: %s", err) - } - select { - case idx := <-gc.ExpireCh(): - if idx != 12 { - t.Fatalf("bad index: %d", idx) - } - case <-time.After(2 * ttl): - t.Fatalf("GC never fired") - } -} - -func TestStateStore_ReapTombstones(t *testing.T) { - s := testStateStore(t) - - // Create some KV pairs. - testSetKey(t, s, 1, "foo", "foo") - testSetKey(t, s, 2, "foo/bar", "bar") - testSetKey(t, s, 3, "foo/baz", "bar") - testSetKey(t, s, 4, "foo/moo", "bar") - testSetKey(t, s, 5, "foo/zoo", "bar") - - // Call a delete on some specific keys. - if err := s.KVSDelete(6, "foo/baz"); err != nil { - t.Fatalf("err: %s", err) - } - if err := s.KVSDelete(7, "foo/moo"); err != nil { - t.Fatalf("err: %s", err) - } - - // Pull out the list and check the index, which should come from the - // tombstones. - idx, _, err := s.KVSList("foo/") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - - // Reap the tombstones <= 6. - if err := s.ReapTombstones(6); err != nil { - t.Fatalf("err: %s", err) - } - - // Should still be good because 7 is in there. - idx, _, err = s.KVSList("foo/") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap them all. - if err := s.ReapTombstones(7); err != nil { - t.Fatalf("err: %s", err) - } - - // At this point the sub index will slide backwards. - idx, _, err = s.KVSList("foo/") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 5 { - t.Fatalf("bad index: %d", idx) - } - - // Make sure the tombstones are actually gone. - snap := s.Snapshot() - defer snap.Close() - stones, err := snap.Tombstones() - if err != nil { - t.Fatalf("err: %s", err) - } - if stones.Next() != nil { - t.Fatalf("unexpected extra tombstones") - } -} - func TestStateStore_GetWatches(t *testing.T) { s := testStateStore(t) @@ -2230,1376 +2070,6 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) { } } -func TestStateStore_KVSSet_KVSGet(t *testing.T) { - s := testStateStore(t) - - // Get on an nonexistent key returns nil. - idx, result, err := s.KVSGet("foo") - if result != nil || err != nil || idx != 0 { - t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) - } - - // Write a new K/V entry to the store. - entry := &structs.DirEntry{ - Key: "foo", - Value: []byte("bar"), - } - if err := s.KVSSet(1, entry); err != nil { - t.Fatalf("err: %s", err) - } - - // Retrieve the K/V entry again. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result == nil { - t.Fatalf("expected k/v pair, got nothing") - } - if idx != 1 { - t.Fatalf("bad index: %d", idx) - } - - // Check that the index was injected into the result. - if result.CreateIndex != 1 || result.ModifyIndex != 1 { - t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) - } - - // Check that the value matches. - if v := string(result.Value); v != "bar" { - t.Fatalf("expected 'bar', got: '%s'", v) - } - - // Updating the entry works and changes the index. - update := &structs.DirEntry{ - Key: "foo", - Value: []byte("baz"), - } - if err := s.KVSSet(2, update); err != nil { - t.Fatalf("err: %s", err) - } - - // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.CreateIndex != 1 || result.ModifyIndex != 2 { - t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) - } - if v := string(result.Value); v != "baz" { - t.Fatalf("expected 'baz', got '%s'", v) - } - if idx != 2 { - t.Fatalf("bad index: %d", idx) - } - - // Attempt to set the session during an update. - update = &structs.DirEntry{ - Key: "foo", - Value: []byte("zoo"), - Session: "nope", - } - if err := s.KVSSet(3, update); err != nil { - t.Fatalf("err: %s", err) - } - - // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.CreateIndex != 1 || result.ModifyIndex != 3 { - t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) - } - if v := string(result.Value); v != "zoo" { - t.Fatalf("expected 'zoo', got '%s'", v) - } - if result.Session != "" { - t.Fatalf("expected empty session, got '%s", result.Session) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } - - // Make a real session and then lock the key to set the session. - testRegisterNode(t, s, 4, "node1") - session := testUUID() - if err := s.SessionCreate(5, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - update = &structs.DirEntry{ - Key: "foo", - Value: []byte("locked"), - Session: session, - } - ok, err := s.KVSLock(6, update) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.CreateIndex != 1 || result.ModifyIndex != 6 { - t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) - } - if v := string(result.Value); v != "locked" { - t.Fatalf("expected 'zoo', got '%s'", v) - } - if result.Session != session { - t.Fatalf("expected session, got '%s", result.Session) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - - // Now make an update without the session and make sure it gets applied - // and doesn't take away the session (it is allowed to change the value). - update = &structs.DirEntry{ - Key: "foo", - Value: []byte("stoleit"), - } - if err := s.KVSSet(7, update); err != nil { - t.Fatalf("err: %s", err) - } - - // Fetch the kv pair and check. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.CreateIndex != 1 || result.ModifyIndex != 7 { - t.Fatalf("bad index: %d, %d", result.CreateIndex, result.ModifyIndex) - } - if v := string(result.Value); v != "stoleit" { - t.Fatalf("expected 'zoo', got '%s'", v) - } - if result.Session != session { - t.Fatalf("expected session, got '%s", result.Session) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - - // Fetch a key that doesn't exist and make sure we get the right - // response. - idx, result, err = s.KVSGet("nope") - if result != nil || err != nil || idx != 7 { - t.Fatalf("expected (7, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) - } -} - -func TestStateStore_KVSList(t *testing.T) { - s := testStateStore(t) - - // Listing an empty KVS returns nothing - idx, entries, err := s.KVSList("") - if idx != 0 || entries != nil || err != nil { - t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err) - } - - // Create some KVS entries - testSetKey(t, s, 1, "foo", "foo") - testSetKey(t, s, 2, "foo/bar", "bar") - testSetKey(t, s, 3, "foo/bar/zip", "zip") - testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp") - testSetKey(t, s, 5, "foo/bar/baz", "baz") - - // List out all of the keys - idx, entries, err = s.KVSList("") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 5 { - t.Fatalf("bad index: %d", idx) - } - - // Check that all of the keys were returned - if n := len(entries); n != 5 { - t.Fatalf("expected 5 kvs entries, got: %d", n) - } - - // Try listing with a provided prefix - idx, entries, err = s.KVSList("foo/bar/zip") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Check that only the keys in the prefix were returned - if n := len(entries); n != 2 { - t.Fatalf("expected 2 kvs entries, got: %d", n) - } - if entries[0].Key != "foo/bar/zip" || entries[1].Key != "foo/bar/zip/zorp" { - t.Fatalf("bad: %#v", entries) - } - - // Delete a key and make sure the index comes from the tombstone. - if err := s.KVSDelete(6, "foo/bar/baz"); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList("foo/bar/baz") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - - // Set a different key to bump the index. - testSetKey(t, s, 7, "some/other/key", "") - - // Make sure we get the right index from the tombstone. - idx, _, err = s.KVSList("foo/bar/baz") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap the tombstones and make sure we get the latest index - // since there are no matching keys. - if err := s.ReapTombstones(6); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList("foo/bar/baz") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - - // List all the keys to make sure the index is also correct. - idx, _, err = s.KVSList("") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSListKeys(t *testing.T) { - s := testStateStore(t) - - // Listing keys with no results returns nil. - idx, keys, err := s.KVSListKeys("", "") - if idx != 0 || keys != nil || err != nil { - t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err) - } - - // Create some keys. - testSetKey(t, s, 1, "foo", "foo") - testSetKey(t, s, 2, "foo/bar", "bar") - testSetKey(t, s, 3, "foo/bar/baz", "baz") - testSetKey(t, s, 4, "foo/bar/zip", "zip") - testSetKey(t, s, 5, "foo/bar/zip/zam", "zam") - testSetKey(t, s, 6, "foo/bar/zip/zorp", "zorp") - testSetKey(t, s, 7, "some/other/prefix", "nack") - - // List all the keys. - idx, keys, err = s.KVSListKeys("", "") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(keys) != 7 { - t.Fatalf("bad keys: %#v", keys) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - - // Query using a prefix and pass a separator. - idx, keys, err = s.KVSListKeys("foo/bar/", "/") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(keys) != 3 { - t.Fatalf("bad keys: %#v", keys) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - - // Subset of the keys was returned. - expect := []string{"foo/bar/baz", "foo/bar/zip", "foo/bar/zip/"} - if !reflect.DeepEqual(keys, expect) { - t.Fatalf("bad keys: %#v", keys) - } - - // Listing keys with no separator returns everything. - idx, keys, err = s.KVSListKeys("foo", "") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - expect = []string{"foo", "foo/bar", "foo/bar/baz", "foo/bar/zip", - "foo/bar/zip/zam", "foo/bar/zip/zorp"} - if !reflect.DeepEqual(keys, expect) { - t.Fatalf("bad keys: %#v", keys) - } - - // Delete a key and make sure the index comes from the tombstone. - if err := s.KVSDelete(8, "foo/bar/baz"); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSListKeys("foo/bar/baz", "") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 8 { - t.Fatalf("bad index: %d", idx) - } - - // Set a different key to bump the index. - testSetKey(t, s, 9, "some/other/key", "") - - // Make sure the index still comes from the tombstone. - idx, _, err = s.KVSListKeys("foo/bar/baz", "") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 8 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap the tombstones and make sure we get the latest index - // since there are no matching keys. - if err := s.ReapTombstones(8); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSListKeys("foo/bar/baz", "") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 9 { - t.Fatalf("bad index: %d", idx) - } - - // List all the keys to make sure the index is also correct. - idx, _, err = s.KVSListKeys("", "") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 9 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSDelete(t *testing.T) { - s := testStateStore(t) - - // Create some KV pairs - testSetKey(t, s, 1, "foo", "foo") - testSetKey(t, s, 2, "foo/bar", "bar") - - // Call a delete on a specific key - if err := s.KVSDelete(3, "foo"); err != nil { - t.Fatalf("err: %s", err) - } - - // The entry was removed from the state store - tx := s.db.Txn(false) - defer tx.Abort() - e, err := tx.First("kvs", "id", "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if e != nil { - t.Fatalf("expected kvs entry to be deleted, got: %#v", e) - } - - // Try fetching the other keys to ensure they still exist - e, err = tx.First("kvs", "id", "foo/bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if e == nil || string(e.(*structs.DirEntry).Value) != "bar" { - t.Fatalf("bad kvs entry: %#v", e) - } - - // Check that the index table was updated - if idx := s.maxIndex("kvs"); idx != 3 { - t.Fatalf("bad index: %d", idx) - } - - // Check that the tombstone was created and that prevents the index - // from sliding backwards. - idx, _, err := s.KVSList("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap the tombstone and watch the index revert to the remaining - // foo/bar key's index. - if err := s.ReapTombstones(3); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 2 { - t.Fatalf("bad index: %d", idx) - } - - // Deleting a nonexistent key should be idempotent and not return an - // error - if err := s.KVSDelete(4, "foo"); err != nil { - t.Fatalf("err: %s", err) - } - if idx := s.maxIndex("kvs"); idx != 3 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSDeleteCAS(t *testing.T) { - s := testStateStore(t) - - // Create some KV entries - testSetKey(t, s, 1, "foo", "foo") - testSetKey(t, s, 2, "bar", "bar") - testSetKey(t, s, 3, "baz", "baz") - - // Do a CAS delete with an index lower than the entry - ok, err := s.KVSDeleteCAS(4, 1, "bar") - if ok || err != nil { - t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err) - } - - // Check that the index is untouched and the entry - // has not been deleted. - idx, e, err := s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if e == nil { - t.Fatalf("expected a kvs entry, got nil") - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } - - // Do another CAS delete, this time with the correct index - // which should cause the delete to take place. - ok, err = s.KVSDeleteCAS(4, 2, "bar") - if !ok || err != nil { - t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err) - } - - // Entry was deleted and index was updated - idx, e, err = s.KVSGet("bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if e != nil { - t.Fatalf("entry should be deleted") - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Add another key to bump the index. - testSetKey(t, s, 5, "some/other/key", "baz") - - // Check that the tombstone was created and that prevents the index - // from sliding backwards. - idx, _, err = s.KVSList("bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap the tombstone and watch the index move up to the table - // index since there are no matching keys. - if err := s.ReapTombstones(4); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList("bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 5 { - t.Fatalf("bad index: %d", idx) - } - - // A delete on a nonexistent key should be idempotent and not return an - // error - ok, err = s.KVSDeleteCAS(6, 2, "bar") - if !ok || err != nil { - t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err) - } - if idx := s.maxIndex("kvs"); idx != 5 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSSetCAS(t *testing.T) { - s := testStateStore(t) - - // Doing a CAS with ModifyIndex != 0 and no existing entry - // is a no-op. - entry := &structs.DirEntry{ - Key: "foo", - Value: []byte("foo"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 1, - ModifyIndex: 1, - }, - } - ok, err := s.KVSSetCAS(2, entry) - if ok || err != nil { - t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) - } - - // Check that nothing was actually stored - tx := s.db.Txn(false) - if e, err := tx.First("kvs", "id", "foo"); e != nil || err != nil { - t.Fatalf("expected (nil, nil), got: (%#v, %#v)", e, err) - } - tx.Abort() - - // Index was not updated - if idx := s.maxIndex("kvs"); idx != 0 { - t.Fatalf("bad index: %d", idx) - } - - // Doing a CAS with a ModifyIndex of zero when no entry exists - // performs the set and saves into the state store. - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("foo"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 0, - ModifyIndex: 0, - }, - } - ok, err = s.KVSSetCAS(2, entry) - if !ok || err != nil { - t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) - } - - // Entry was inserted - idx, entry, err := s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { - t.Fatalf("bad entry: %#v", entry) - } - if idx != 2 { - t.Fatalf("bad index: %d", idx) - } - - // Doing a CAS with a ModifyIndex of zero when an entry exists does - // not do anything. - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("foo"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 0, - ModifyIndex: 0, - }, - } - ok, err = s.KVSSetCAS(3, entry) - if ok || err != nil { - t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) - } - - // Doing a CAS with a ModifyIndex which does not match the current - // index does not do anything. - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("bar"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 3, - ModifyIndex: 3, - }, - } - ok, err = s.KVSSetCAS(3, entry) - if ok || err != nil { - t.Fatalf("expected (false, nil), got: (%#v, %#v)", ok, err) - } - - // Entry was not updated in the store - idx, entry, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if string(entry.Value) != "foo" || entry.CreateIndex != 2 || entry.ModifyIndex != 2 { - t.Fatalf("bad entry: %#v", entry) - } - if idx != 2 { - t.Fatalf("bad index: %d", idx) - } - - // Doing a CAS with the proper current index should make the - // modification. - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("bar"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 2, - ModifyIndex: 2, - }, - } - ok, err = s.KVSSetCAS(3, entry) - if !ok || err != nil { - t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) - } - - // Entry was updated - idx, entry, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if string(entry.Value) != "bar" || entry.CreateIndex != 2 || entry.ModifyIndex != 3 { - t.Fatalf("bad entry: %#v", entry) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } - - // Attempt to update the session during the CAS. - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("zoo"), - Session: "nope", - RaftIndex: structs.RaftIndex{ - CreateIndex: 2, - ModifyIndex: 3, - }, - } - ok, err = s.KVSSetCAS(4, entry) - if !ok || err != nil { - t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) - } - - // Entry was updated, but the session should have been ignored. - idx, entry, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if string(entry.Value) != "zoo" || entry.CreateIndex != 2 || entry.ModifyIndex != 4 || - entry.Session != "" { - t.Fatalf("bad entry: %#v", entry) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Now lock it and try the update, which should keep the session. - testRegisterNode(t, s, 5, "node1") - session := testUUID() - if err := s.SessionCreate(6, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("locked"), - Session: session, - RaftIndex: structs.RaftIndex{ - CreateIndex: 2, - ModifyIndex: 4, - }, - } - ok, err = s.KVSLock(6, entry) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - entry = &structs.DirEntry{ - Key: "foo", - Value: []byte("locked"), - RaftIndex: structs.RaftIndex{ - CreateIndex: 2, - ModifyIndex: 6, - }, - } - ok, err = s.KVSSetCAS(7, entry) - if !ok || err != nil { - t.Fatalf("expected (true, nil), got: (%#v, %#v)", ok, err) - } - - // Entry was updated, and the lock status should have stayed the same. - idx, entry, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if string(entry.Value) != "locked" || entry.CreateIndex != 2 || entry.ModifyIndex != 7 || - entry.Session != session { - t.Fatalf("bad entry: %#v", entry) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSDeleteTree(t *testing.T) { - s := testStateStore(t) - - // Create kvs entries in the state store - testSetKey(t, s, 1, "foo/bar", "bar") - testSetKey(t, s, 2, "foo/bar/baz", "baz") - testSetKey(t, s, 3, "foo/bar/zip", "zip") - testSetKey(t, s, 4, "foo/zorp", "zorp") - - // Calling tree deletion which affects nothing does not - // modify the table index. - if err := s.KVSDeleteTree(9, "bar"); err != nil { - t.Fatalf("err: %s", err) - } - if idx := s.maxIndex("kvs"); idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Call tree deletion with a nested prefix. - if err := s.KVSDeleteTree(5, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - - // Check that all the matching keys were deleted - tx := s.db.Txn(false) - defer tx.Abort() - - entries, err := tx.Get("kvs", "id") - if err != nil { - t.Fatalf("err: %s", err) - } - - num := 0 - for entry := entries.Next(); entry != nil; entry = entries.Next() { - if entry.(*structs.DirEntry).Key != "foo/zorp" { - t.Fatalf("unexpected kvs entry: %#v", entry) - } - num++ - } - - if num != 1 { - t.Fatalf("expected 1 key, got: %d", num) - } - - // Index should be updated if modifications are made - if idx := s.maxIndex("kvs"); idx != 5 { - t.Fatalf("bad index: %d", idx) - } - - // Check that the tombstones ware created and that prevents the index - // from sliding backwards. - idx, _, err := s.KVSList("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 5 { - t.Fatalf("bad index: %d", idx) - } - - // Now reap the tombstones and watch the index revert to the remaining - // foo/zorp key's index. - if err := s.ReapTombstones(5); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSLockDelay(t *testing.T) { - s := testStateStore(t) - - // KVSLockDelay is exercised in the lock/unlock and session invalidation - // cases below, so we just do a basic check on a nonexistent key here. - expires := s.KVSLockDelay("/not/there") - if expires.After(time.Now()) { - t.Fatalf("bad: %v", expires) - } -} - -func TestStateStore_KVSLock(t *testing.T) { - s := testStateStore(t) - - // Lock with no session should fail. - ok, err := s.KVSLock(0, &structs.DirEntry{Key: "foo", Value: []byte("foo")}) - if ok || err == nil || !strings.Contains(err.Error(), "missing session") { - t.Fatalf("didn't detect missing session: %v %s", ok, err) - } - - // Now try with a bogus session. - ok, err = s.KVSLock(1, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: testUUID()}) - if ok || err == nil || !strings.Contains(err.Error(), "invalid session") { - t.Fatalf("didn't detect invalid session: %v %s", ok, err) - } - - // Make a real session. - testRegisterNode(t, s, 2, "node1") - session1 := testUUID() - if err := s.SessionCreate(3, &structs.Session{ID: session1, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // Lock and make the key at the same time. - ok, err = s.KVSLock(4, &structs.DirEntry{Key: "foo", Value: []byte("foo"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - // Make sure the indexes got set properly. - idx, result, err := s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 4 || - string(result.Value) != "foo" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Re-locking with the same session should update the value and report - // success. - ok, err = s.KVSLock(5, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err) - } - - // Make sure the indexes got set properly, note that the lock index - // won't go up since we didn't lock it again. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 5 || - string(result.Value) != "bar" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 5 { - t.Fatalf("bad index: %d", idx) - } - - // Unlock and the re-lock. - ok, err = s.KVSUnlock(6, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't handle unlocking a locked key: %v %s", ok, err) - } - ok, err = s.KVSLock(7, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - // Make sure the indexes got set properly. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 2 || result.CreateIndex != 4 || result.ModifyIndex != 7 || - string(result.Value) != "zoo" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - - // Lock an existing key. - testSetKey(t, s, 8, "bar", "bar") - ok, err = s.KVSLock(9, &structs.DirEntry{Key: "bar", Value: []byte("xxx"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - // Make sure the indexes got set properly. - idx, result, err = s.KVSGet("bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 || - string(result.Value) != "xxx" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 9 { - t.Fatalf("bad index: %d", idx) - } - - // Attempting a re-lock with a different session should also fail. - session2 := testUUID() - if err := s.SessionCreate(10, &structs.Session{ID: session2, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // Re-locking should not return an error, but will report that it didn't - // get the lock. - ok, err = s.KVSLock(11, &structs.DirEntry{Key: "bar", Value: []byte("nope"), Session: session2}) - if ok || err != nil { - t.Fatalf("didn't handle locking an already-locked key: %v %s", ok, err) - } - - // Make sure the indexes didn't update. - idx, result, err = s.KVSGet("bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 8 || result.ModifyIndex != 9 || - string(result.Value) != "xxx" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 9 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVSUnlock(t *testing.T) { - s := testStateStore(t) - - // Unlock with no session should fail. - ok, err := s.KVSUnlock(0, &structs.DirEntry{Key: "foo", Value: []byte("bar")}) - if ok || err == nil || !strings.Contains(err.Error(), "missing session") { - t.Fatalf("didn't detect missing session: %v %s", ok, err) - } - - // Make a real session. - testRegisterNode(t, s, 1, "node1") - session1 := testUUID() - if err := s.SessionCreate(2, &structs.Session{ID: session1, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // Unlock with a real session but no key should not return an error, but - // will report it didn't unlock anything. - ok, err = s.KVSUnlock(3, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1}) - if ok || err != nil { - t.Fatalf("didn't handle unlocking a missing key: %v %s", ok, err) - } - - // Make a key and unlock it, without it being locked. - testSetKey(t, s, 4, "foo", "bar") - ok, err = s.KVSUnlock(5, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1}) - if ok || err != nil { - t.Fatalf("didn't handle unlocking a non-locked key: %v %s", ok, err) - } - - // Make sure the indexes didn't update. - idx, result, err := s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 0 || result.CreateIndex != 4 || result.ModifyIndex != 4 || - string(result.Value) != "bar" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Lock it with the first session. - ok, err = s.KVSLock(6, &structs.DirEntry{Key: "foo", Value: []byte("bar"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - // Attempt an unlock with another session. - session2 := testUUID() - if err := s.SessionCreate(7, &structs.Session{ID: session2, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - ok, err = s.KVSUnlock(8, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session2}) - if ok || err != nil { - t.Fatalf("didn't handle unlocking with the wrong session: %v %s", ok, err) - } - - // Make sure the indexes didn't update. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 6 || - string(result.Value) != "bar" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 6 { - t.Fatalf("bad index: %d", idx) - } - - // Now do the unlock with the correct session. - ok, err = s.KVSUnlock(9, &structs.DirEntry{Key: "foo", Value: []byte("zoo"), Session: session1}) - if !ok || err != nil { - t.Fatalf("didn't handle unlocking with the correct session: %v %s", ok, err) - } - - // Make sure the indexes got set properly. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 || - string(result.Value) != "zoo" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 9 { - t.Fatalf("bad index: %d", idx) - } - - // Unlocking again should fail and not change anything. - ok, err = s.KVSUnlock(10, &structs.DirEntry{Key: "foo", Value: []byte("nope"), Session: session1}) - if ok || err != nil { - t.Fatalf("didn't handle unlocking with the previous session: %v %s", ok, err) - } - - // Make sure the indexes didn't update. - idx, result, err = s.KVSGet("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if result.LockIndex != 1 || result.CreateIndex != 4 || result.ModifyIndex != 9 || - string(result.Value) != "zoo" { - t.Fatalf("bad entry: %#v", result) - } - if idx != 9 { - t.Fatalf("bad index: %d", idx) - } -} - -func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { - s := testStateStore(t) - - // Build up some entries to seed. - entries := structs.DirEntries{ - &structs.DirEntry{ - Key: "aaa", - Flags: 23, - Value: []byte("hello"), - }, - &structs.DirEntry{ - Key: "bar/a", - Value: []byte("one"), - }, - &structs.DirEntry{ - Key: "bar/b", - Value: []byte("two"), - }, - &structs.DirEntry{ - Key: "bar/c", - Value: []byte("three"), - }, - } - for i, entry := range entries { - if err := s.KVSSet(uint64(i+1), entry); err != nil { - t.Fatalf("err: %s", err) - } - } - - // Make a node and session so we can test a locked key. - testRegisterNode(t, s, 5, "node1") - session := testUUID() - if err := s.SessionCreate(6, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - entries[3].Session = session - if ok, err := s.KVSLock(7, entries[3]); !ok || err != nil { - t.Fatalf("didn't get the lock: %v %s", ok, err) - } - - // This is required for the compare later. - entries[3].LockIndex = 1 - - // Snapshot the keys. - snap := s.Snapshot() - defer snap.Close() - - // Alter the real state store. - if err := s.KVSSet(8, &structs.DirEntry{Key: "aaa", Value: []byte("nope")}); err != nil { - t.Fatalf("err: %s", err) - } - - // Verify the snapshot. - if idx := snap.LastIndex(); idx != 7 { - t.Fatalf("bad index: %d", idx) - } - iter, err := snap.KVs() - if err != nil { - t.Fatalf("err: %s", err) - } - var dump structs.DirEntries - for entry := iter.Next(); entry != nil; entry = iter.Next() { - dump = append(dump, entry.(*structs.DirEntry)) - } - if !reflect.DeepEqual(dump, entries) { - t.Fatalf("bad: %#v", dump) - } - - // Restore the values into a new state store. - func() { - s := testStateStore(t) - restore := s.Restore() - for _, entry := range dump { - if err := restore.KVS(entry); err != nil { - t.Fatalf("err: %s", err) - } - } - restore.Commit() - - // Read the restored keys back out and verify they match. - idx, res, err := s.KVSList("") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 7 { - t.Fatalf("bad index: %d", idx) - } - if !reflect.DeepEqual(res, entries) { - t.Fatalf("bad: %#v", res) - } - - // Check that the index was updated. - if idx := s.maxIndex("kvs"); idx != 7 { - t.Fatalf("bad index: %d", idx) - } - }() -} - -func TestStateStore_KVS_Watches(t *testing.T) { - s := testStateStore(t) - - // This is used when locking down below. - testRegisterNode(t, s, 1, "node1") - session := testUUID() - if err := s.SessionCreate(2, &structs.Session{ID: session, Node: "node1"}); err != nil { - t.Fatalf("err: %s", err) - } - - // An empty prefix watch should hit on all KVS ops, and some other - // prefix should not be affected ever. We also add a positive prefix - // match. - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSSet(1, &structs.DirEntry{Key: "aaa"}); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSSet(2, &structs.DirEntry{Key: "aaa"}); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // Restore just fires off a top-level watch, so we should get hits on - // any prefix, including ones for keys that aren't in there. - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("b"), func() { - verifyWatch(t, s.GetKVSWatch("/nope"), func() { - restore := s.Restore() - if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil { - t.Fatalf("err: %s", err) - } - restore.Commit() - }) - }) - }) - - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSDelete(3, "aaa"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if ok, err := s.KVSSetCAS(4, &structs.DirEntry{Key: "aaa"}); !ok || err != nil { - t.Fatalf("ok: %v err: %s", ok, err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if ok, err := s.KVSLock(5, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil { - t.Fatalf("ok: %v err: %s", ok, err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if ok, err := s.KVSUnlock(6, &structs.DirEntry{Key: "aaa", Session: session}); !ok || err != nil { - t.Fatalf("ok: %v err: %s", ok, err) - } - }) - }) - }) - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSDeleteTree(7, "aaa"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // A delete tree operation at the top level will notify all the watches. - verifyWatch(t, s.GetKVSWatch(""), func() { - verifyWatch(t, s.GetKVSWatch("a"), func() { - verifyWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSDeleteTree(8, ""); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // Create a more interesting tree. - testSetKey(t, s, 9, "foo/bar", "bar") - testSetKey(t, s, 10, "foo/bar/baz", "baz") - testSetKey(t, s, 11, "foo/bar/zip", "zip") - testSetKey(t, s, 12, "foo/zorp", "zorp") - - // Deleting just the foo/bar key should not trigger watches on the - // children. - verifyWatch(t, s.GetKVSWatch("foo/bar"), func() { - verifyNoWatch(t, s.GetKVSWatch("foo/bar/baz"), func() { - verifyNoWatch(t, s.GetKVSWatch("foo/bar/zip"), func() { - if err := s.KVSDelete(13, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - - // But a delete tree from that point should notify the whole subtree, - // even for keys that don't exist. - verifyWatch(t, s.GetKVSWatch("foo/bar"), func() { - verifyWatch(t, s.GetKVSWatch("foo/bar/baz"), func() { - verifyWatch(t, s.GetKVSWatch("foo/bar/zip"), func() { - verifyWatch(t, s.GetKVSWatch("foo/bar/uh/nope"), func() { - if err := s.KVSDeleteTree(14, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - }) - }) - }) - }) -} - -func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { - s := testStateStore(t) - - // Insert a key and then delete it to create a tombstone. - testSetKey(t, s, 1, "foo/bar", "bar") - testSetKey(t, s, 2, "foo/bar/baz", "bar") - testSetKey(t, s, 3, "foo/bar/zoo", "bar") - if err := s.KVSDelete(4, "foo/bar"); err != nil { - t.Fatalf("err: %s", err) - } - - // Snapshot the Tombstones. - snap := s.Snapshot() - defer snap.Close() - - // Alter the real state store. - if err := s.ReapTombstones(4); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err := s.KVSList("foo/bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 3 { - t.Fatalf("bad index: %d", idx) - } - - // Verify the snapshot. - stones, err := snap.Tombstones() - if err != nil { - t.Fatalf("err: %s", err) - } - var dump []*Tombstone - for stone := stones.Next(); stone != nil; stone = stones.Next() { - dump = append(dump, stone.(*Tombstone)) - } - if len(dump) != 1 { - t.Fatalf("bad %#v", dump) - } - stone := dump[0] - if stone.Key != "foo/bar" || stone.Index != 4 { - t.Fatalf("bad: %#v", stone) - } - - // Restore the values into a new state store. - func() { - s := testStateStore(t) - restore := s.Restore() - for _, stone := range dump { - if err := restore.Tombstone(stone); err != nil { - t.Fatalf("err: %s", err) - } - } - restore.Commit() - - // See if the stone works properly in a list query. - idx, _, err := s.KVSList("foo/bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // Make sure it reaps correctly. We should still get a 4 for - // the index here because it will be using the last index from - // the tombstone table. - if err := s.ReapTombstones(4); err != nil { - t.Fatalf("err: %s", err) - } - idx, _, err = s.KVSList("foo/bar") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 4 { - t.Fatalf("bad index: %d", idx) - } - - // But make sure the tombstone is actually gone. - snap := s.Snapshot() - defer snap.Close() - stones, err := snap.Tombstones() - if err != nil { - t.Fatalf("err: %s", err) - } - if stones.Next() != nil { - t.Fatalf("unexpected extra tombstones") - } - }() -} - func TestStateStore_SessionCreate_SessionGet(t *testing.T) { s := testStateStore(t)