mirror of https://github.com/status-im/consul.git
Merge pull request #2653 from hashicorp/f-split-state-store
Breaks up the state store into several files.
This commit is contained in:
commit
5efdec89ec
|
@ -0,0 +1,172 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ACLs is used to pull all the ACLs from the snapshot.
|
||||||
|
func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
|
||||||
|
iter, err := s.tx.Get("acls", "id")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return iter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
|
||||||
|
func (s *StateRestore) ACL(acl *structs.ACL) error {
|
||||||
|
if err := s.tx.Insert("acls", acl); err != nil {
|
||||||
|
return fmt.Errorf("failed restoring acl: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := indexUpdateMaxTxn(s.tx, acl.ModifyIndex, "acls"); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.watches.Arm("acls")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACLSet is used to insert an ACL rule into the state store.
|
||||||
|
func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Call set on the ACL
|
||||||
|
if err := s.aclSetTxn(tx, idx, acl); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// aclSetTxn is the inner method used to insert an ACL rule with the
|
||||||
|
// proper indexes into the state store.
|
||||||
|
func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
|
||||||
|
// Check that the ID is set
|
||||||
|
if acl.ID == "" {
|
||||||
|
return ErrMissingACLID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for an existing ACL
|
||||||
|
existing, err := tx.First("acls", "id", acl.ID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed acl lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the indexes
|
||||||
|
if existing != nil {
|
||||||
|
acl.CreateIndex = existing.(*structs.ACL).CreateIndex
|
||||||
|
acl.ModifyIndex = idx
|
||||||
|
} else {
|
||||||
|
acl.CreateIndex = idx
|
||||||
|
acl.ModifyIndex = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the ACL
|
||||||
|
if err := tx.Insert("acls", acl); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting acl: %s", err)
|
||||||
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"acls", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Defer(func() { s.tableWatches["acls"].Notify() })
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACLGet is used to look up an existing ACL by ID.
|
||||||
|
func (s *StateStore) ACLGet(aclID string) (uint64, *structs.ACL, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexTxn(tx, s.getWatchTables("ACLGet")...)
|
||||||
|
|
||||||
|
// Query for the existing ACL
|
||||||
|
acl, err := tx.First("acls", "id", aclID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
|
||||||
|
}
|
||||||
|
if acl != nil {
|
||||||
|
return idx, acl.(*structs.ACL), nil
|
||||||
|
}
|
||||||
|
return idx, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACLList is used to list out all of the ACLs in the state store.
|
||||||
|
func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...)
|
||||||
|
|
||||||
|
// Return the ACLs.
|
||||||
|
acls, err := s.aclListTxn(tx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
|
||||||
|
}
|
||||||
|
return idx, acls, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// aclListTxn is used to list out all of the ACLs in the state store. This is a
|
||||||
|
// function vs. a method so it can be called from the snapshotter.
|
||||||
|
func (s *StateStore) aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
|
||||||
|
// Query all of the ACLs in the state store
|
||||||
|
acls, err := tx.Get("acls", "id")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed acl lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go over all of the ACLs and build the response
|
||||||
|
var result structs.ACLs
|
||||||
|
for acl := acls.Next(); acl != nil; acl = acls.Next() {
|
||||||
|
a := acl.(*structs.ACL)
|
||||||
|
result = append(result, a)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACLDelete is used to remove an existing ACL from the state store. If
|
||||||
|
// the ACL does not exist this is a no-op and no error is returned.
|
||||||
|
func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Call the ACL delete
|
||||||
|
if err := s.aclDeleteTxn(tx, idx, aclID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// aclDeleteTxn is used to delete an ACL from the state store within
|
||||||
|
// an existing transaction.
|
||||||
|
func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
|
||||||
|
// Look up the existing ACL
|
||||||
|
acl, err := tx.First("acls", "id", aclID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed acl lookup: %s", err)
|
||||||
|
}
|
||||||
|
if acl == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the ACL from the state store and update indexes
|
||||||
|
if err := tx.Delete("acls", acl); err != nil {
|
||||||
|
return fmt.Errorf("failed deleting acl: %s", err)
|
||||||
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"acls", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Defer(func() { s.tableWatches["acls"].Notify() })
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,298 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStateStore_ACLSet_ACLGet(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Querying ACLs with no results returns nil
|
||||||
|
idx, res, err := s.ACLGet("nope")
|
||||||
|
if idx != 0 || res != nil || err != nil {
|
||||||
|
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inserting an ACL with empty ID is disallowed
|
||||||
|
if err := s.ACLSet(1, &structs.ACL{}); err == nil {
|
||||||
|
t.Fatalf("expected %#v, got: %#v", ErrMissingACLID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index is not updated if nothing is saved
|
||||||
|
if idx := s.maxIndex("acls"); idx != 0 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inserting valid ACL works
|
||||||
|
acl := &structs.ACL{
|
||||||
|
ID: "acl1",
|
||||||
|
Name: "First ACL",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules1",
|
||||||
|
}
|
||||||
|
if err := s.ACLSet(1, acl); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the index was updated
|
||||||
|
if idx := s.maxIndex("acls"); idx != 1 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the ACL again
|
||||||
|
idx, result, err := s.ACLGet("acl1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 1 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the ACL matches the result
|
||||||
|
expect := &structs.ACL{
|
||||||
|
ID: "acl1",
|
||||||
|
Name: "First ACL",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules1",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 1,
|
||||||
|
ModifyIndex: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(result, expect) {
|
||||||
|
t.Fatalf("bad: %#v", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the ACL
|
||||||
|
acl = &structs.ACL{
|
||||||
|
ID: "acl1",
|
||||||
|
Name: "First ACL",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules2",
|
||||||
|
}
|
||||||
|
if err := s.ACLSet(2, acl); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index was updated
|
||||||
|
if idx := s.maxIndex("acls"); idx != 2 {
|
||||||
|
t.Fatalf("bad: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACL was updated and matches expected value
|
||||||
|
expect = &structs.ACL{
|
||||||
|
ID: "acl1",
|
||||||
|
Name: "First ACL",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules2",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 1,
|
||||||
|
ModifyIndex: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(acl, expect) {
|
||||||
|
t.Fatalf("bad: %#v", acl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_ACLList(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Listing when no ACLs exist returns nil
|
||||||
|
idx, res, err := s.ACLList()
|
||||||
|
if idx != 0 || res != nil || err != nil {
|
||||||
|
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert some ACLs
|
||||||
|
acls := structs.ACLs{
|
||||||
|
&structs.ACL{
|
||||||
|
ID: "acl1",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules1",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 1,
|
||||||
|
ModifyIndex: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&structs.ACL{
|
||||||
|
ID: "acl2",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules2",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 2,
|
||||||
|
ModifyIndex: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, acl := range acls {
|
||||||
|
if err := s.ACLSet(acl.ModifyIndex, acl); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query the ACLs
|
||||||
|
idx, res, err = s.ACLList()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the result matches
|
||||||
|
if !reflect.DeepEqual(res, acls) {
|
||||||
|
t.Fatalf("bad: %#v", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_ACLDelete(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Calling delete on an ACL which doesn't exist returns nil
|
||||||
|
if err := s.ACLDelete(1, "nope"); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index isn't updated if nothing is deleted
|
||||||
|
if idx := s.maxIndex("acls"); idx != 0 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert an ACL
|
||||||
|
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the ACL and check that the index was updated
|
||||||
|
if err := s.ACLDelete(2, "acl1"); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("acls"); idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Check that the ACL was really deleted
|
||||||
|
result, err := tx.First("acls", "id", "acl1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if result != nil {
|
||||||
|
t.Fatalf("expected nil, got: %#v", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Insert some ACLs.
|
||||||
|
acls := structs.ACLs{
|
||||||
|
&structs.ACL{
|
||||||
|
ID: "acl1",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules1",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 1,
|
||||||
|
ModifyIndex: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&structs.ACL{
|
||||||
|
ID: "acl2",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: "rules2",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 2,
|
||||||
|
ModifyIndex: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, acl := range acls {
|
||||||
|
if err := s.ACLSet(acl.ModifyIndex, acl); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot the ACLs.
|
||||||
|
snap := s.Snapshot()
|
||||||
|
defer snap.Close()
|
||||||
|
|
||||||
|
// Alter the real state store.
|
||||||
|
if err := s.ACLDelete(3, "acl1"); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the snapshot.
|
||||||
|
if idx := snap.LastIndex(); idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
iter, err := snap.ACLs()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
var dump structs.ACLs
|
||||||
|
for acl := iter.Next(); acl != nil; acl = iter.Next() {
|
||||||
|
dump = append(dump, acl.(*structs.ACL))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(dump, acls) {
|
||||||
|
t.Fatalf("bad: %#v", dump)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore the values into a new state store.
|
||||||
|
func() {
|
||||||
|
s := testStateStore(t)
|
||||||
|
restore := s.Restore()
|
||||||
|
for _, acl := range dump {
|
||||||
|
if err := restore.ACL(acl); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
restore.Commit()
|
||||||
|
|
||||||
|
// Read the restored ACLs back out and verify that they match.
|
||||||
|
idx, res, err := s.ACLList()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(res, acls) {
|
||||||
|
t.Fatalf("bad: %#v", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the index was updated.
|
||||||
|
if idx := s.maxIndex("acls"); idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_ACL_Watches(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Call functions that update the acls table and make sure a watch fires
|
||||||
|
// each time.
|
||||||
|
verifyWatch(t, s.getTableWatch("acls"), func() {
|
||||||
|
if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
verifyWatch(t, s.getTableWatch("acls"), func() {
|
||||||
|
if err := s.ACLDelete(2, "acl1"); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
verifyWatch(t, s.getTableWatch("acls"), func() {
|
||||||
|
restore := s.Restore()
|
||||||
|
if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
restore.Commit()
|
||||||
|
})
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,117 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
"github.com/hashicorp/serf/coordinate"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Coordinates is used to pull all the coordinates from the snapshot.
|
||||||
|
func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
|
||||||
|
iter, err := s.tx.Get("coordinates", "id")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return iter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Coordinates is used when restoring from a snapshot. For general inserts, use
|
||||||
|
// CoordinateBatchUpdate. We do less vetting of the updates here because they
|
||||||
|
// already got checked on the way in during a batch update.
|
||||||
|
func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
||||||
|
for _, update := range updates {
|
||||||
|
if err := s.tx.Insert("coordinates", update); err != nil {
|
||||||
|
return fmt.Errorf("failed restoring coordinate: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.watches.Arm("coordinates")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CoordinateGetRaw queries for the coordinate of the given node. This is an
|
||||||
|
// unusual state store method because it just returns the raw coordinate or
|
||||||
|
// nil, none of the Raft or node information is returned. This hits the 90%
|
||||||
|
// internal-to-Consul use case for this data, and this isn't exposed via an
|
||||||
|
// endpoint, so it doesn't matter that the Raft info isn't available.
|
||||||
|
func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Pull the full coordinate entry.
|
||||||
|
coord, err := tx.First("coordinates", "id", node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pick out just the raw coordinate.
|
||||||
|
if coord != nil {
|
||||||
|
return coord.(*structs.Coordinate).Coord, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Coordinates queries for all nodes with coordinates.
|
||||||
|
func (s *StateStore) Coordinates() (uint64, structs.Coordinates, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexTxn(tx, s.getWatchTables("Coordinates")...)
|
||||||
|
|
||||||
|
// Pull all the coordinates.
|
||||||
|
coords, err := tx.Get("coordinates", "id")
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
||||||
|
}
|
||||||
|
var results structs.Coordinates
|
||||||
|
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
||||||
|
results = append(results, coord.(*structs.Coordinate))
|
||||||
|
}
|
||||||
|
return idx, results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
|
||||||
|
// them in a single transaction.
|
||||||
|
func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Upsert the coordinates.
|
||||||
|
for _, update := range updates {
|
||||||
|
// Since the cleanup of coordinates is tied to deletion of
|
||||||
|
// nodes, we silently drop any updates for nodes that we don't
|
||||||
|
// know about. This might be possible during normal operation
|
||||||
|
// if we happen to get a coordinate update for a node that
|
||||||
|
// hasn't been able to add itself to the catalog yet. Since we
|
||||||
|
// don't carefully sequence this, and since it will fix itself
|
||||||
|
// on the next coordinate update from that node, we don't return
|
||||||
|
// an error or log anything.
|
||||||
|
node, err := tx.First("nodes", "id", update.Node)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed node lookup: %s", err)
|
||||||
|
}
|
||||||
|
if node == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Insert("coordinates", update); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting coordinate: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the index.
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Defer(func() { s.tableWatches["coordinates"].Notify() })
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,298 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/serf/coordinate"
|
||||||
|
)
|
||||||
|
|
||||||
|
// generateRandomCoordinate creates a random coordinate. This mucks with the
|
||||||
|
// underlying structure directly, so it's not really useful for any particular
|
||||||
|
// position in the network, but it's a good payload to send through to make
|
||||||
|
// sure things come out the other side or get stored correctly.
|
||||||
|
func generateRandomCoordinate() *coordinate.Coordinate {
|
||||||
|
config := coordinate.DefaultConfig()
|
||||||
|
coord := coordinate.NewCoordinate(config)
|
||||||
|
for i := range coord.Vec {
|
||||||
|
coord.Vec[i] = rand.NormFloat64()
|
||||||
|
}
|
||||||
|
coord.Error = rand.NormFloat64()
|
||||||
|
coord.Adjustment = rand.NormFloat64()
|
||||||
|
return coord
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Make sure the coordinates list starts out empty, and that a query for
|
||||||
|
// a raw coordinate for a nonexistent node doesn't do anything bad.
|
||||||
|
idx, coords, err := s.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 0 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if coords != nil {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
coord, err := s.CoordinateGetRaw("nope")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if coord != nil {
|
||||||
|
t.Fatalf("bad: %#v", coord)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make an update for nodes that don't exist and make sure they get
|
||||||
|
// ignored.
|
||||||
|
updates := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node2",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should still be empty, though applying an empty batch does bump
|
||||||
|
// the table index.
|
||||||
|
idx, coords, err = s.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 1 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if coords != nil {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the nodes then do the update again.
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
testRegisterNode(t, s, 2, "node2")
|
||||||
|
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should go through now.
|
||||||
|
idx, coords, err = s.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coords, updates) {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also verify the raw coordinate interface.
|
||||||
|
for _, update := range updates {
|
||||||
|
coord, err := s.CoordinateGetRaw(update.Node)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coord, update.Coord) {
|
||||||
|
t.Fatalf("bad: %#v", coord)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the coordinate for one of the nodes.
|
||||||
|
updates[1].Coord = generateRandomCoordinate()
|
||||||
|
if err := s.CoordinateBatchUpdate(4, updates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify it got applied.
|
||||||
|
idx, coords, err = s.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 4 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coords, updates) {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
|
||||||
|
// And check the raw coordinate version of the same thing.
|
||||||
|
for _, update := range updates {
|
||||||
|
coord, err := s.CoordinateGetRaw(update.Node)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coord, update.Coord) {
|
||||||
|
t.Fatalf("bad: %#v", coord)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Register a node and update its coordinate.
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
updates := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure it's in there.
|
||||||
|
coord, err := s.CoordinateGetRaw("node1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coord, updates[0].Coord) {
|
||||||
|
t.Fatalf("bad: %#v", coord)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now delete the node.
|
||||||
|
if err := s.DeleteNode(3, "node1"); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the coordinate is gone.
|
||||||
|
coord, err = s.CoordinateGetRaw("node1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if coord != nil {
|
||||||
|
t.Fatalf("bad: %#v", coord)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the index got updated.
|
||||||
|
idx, coords, err := s.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if coords != nil {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Register two nodes and update their coordinates.
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
testRegisterNode(t, s, 2, "node2")
|
||||||
|
updates := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node2",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot the coordinates.
|
||||||
|
snap := s.Snapshot()
|
||||||
|
defer snap.Close()
|
||||||
|
|
||||||
|
// Alter the real state store.
|
||||||
|
trash := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node2",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(4, trash); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the snapshot.
|
||||||
|
if idx := snap.LastIndex(); idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
iter, err := snap.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
var dump structs.Coordinates
|
||||||
|
for coord := iter.Next(); coord != nil; coord = iter.Next() {
|
||||||
|
dump = append(dump, coord.(*structs.Coordinate))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(dump, updates) {
|
||||||
|
t.Fatalf("bad: %#v", dump)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore the values into a new state store.
|
||||||
|
func() {
|
||||||
|
s := testStateStore(t)
|
||||||
|
restore := s.Restore()
|
||||||
|
if err := restore.Coordinates(5, dump); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
restore.Commit()
|
||||||
|
|
||||||
|
// Read the restored coordinates back out and verify that they match.
|
||||||
|
idx, res, err := s.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(res, updates) {
|
||||||
|
t.Fatalf("bad: %#v", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the index was updated (note that it got passed
|
||||||
|
// in during the restore).
|
||||||
|
if idx := s.maxIndex("coordinates"); idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Coordinate_Watches(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
|
||||||
|
// Call functions that update the coordinates table and make sure a watch fires
|
||||||
|
// each time.
|
||||||
|
verifyWatch(t, s.getTableWatch("coordinates"), func() {
|
||||||
|
updates := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(2, updates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
verifyWatch(t, s.getTableWatch("coordinates"), func() {
|
||||||
|
if err := s.DeleteNode(3, "node1"); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,345 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sessions is used to pull the full list of sessions for use during snapshots.
|
||||||
|
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
|
||||||
|
iter, err := s.tx.Get("sessions", "id")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return iter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Session is used when restoring from a snapshot. For general inserts, use
|
||||||
|
// SessionCreate.
|
||||||
|
func (s *StateRestore) Session(sess *structs.Session) error {
|
||||||
|
// Insert the session.
|
||||||
|
if err := s.tx.Insert("sessions", sess); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting session: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the check mappings.
|
||||||
|
for _, checkID := range sess.Checks {
|
||||||
|
mapping := &sessionCheck{
|
||||||
|
Node: sess.Node,
|
||||||
|
CheckID: checkID,
|
||||||
|
Session: sess.ID,
|
||||||
|
}
|
||||||
|
if err := s.tx.Insert("session_checks", mapping); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting session check mapping: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the index.
|
||||||
|
if err := indexUpdateMaxTxn(s.tx, sess.ModifyIndex, "sessions"); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.watches.Arm("sessions")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionCreate is used to register a new session in the state store.
|
||||||
|
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// This code is technically able to (incorrectly) update an existing
|
||||||
|
// session but we never do that in practice. The upstream endpoint code
|
||||||
|
// always adds a unique ID when doing a create operation so we never hit
|
||||||
|
// an existing session again. It isn't worth the overhead to verify
|
||||||
|
// that here, but it's worth noting that we should never do this in the
|
||||||
|
// future.
|
||||||
|
|
||||||
|
// Call the session creation
|
||||||
|
if err := s.sessionCreateTxn(tx, idx, sess); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sessionCreateTxn is the inner method used for creating session entries in
|
||||||
|
// an open transaction. Any health checks registered with the session will be
|
||||||
|
// checked for failing status. Returns any error encountered.
|
||||||
|
func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
|
||||||
|
// Check that we have a session ID
|
||||||
|
if sess.ID == "" {
|
||||||
|
return ErrMissingSessionID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the session behavior is valid
|
||||||
|
switch sess.Behavior {
|
||||||
|
case "":
|
||||||
|
// Release by default to preserve backwards compatibility
|
||||||
|
sess.Behavior = structs.SessionKeysRelease
|
||||||
|
case structs.SessionKeysRelease:
|
||||||
|
case structs.SessionKeysDelete:
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("Invalid session behavior: %s", sess.Behavior)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign the indexes. ModifyIndex likely will not be used but
|
||||||
|
// we set it here anyways for sanity.
|
||||||
|
sess.CreateIndex = idx
|
||||||
|
sess.ModifyIndex = idx
|
||||||
|
|
||||||
|
// Check that the node exists
|
||||||
|
node, err := tx.First("nodes", "id", sess.Node)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed node lookup: %s", err)
|
||||||
|
}
|
||||||
|
if node == nil {
|
||||||
|
return ErrMissingNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go over the session checks and ensure they exist.
|
||||||
|
for _, checkID := range sess.Checks {
|
||||||
|
check, err := tx.First("checks", "id", sess.Node, string(checkID))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed check lookup: %s", err)
|
||||||
|
}
|
||||||
|
if check == nil {
|
||||||
|
return fmt.Errorf("Missing check '%s' registration", checkID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the check is not in critical state
|
||||||
|
status := check.(*structs.HealthCheck).Status
|
||||||
|
if status == structs.HealthCritical {
|
||||||
|
return fmt.Errorf("Check '%s' is in %s state", checkID, status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the session
|
||||||
|
if err := tx.Insert("sessions", sess); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting session: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the check mappings
|
||||||
|
for _, checkID := range sess.Checks {
|
||||||
|
mapping := &sessionCheck{
|
||||||
|
Node: sess.Node,
|
||||||
|
CheckID: checkID,
|
||||||
|
Session: sess.ID,
|
||||||
|
}
|
||||||
|
if err := tx.Insert("session_checks", mapping); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting session check mapping: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the index
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Defer(func() { s.tableWatches["sessions"].Notify() })
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionGet is used to retrieve an active session from the state store.
|
||||||
|
func (s *StateStore) SessionGet(sessionID string) (uint64, *structs.Session, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexTxn(tx, s.getWatchTables("SessionGet")...)
|
||||||
|
|
||||||
|
// Look up the session by its ID
|
||||||
|
session, err := tx.First("sessions", "id", sessionID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
||||||
|
}
|
||||||
|
if session != nil {
|
||||||
|
return idx, session.(*structs.Session), nil
|
||||||
|
}
|
||||||
|
return idx, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionList returns a slice containing all of the active sessions.
|
||||||
|
func (s *StateStore) SessionList() (uint64, structs.Sessions, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexTxn(tx, s.getWatchTables("SessionList")...)
|
||||||
|
|
||||||
|
// Query all of the active sessions.
|
||||||
|
sessions, err := tx.Get("sessions", "id")
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go over the sessions and create a slice of them.
|
||||||
|
var result structs.Sessions
|
||||||
|
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||||
|
result = append(result, session.(*structs.Session))
|
||||||
|
}
|
||||||
|
return idx, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeSessions returns a set of active sessions associated
|
||||||
|
// with the given node ID. The returned index is the highest
|
||||||
|
// index seen from the result set.
|
||||||
|
func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexTxn(tx, s.getWatchTables("NodeSessions")...)
|
||||||
|
|
||||||
|
// Get all of the sessions which belong to the node
|
||||||
|
sessions, err := tx.Get("sessions", "node", nodeID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go over all of the sessions and return them as a slice
|
||||||
|
var result structs.Sessions
|
||||||
|
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||||
|
result = append(result, session.(*structs.Session))
|
||||||
|
}
|
||||||
|
return idx, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionDestroy is used to remove an active session. This will
|
||||||
|
// implicitly invalidate the session and invoke the specified
|
||||||
|
// session destroy behavior.
|
||||||
|
func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Call the session deletion.
|
||||||
|
watches := NewDumbWatchManager(s.tableWatches)
|
||||||
|
if err := s.deleteSessionTxn(tx, idx, watches, sessionID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Defer(func() { watches.Notify() })
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteSessionTxn is the inner method, which is used to do the actual
|
||||||
|
// session deletion and handle session invalidation, watch triggers, etc.
|
||||||
|
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, sessionID string) error {
|
||||||
|
// Look up the session.
|
||||||
|
sess, err := tx.First("sessions", "id", sessionID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed session lookup: %s", err)
|
||||||
|
}
|
||||||
|
if sess == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the session and write the new index.
|
||||||
|
if err := tx.Delete("sessions", sess); err != nil {
|
||||||
|
return fmt.Errorf("failed deleting session: %s", err)
|
||||||
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enforce the max lock delay.
|
||||||
|
session := sess.(*structs.Session)
|
||||||
|
delay := session.LockDelay
|
||||||
|
if delay > structs.MaxLockDelay {
|
||||||
|
delay = structs.MaxLockDelay
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snag the current now time so that all the expirations get calculated
|
||||||
|
// the same way.
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Get an iterator over all of the keys with the given session.
|
||||||
|
entries, err := tx.Get("kvs", "session", sessionID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed kvs lookup: %s", err)
|
||||||
|
}
|
||||||
|
var kvs []interface{}
|
||||||
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||||
|
kvs = append(kvs, entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalidate any held locks.
|
||||||
|
switch session.Behavior {
|
||||||
|
case structs.SessionKeysRelease:
|
||||||
|
for _, obj := range kvs {
|
||||||
|
// Note that we clone here since we are modifying the
|
||||||
|
// returned object and want to make sure our set op
|
||||||
|
// respects the transaction we are in.
|
||||||
|
e := obj.(*structs.DirEntry).Clone()
|
||||||
|
e.Session = ""
|
||||||
|
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
|
||||||
|
return fmt.Errorf("failed kvs update: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the lock delay if present.
|
||||||
|
if delay > 0 {
|
||||||
|
s.lockDelay.SetExpiration(e.Key, now, delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case structs.SessionKeysDelete:
|
||||||
|
for _, obj := range kvs {
|
||||||
|
e := obj.(*structs.DirEntry)
|
||||||
|
if err := s.kvsDeleteTxn(tx, idx, e.Key); err != nil {
|
||||||
|
return fmt.Errorf("failed kvs delete: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the lock delay if present.
|
||||||
|
if delay > 0 {
|
||||||
|
s.lockDelay.SetExpiration(e.Key, now, delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown session behavior %#v", session.Behavior)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete any check mappings.
|
||||||
|
mappings, err := tx.Get("session_checks", "session", sessionID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
var objs []interface{}
|
||||||
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
||||||
|
objs = append(objs, mapping)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, obj := range objs {
|
||||||
|
if err := tx.Delete("session_checks", obj); err != nil {
|
||||||
|
return fmt.Errorf("failed deleting session check: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete any prepared queries.
|
||||||
|
queries, err := tx.Get("prepared-queries", "session", sessionID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed prepared query lookup: %s", err)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
var ids []string
|
||||||
|
for wrapped := queries.Next(); wrapped != nil; wrapped = queries.Next() {
|
||||||
|
ids = append(ids, toPreparedQuery(wrapped).ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
for _, id := range ids {
|
||||||
|
if err := s.preparedQueryDeleteTxn(tx, idx, watches, id); err != nil {
|
||||||
|
return fmt.Errorf("failed prepared query delete: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
watches.Arm("sessions")
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,911 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// SessionGet returns nil if the session doesn't exist
|
||||||
|
idx, session, err := s.SessionGet(testUUID())
|
||||||
|
if session != nil || err != nil {
|
||||||
|
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", session, err)
|
||||||
|
}
|
||||||
|
if idx != 0 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering without a session ID is disallowed
|
||||||
|
err = s.SessionCreate(1, &structs.Session{})
|
||||||
|
if err != ErrMissingSessionID {
|
||||||
|
t.Fatalf("expected %#v, got: %#v", ErrMissingSessionID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalid session behavior throws error
|
||||||
|
sess := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Behavior: "nope",
|
||||||
|
}
|
||||||
|
err = s.SessionCreate(1, sess)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "session behavior") {
|
||||||
|
t.Fatalf("expected session behavior error, got: %#v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering with an unknown node is disallowed
|
||||||
|
sess = &structs.Session{ID: testUUID()}
|
||||||
|
if err := s.SessionCreate(1, sess); err != ErrMissingNode {
|
||||||
|
t.Fatalf("expected %#v, got: %#v", ErrMissingNode, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// None of the errored operations modified the index
|
||||||
|
if idx := s.maxIndex("sessions"); idx != 0 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Valid session is able to register
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
sess = &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(2, sess); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx := s.maxIndex("sessions"); idx != 2 {
|
||||||
|
t.Fatalf("bad index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the session again
|
||||||
|
idx, session, err = s.SessionGet(sess.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 2 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the session looks correct and was assigned the
|
||||||
|
// proper default value for session behavior.
|
||||||
|
expect := &structs.Session{
|
||||||
|
ID: sess.ID,
|
||||||
|
Behavior: structs.SessionKeysRelease,
|
||||||
|
Node: "node1",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 2,
|
||||||
|
ModifyIndex: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(expect, session) {
|
||||||
|
t.Fatalf("bad session: %#v", session)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering with a non-existent check is disallowed
|
||||||
|
sess = &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
Checks: []types.CheckID{"check1"},
|
||||||
|
}
|
||||||
|
err = s.SessionCreate(3, sess)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "Missing check") {
|
||||||
|
t.Fatalf("expected missing check error, got: %#v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering with a critical check is disallowed
|
||||||
|
testRegisterCheck(t, s, 3, "node1", "", "check1", structs.HealthCritical)
|
||||||
|
err = s.SessionCreate(4, sess)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), structs.HealthCritical) {
|
||||||
|
t.Fatalf("expected critical state error, got: %#v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering with a healthy check succeeds
|
||||||
|
testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthPassing)
|
||||||
|
if err := s.SessionCreate(5, sess); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register a session against two checks.
|
||||||
|
testRegisterCheck(t, s, 5, "node1", "", "check2", structs.HealthPassing)
|
||||||
|
sess2 := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
Checks: []types.CheckID{"check1", "check2"},
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(6, sess2); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Check mappings were inserted
|
||||||
|
{
|
||||||
|
check, err := tx.First("session_checks", "session", sess.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if check == nil {
|
||||||
|
t.Fatalf("missing session check")
|
||||||
|
}
|
||||||
|
expectCheck := &sessionCheck{
|
||||||
|
Node: "node1",
|
||||||
|
CheckID: "check1",
|
||||||
|
Session: sess.ID,
|
||||||
|
}
|
||||||
|
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
|
||||||
|
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checks, err := tx.Get("session_checks", "session", sess2.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
for i, check := 0, checks.Next(); check != nil; i, check = i+1, checks.Next() {
|
||||||
|
expectCheck := &sessionCheck{
|
||||||
|
Node: "node1",
|
||||||
|
CheckID: types.CheckID(fmt.Sprintf("check%d", i+1)),
|
||||||
|
Session: sess2.ID,
|
||||||
|
}
|
||||||
|
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
|
||||||
|
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pulling a nonexistent session gives the table index.
|
||||||
|
idx, session, err = s.SessionGet(testUUID())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if session != nil {
|
||||||
|
t.Fatalf("expected not to get a session: %v", session)
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TegstStateStore_SessionList(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Listing when no sessions exist returns nil
|
||||||
|
idx, res, err := s.SessionList()
|
||||||
|
if idx != 0 || res != nil || err != nil {
|
||||||
|
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register some nodes
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
testRegisterNode(t, s, 2, "node2")
|
||||||
|
testRegisterNode(t, s, 3, "node3")
|
||||||
|
|
||||||
|
// Create some sessions in the state store
|
||||||
|
sessions := structs.Sessions{
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
},
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node2",
|
||||||
|
Behavior: structs.SessionKeysRelease,
|
||||||
|
},
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node3",
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, session := range sessions {
|
||||||
|
if err := s.SessionCreate(uint64(4+i), session); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// List out all of the sessions
|
||||||
|
idx, sessionList, err := s.SessionList()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(sessionList, sessions) {
|
||||||
|
t.Fatalf("bad: %#v", sessions)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_NodeSessions(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Listing sessions with no results returns nil
|
||||||
|
idx, res, err := s.NodeSessions("node1")
|
||||||
|
if idx != 0 || res != nil || err != nil {
|
||||||
|
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the nodes
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
testRegisterNode(t, s, 2, "node2")
|
||||||
|
|
||||||
|
// Register some sessions with the nodes
|
||||||
|
sessions1 := structs.Sessions{
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
},
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
sessions2 := []*structs.Session{
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node2",
|
||||||
|
},
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, sess := range append(sessions1, sessions2...) {
|
||||||
|
if err := s.SessionCreate(uint64(3+i), sess); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query all of the sessions associated with a specific
|
||||||
|
// node in the state store.
|
||||||
|
idx, res, err = s.NodeSessions("node1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if len(res) != len(sessions1) {
|
||||||
|
t.Fatalf("bad: %#v", res)
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, res, err = s.NodeSessions("node2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if len(res) != len(sessions2) {
|
||||||
|
t.Fatalf("bad: %#v", res)
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_SessionDestroy(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Session destroy is idempotent and returns no error
|
||||||
|
// if the session doesn't exist.
|
||||||
|
if err := s.SessionDestroy(1, testUUID()); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the index was not updated if nothing was destroyed.
|
||||||
|
if idx := s.maxIndex("sessions"); idx != 0 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register a node.
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
|
||||||
|
// Register a new session
|
||||||
|
sess := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node1",
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(2, sess); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy the session.
|
||||||
|
if err := s.SessionDestroy(3, sess.ID); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the index was updated
|
||||||
|
if idx := s.maxIndex("sessions"); idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the session is really gone.
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
sessions, err := tx.Get("sessions", "id")
|
||||||
|
if err != nil || sessions.Next() != nil {
|
||||||
|
t.Fatalf("session should not exist")
|
||||||
|
}
|
||||||
|
tx.Abort()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Register some nodes and checks.
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
testRegisterNode(t, s, 2, "node2")
|
||||||
|
testRegisterNode(t, s, 3, "node3")
|
||||||
|
testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthPassing)
|
||||||
|
|
||||||
|
// Create some sessions in the state store.
|
||||||
|
session1 := testUUID()
|
||||||
|
sessions := structs.Sessions{
|
||||||
|
&structs.Session{
|
||||||
|
ID: session1,
|
||||||
|
Node: "node1",
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
Checks: []types.CheckID{"check1"},
|
||||||
|
},
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node2",
|
||||||
|
Behavior: structs.SessionKeysRelease,
|
||||||
|
LockDelay: 10 * time.Second,
|
||||||
|
},
|
||||||
|
&structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "node3",
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
TTL: "1.5s",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, session := range sessions {
|
||||||
|
if err := s.SessionCreate(uint64(5+i), session); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot the sessions.
|
||||||
|
snap := s.Snapshot()
|
||||||
|
defer snap.Close()
|
||||||
|
|
||||||
|
// Alter the real state store.
|
||||||
|
if err := s.SessionDestroy(8, session1); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the snapshot.
|
||||||
|
if idx := snap.LastIndex(); idx != 7 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
iter, err := snap.Sessions()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
var dump structs.Sessions
|
||||||
|
for session := iter.Next(); session != nil; session = iter.Next() {
|
||||||
|
sess := session.(*structs.Session)
|
||||||
|
dump = append(dump, sess)
|
||||||
|
|
||||||
|
found := false
|
||||||
|
for i, _ := range sessions {
|
||||||
|
if sess.ID == sessions[i].ID {
|
||||||
|
if !reflect.DeepEqual(sess, sessions[i]) {
|
||||||
|
t.Fatalf("bad: %#v", sess)
|
||||||
|
}
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("bad: %#v", sess)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore the sessions into a new state store.
|
||||||
|
func() {
|
||||||
|
s := testStateStore(t)
|
||||||
|
restore := s.Restore()
|
||||||
|
for _, session := range dump {
|
||||||
|
if err := restore.Session(session); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
restore.Commit()
|
||||||
|
|
||||||
|
// Read the restored sessions back out and verify that they
|
||||||
|
// match.
|
||||||
|
idx, res, err := s.SessionList()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 7 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
for _, sess := range res {
|
||||||
|
found := false
|
||||||
|
for i, _ := range sessions {
|
||||||
|
if sess.ID == sessions[i].ID {
|
||||||
|
if !reflect.DeepEqual(sess, sessions[i]) {
|
||||||
|
t.Fatalf("bad: %#v", sess)
|
||||||
|
}
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("bad: %#v", sess)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the index was updated.
|
||||||
|
if idx := s.maxIndex("sessions"); idx != 7 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manually verify that the session check mapping got restored.
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
check, err := tx.First("session_checks", "session", session1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if check == nil {
|
||||||
|
t.Fatalf("missing session check")
|
||||||
|
}
|
||||||
|
expectCheck := &sessionCheck{
|
||||||
|
Node: "node1",
|
||||||
|
CheckID: "check1",
|
||||||
|
Session: session1,
|
||||||
|
}
|
||||||
|
if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) {
|
||||||
|
t.Fatalf("expected %#v, got: %#v", expectCheck, actual)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Watches(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Register a test node.
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
|
||||||
|
// This just covers the basics. The session invalidation tests above
|
||||||
|
// cover the more nuanced multiple table watches.
|
||||||
|
session := testUUID()
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
sess := &structs.Session{
|
||||||
|
ID: session,
|
||||||
|
Node: "node1",
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(2, sess); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
if err := s.SessionDestroy(3, session); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
restore := s.Restore()
|
||||||
|
sess := &structs.Session{
|
||||||
|
ID: session,
|
||||||
|
Node: "node1",
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
}
|
||||||
|
if err := restore.Session(sess); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
restore.Commit()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(14, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the node and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||||
|
if err := s.DeleteNode(15, "foo"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup by ID, should be nil.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 15 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_DeleteService(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
if err := s.EnsureNode(11, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if err := s.EnsureService(12, "foo", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
check := &structs.HealthCheck{
|
||||||
|
Node: "foo",
|
||||||
|
CheckID: "api",
|
||||||
|
Name: "Can connect",
|
||||||
|
Status: structs.HealthPassing,
|
||||||
|
ServiceID: "api",
|
||||||
|
}
|
||||||
|
if err := s.EnsureCheck(13, check); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
Checks: []types.CheckID{"api"},
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(14, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the service and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("services"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||||
|
if err := s.DeleteService(15, "foo", "api"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup by ID, should be nil.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 15 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_Critical_Check(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
check := &structs.HealthCheck{
|
||||||
|
Node: "foo",
|
||||||
|
CheckID: "bar",
|
||||||
|
Status: structs.HealthPassing,
|
||||||
|
}
|
||||||
|
if err := s.EnsureCheck(13, check); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
Checks: []types.CheckID{"bar"},
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(14, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalidate the check and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||||
|
check.Status = structs.HealthCritical
|
||||||
|
if err := s.EnsureCheck(15, check); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup by ID, should be nil.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 15 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_DeleteCheck(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
check := &structs.HealthCheck{
|
||||||
|
Node: "foo",
|
||||||
|
CheckID: "bar",
|
||||||
|
Status: structs.HealthPassing,
|
||||||
|
}
|
||||||
|
if err := s.EnsureCheck(13, check); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
Checks: []types.CheckID{"bar"},
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(14, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the check and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("checks"), func() {
|
||||||
|
if err := s.DeleteCheck(15, "foo", "bar"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup by ID, should be nil.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 15 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manually make sure the session checks mapping is clear.
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
mapping, err := tx.First("session_checks", "session", session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if mapping != nil {
|
||||||
|
t.Fatalf("unexpected session check")
|
||||||
|
}
|
||||||
|
tx.Abort()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
LockDelay: 50 * time.Millisecond,
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(4, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock a key with the session.
|
||||||
|
d := &structs.DirEntry{
|
||||||
|
Key: "/foo",
|
||||||
|
Flags: 42,
|
||||||
|
Value: []byte("test"),
|
||||||
|
Session: session.ID,
|
||||||
|
}
|
||||||
|
ok, err := s.KVSLock(5, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unexpected fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the node and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||||
|
verifyWatch(t, s.GetKVSWatch("/f"), func() {
|
||||||
|
if err := s.DeleteNode(6, "foo"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup by ID, should be nil.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key should be unlocked.
|
||||||
|
idx, d2, err := s.KVSGet("/foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if d2.ModifyIndex != 6 {
|
||||||
|
t.Fatalf("bad index: %v", d2.ModifyIndex)
|
||||||
|
}
|
||||||
|
if d2.LockIndex != 1 {
|
||||||
|
t.Fatalf("bad: %v", *d2)
|
||||||
|
}
|
||||||
|
if d2.Session != "" {
|
||||||
|
t.Fatalf("bad: %v", *d2)
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key should have a lock delay.
|
||||||
|
expires := s.KVSLockDelay("/foo")
|
||||||
|
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
|
||||||
|
t.Fatalf("Bad: %v", expires)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
if err := s.EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
LockDelay: 50 * time.Millisecond,
|
||||||
|
Behavior: structs.SessionKeysDelete,
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(4, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock a key with the session.
|
||||||
|
d := &structs.DirEntry{
|
||||||
|
Key: "/bar",
|
||||||
|
Flags: 42,
|
||||||
|
Value: []byte("test"),
|
||||||
|
Session: session.ID,
|
||||||
|
}
|
||||||
|
ok, err := s.KVSLock(5, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unexpected fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the node and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("nodes"), func() {
|
||||||
|
verifyWatch(t, s.GetKVSWatch("/b"), func() {
|
||||||
|
if err := s.DeleteNode(6, "foo"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup by ID, should be nil.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key should be deleted.
|
||||||
|
idx, d2, err := s.KVSGet("/bar")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if d2 != nil {
|
||||||
|
t.Fatalf("unexpected deleted key")
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key should have a lock delay.
|
||||||
|
expires := s.KVSLockDelay("/bar")
|
||||||
|
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
|
||||||
|
t.Fatalf("Bad: %v", expires)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Set up our test environment.
|
||||||
|
testRegisterNode(t, s, 1, "foo")
|
||||||
|
testRegisterService(t, s, 2, "foo", "redis")
|
||||||
|
session := &structs.Session{
|
||||||
|
ID: testUUID(),
|
||||||
|
Node: "foo",
|
||||||
|
}
|
||||||
|
if err := s.SessionCreate(3, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
query := &structs.PreparedQuery{
|
||||||
|
ID: testUUID(),
|
||||||
|
Session: session.ID,
|
||||||
|
Service: structs.ServiceQuery{
|
||||||
|
Service: "redis",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.PreparedQuerySet(4, query); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalidate the session and make sure the watches fire.
|
||||||
|
verifyWatch(t, s.getTableWatch("sessions"), func() {
|
||||||
|
verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
|
||||||
|
if err := s.SessionDestroy(5, session.ID); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// Make sure the session is gone.
|
||||||
|
idx, s2, err := s.SessionGet(session.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if s2 != nil {
|
||||||
|
t.Fatalf("session should be invalidated")
|
||||||
|
}
|
||||||
|
if idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the query is gone and the index is updated.
|
||||||
|
idx, q2, err := s.PreparedQueryGet(query.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if q2 != nil {
|
||||||
|
t.Fatalf("bad: %v", q2)
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue