Puts all restore operations into a single transaction and optimizes watches.

This commit is contained in:
James Phillips 2015-10-19 23:06:59 -07:00
parent 52c373bb65
commit f3a95bf9fe
3 changed files with 408 additions and 136 deletions

View File

@ -78,7 +78,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
switch msgType { switch msgType {
case structs.RegisterRequestType: case structs.RegisterRequestType:
return c.decodeRegister(buf[1:], log.Index) return c.applyRegister(buf[1:], log.Index)
case structs.DeregisterRequestType: case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:], log.Index) return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType: case structs.KVSRequestType:
@ -99,18 +99,15 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
} }
} }
func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} { func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
var req structs.RegisterRequest var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
return c.applyRegister(&req, index)
}
func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
// Apply all updates in a single transaction // Apply all updates in a single transaction
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) if err := c.state.EnsureRegistration(index, &req); err != nil {
if err := c.state.EnsureRegistration(index, req); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err) c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err)
return err return err
} }
@ -267,6 +264,10 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
} }
c.state = stateNew c.state = stateNew
// Set up a new restore transaction
restore := c.state.Restore()
defer restore.Abort()
// Create a decoder // Create a decoder
dec := codec.NewDecoder(old, msgpackHandle) dec := codec.NewDecoder(old, msgpackHandle)
@ -294,32 +295,16 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil { if err := dec.Decode(&req); err != nil {
return err return err
} }
c.applyRegister(&req, header.LastIndex) if err := restore.Registration(header.LastIndex, &req); err != nil {
return err
}
case structs.KVSRequestType: case structs.KVSRequestType:
var req structs.DirEntry var req structs.DirEntry
if err := dec.Decode(&req); err != nil { if err := dec.Decode(&req); err != nil {
return err return err
} }
if err := c.state.KVSRestore(&req); err != nil { if err := restore.KVS(&req); err != nil {
return err
}
case structs.SessionRequestType:
var req structs.Session
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.SessionRestore(&req); err != nil {
return err
}
case structs.ACLRequestType:
var req structs.ACL
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.ACLRestore(&req); err != nil {
return err return err
} }
@ -336,7 +321,25 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
Key: req.Key, Key: req.Key,
Index: req.ModifyIndex, Index: req.ModifyIndex,
} }
if err := c.state.TombstoneRestore(stone); err != nil { if err := restore.Tombstone(stone); err != nil {
return err
}
case structs.SessionRequestType:
var req structs.Session
if err := dec.Decode(&req); err != nil {
return err
}
if err := restore.Session(&req); err != nil {
return err
}
case structs.ACLRequestType:
var req structs.ACL
if err := dec.Decode(&req); err != nil {
return err
}
if err := restore.ACL(&req); err != nil {
return err return err
} }
@ -345,6 +348,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
} }
} }
restore.Commit()
return nil return nil
} }

View File

@ -57,6 +57,14 @@ type StateSnapshot struct {
lastIndex uint64 lastIndex uint64
} }
// StateRestore is used to efficiently manage restoring a large amount of
// data to a state store.
type StateRestore struct {
store *StateStore
tx *memdb.Txn
watches *DumbWatchManager
}
// IndexEntry keeps a record of the last index per-table. // IndexEntry keeps a record of the last index per-table.
type IndexEntry struct { type IndexEntry struct {
Key string Key string
@ -188,6 +196,109 @@ func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
return iter, nil return iter, nil
} }
// Restore is used to efficiently manage restoring a large amount of data into
// the state store. It works by doing all the restores inside of a single
// transaction.
func (s *StateStore) Restore() *StateRestore {
tx := s.db.Txn(true)
watches := NewDumbWatchManager(s.tableWatches)
return &StateRestore{s, tx, watches}
}
// Abort abandons the changes made by a restore. This or Commit should always be
// called.
func (s *StateRestore) Abort() {
s.tx.Abort()
}
// Commit commits the changes made by a restore. This or Abort should always be
// called.
func (s *StateRestore) Commit() {
// Fire off a single KVS watch instead of a zillion prefix ones, and use
// a dumb watch manager to single-fire all the full table watches.
s.tx.Defer(func() { s.store.kvsWatch.Notify("", true) })
s.tx.Defer(func() { s.watches.Notify() })
s.tx.Commit()
}
// Registration is used to make sure a node, service, and check registration is
// performed within a single transaction to avoid race conditions on state
// updates.
func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error {
if err := s.store.ensureRegistrationTxn(s.tx, idx, s.watches, req); err != nil {
return err
}
return nil
}
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
func (s *StateRestore) KVS(entry *structs.DirEntry) error {
if err := s.tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// We have a single top-level KVS watch trigger instead of doing
// tons of prefix watches.
return nil
}
// Tombstone is used when restoring from a snapshot. For general inserts, use
// Graveyard.InsertTxn.
func (s *StateRestore) Tombstone(stone *Tombstone) error {
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
return fmt.Errorf("failed restoring tombstone: %s", err)
}
return nil
}
// Session is used when restoring from a snapshot. For general inserts, use
// SessionCreate.
func (s *StateRestore) Session(sess *structs.Session) error {
// Insert the session.
if err := s.tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings.
for _, checkID := range sess.Checks {
mapping := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := s.tx.Insert("session_checks", mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index.
if err := indexUpdateMaxTxn(s.tx, sess.ModifyIndex, "sessions"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("sessions")
return nil
}
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
func (s *StateRestore) ACL(acl *structs.ACL) error {
if err := s.tx.Insert("acls", acl); err != nil {
return fmt.Errorf("failed restoring acl: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, acl.ModifyIndex, "acls"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
s.watches.Arm("acls")
return nil
}
// maxIndex is a helper used to retrieve the highest known index // maxIndex is a helper used to retrieve the highest known index
// amongst a set of tables in the db. // amongst a set of tables in the db.
func (s *StateStore) maxIndex(tables ...string) uint64 { func (s *StateStore) maxIndex(tables ...string) uint64 {
@ -311,32 +422,46 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureRegistrationTxn(tx, idx, watches, req); err != nil {
return err
}
tx.Defer(func() { watches.Notify() })
tx.Commit()
return nil
}
// ensureRegistrationTxn is used to make sure a node, service, and check
// registration is performed within a single transaction to avoid race
// conditions on state updates.
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
req *structs.RegisterRequest) error {
// Add the node. // Add the node.
node := &structs.Node{Node: req.Node, Address: req.Address} node := &structs.Node{Node: req.Node, Address: req.Address}
if err := s.ensureNodeTxn(tx, idx, node); err != nil { if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
return fmt.Errorf("failed inserting node: %s", err) return fmt.Errorf("failed inserting node: %s", err)
} }
// Add the service, if any. // Add the service, if any.
if req.Service != nil { if req.Service != nil {
if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil { if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil {
return fmt.Errorf("failed inserting service: %s", err) return fmt.Errorf("failed inserting service: %s", err)
} }
} }
// Add the checks, if any. // Add the checks, if any.
if req.Check != nil { if req.Check != nil {
if err := s.ensureCheckTxn(tx, idx, req.Check); err != nil { if err := s.ensureCheckTxn(tx, idx, watches, req.Check); err != nil {
return fmt.Errorf("failed inserting check: %s", err) return fmt.Errorf("failed inserting check: %s", err)
} }
} }
for _, check := range req.Checks { for _, check := range req.Checks {
if err := s.ensureCheckTxn(tx, idx, check); err != nil { if err := s.ensureCheckTxn(tx, idx, watches, check); err != nil {
return fmt.Errorf("failed inserting check: %s", err) return fmt.Errorf("failed inserting check: %s", err)
} }
} }
tx.Commit()
return nil return nil
} }
@ -346,10 +471,12 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
defer tx.Abort() defer tx.Abort()
// Call the node upsert // Call the node upsert
if err := s.ensureNodeTxn(tx, idx, node); err != nil { watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
return err return err
} }
tx.Defer(func() { watches.Notify() })
tx.Commit() tx.Commit()
return nil return nil
} }
@ -357,7 +484,8 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
// ensureNodeTxn is the inner function called to actually create a node // ensureNodeTxn is the inner function called to actually create a node
// registration or modify an existing one in the state store. It allows // registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn. // passing in a memdb transaction so it may be part of a larger txn.
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
node *structs.Node) error {
// Check for an existing node // Check for an existing node
existing, err := tx.First("nodes", "id", node.Node) existing, err := tx.First("nodes", "id", node.Node)
if err != nil { if err != nil {
@ -381,7 +509,7 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
tx.Defer(func() { s.tableWatches["nodes"].Notify() }) watches.Arm("nodes")
return nil return nil
} }
@ -527,17 +655,20 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer
defer tx.Abort() defer tx.Abort()
// Call the service registration upsert // Call the service registration upsert
if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureServiceTxn(tx, idx, watches, node, svc); err != nil {
return err return err
} }
tx.Defer(func() { watches.Notify() })
tx.Commit() tx.Commit()
return nil return nil
} }
// ensureServiceTxn is used to upsert a service registration within an // ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction. // existing memdb transaction.
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
node string, svc *structs.NodeService) error {
// Check for existing service // Check for existing service
existing, err := tx.First("services", "id", node, svc.ID) existing, err := tx.First("services", "id", node, svc.ID)
if err != nil { if err != nil {
@ -572,7 +703,7 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
tx.Defer(func() { s.tableWatches["services"].Notify() }) watches.Arm("services")
return nil return nil
} }
@ -819,10 +950,12 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
defer tx.Abort() defer tx.Abort()
// Call the check registration // Call the check registration
if err := s.ensureCheckTxn(tx, idx, hc); err != nil { watches := NewDumbWatchManager(s.tableWatches)
if err := s.ensureCheckTxn(tx, idx, watches, hc); err != nil {
return err return err
} }
tx.Defer(func() { watches.Notify() })
tx.Commit() tx.Commit()
return nil return nil
} }
@ -830,7 +963,8 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
// ensureCheckTransaction is used as the inner method to handle inserting // ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting // a health check into the state store. It ensures safety against inserting
// checks with no matching node or service. // checks with no matching node or service.
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
hc *structs.HealthCheck) error {
// Check if we have an existing health check // Check if we have an existing health check
existing, err := tx.First("checks", "id", hc.Node, hc.CheckID) existing, err := tx.First("checks", "id", hc.Node, hc.CheckID)
if err != nil { if err != nil {
@ -906,7 +1040,7 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
tx.Defer(func() { s.tableWatches["checks"].Notify() }) watches.Arm("checks")
return nil return nil
} }
@ -1680,39 +1814,6 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error
return true, nil return true, nil
} }
// KVSRestore is used when restoring from a snapshot. Use KVSSet for general
// inserts.
func (s *StateStore) KVSRestore(entry *structs.DirEntry) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := indexUpdateMaxTxn(tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) })
tx.Commit()
return nil
}
// Tombstone is used when restoring from a snapshot. For general inserts, use
// Graveyard.InsertTxn.
func (s *StateStore) TombstoneRestore(stone *Tombstone) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.kvsGraveyard.RestoreTxn(tx, stone); err != nil {
return fmt.Errorf("failed restoring tombstone: %s", err)
}
tx.Commit()
return nil
}
// SessionCreate is used to register a new session in the state store. // SessionCreate is used to register a new session in the state store.
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error { func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
@ -1990,39 +2091,6 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
return nil return nil
} }
// SessionRestore is used when restoring from a snapshot. For general inserts,
// use SessionCreate.
func (s *StateStore) SessionRestore(sess *structs.Session) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Insert the session.
if err := tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings.
for _, checkID := range sess.Checks {
mapping := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := tx.Insert("session_checks", mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index.
if err := indexUpdateMaxTxn(tx, sess.ModifyIndex, "sessions"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["sessions"].Notify() })
tx.Commit()
return nil
}
// ACLSet is used to insert an ACL rule into the state store. // ACLSet is used to insert an ACL rule into the state store.
func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error { func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
@ -2163,22 +2231,3 @@ func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error
tx.Defer(func() { s.tableWatches["acls"].Notify() }) tx.Defer(func() { s.tableWatches["acls"].Notify() })
return nil return nil
} }
// ACLRestore is used when restoring from a snapshot. For general inserts, use
// ACLSet.
func (s *StateStore) ACLRestore(acl *structs.ACL) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := tx.Insert("acls", acl); err != nil {
return fmt.Errorf("failed restoring acl: %s", err)
}
if err := indexUpdateMaxTxn(tx, acl.ModifyIndex, "acls"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.tableWatches["acls"].Notify() })
tx.Commit()
return nil
}

View File

@ -121,6 +121,36 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
} }
} }
func TestStateStore_Restore_Abort(t *testing.T) {
s := testStateStore(t)
// The detailed restore functions are tested below, this just checks
// that abort works.
restore := s.Restore()
entry := &structs.DirEntry{
Key: "foo",
Value: []byte("bar"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
}
if err := restore.KVS(entry); err != nil {
t.Fatalf("err: %s", err)
}
restore.Abort()
idx, entries, err := s.KVSList("")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if len(entries) != 0 {
t.Fatalf("bad: %#v", entries)
}
}
func TestStateStore_maxIndex(t *testing.T) { func TestStateStore_maxIndex(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -487,6 +517,144 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
}() }()
} }
func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
s := testStateStore(t)
// Start with just a node.
req := &structs.RegisterRequest{
Node: "node1",
Address: "1.2.3.4",
}
restore := s.Restore()
if err := restore.Registration(1, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
// Retrieve the node and verify its contents.
verifyNode := func(created, modified uint64) {
_, out, err := s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.Node != "node1" || out.Address != "1.2.3.4" ||
out.CreateIndex != created || out.ModifyIndex != modified {
t.Fatalf("bad node returned: %#v", out)
}
}
verifyNode(1, 1)
// Add in a service definition.
req.Service = &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "1.1.1.1",
Port: 8080,
}
restore = s.Restore()
if err := restore.Registration(2, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
// Verify that the service got registered.
verifyService := func(created, modified uint64) {
idx, out, err := s.NodeServices("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != modified {
t.Fatalf("bad index: %d", idx)
}
if len(out.Services) != 1 {
t.Fatalf("bad: %#v", out.Services)
}
s := out.Services["redis1"]
if s.ID != "redis1" || s.Service != "redis" ||
s.Address != "1.1.1.1" || s.Port != 8080 ||
s.CreateIndex != created || s.ModifyIndex != modified {
t.Fatalf("bad service returned: %#v", s)
}
}
verifyNode(1, 2)
verifyService(2, 2)
// Add in a top-level check.
req.Check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "check",
}
restore = s.Restore()
if err := restore.Registration(3, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
// Verify that the check got registered.
verifyCheck := func(created, modified uint64) {
idx, out, err := s.NodeChecks("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != modified {
t.Fatalf("bad index: %d", idx)
}
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
}
c := out[0]
if c.Node != "node1" || c.CheckID != "check1" || c.Name != "check" ||
c.CreateIndex != created || c.ModifyIndex != modified {
t.Fatalf("bad check returned: %#v", c)
}
}
verifyNode(1, 3)
verifyService(2, 3)
verifyCheck(3, 3)
// Add in another check via the slice.
req.Checks = structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
CheckID: "check2",
Name: "check",
},
}
restore = s.Restore()
if err := restore.Registration(4, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
// Verify that the additional check got registered.
verifyNode(1, 4)
verifyService(2, 4)
func() {
idx, out, err := s.NodeChecks("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
}
if len(out) != 2 {
t.Fatalf("bad: %#v", out)
}
c1 := out[0]
if c1.Node != "node1" || c1.CheckID != "check1" || c1.Name != "check" ||
c1.CreateIndex != 3 || c1.ModifyIndex != 4 {
t.Fatalf("bad check returned: %#v", c1)
}
c2 := out[1]
if c2.Node != "node1" || c2.CheckID != "check2" || c2.Name != "check" ||
c2.CreateIndex != 4 || c2.ModifyIndex != 4 {
t.Fatalf("bad check returned: %#v", c2)
}
}()
}
func TestStateStore_EnsureRegistration_Watches(t *testing.T) { func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -505,6 +673,18 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
}) })
}) })
}) })
// The nodes watch should fire for this one.
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyNoWatch(t, s.getTableWatch("services"), func() {
verifyNoWatch(t, s.getTableWatch("checks"), func() {
restore := s.Restore()
if err := restore.Registration(1, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
// With a service definition added it should fire nodes and // With a service definition added it should fire nodes and
// services. // services.
@ -523,6 +703,17 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
}) })
}) })
}) })
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.getTableWatch("services"), func() {
verifyNoWatch(t, s.getTableWatch("checks"), func() {
restore := s.Restore()
if err := restore.Registration(2, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
// Now with a check it should hit all three. // Now with a check it should hit all three.
req.Check = &structs.HealthCheck{ req.Check = &structs.HealthCheck{
@ -539,6 +730,17 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) {
}) })
}) })
}) })
verifyWatch(t, s.getTableWatch("nodes"), func() {
verifyWatch(t, s.getTableWatch("services"), func() {
verifyWatch(t, s.getTableWatch("checks"), func() {
restore := s.Restore()
if err := restore.Registration(3, req); err != nil {
t.Fatalf("err: %s", err)
}
restore.Commit()
})
})
})
} }
func TestStateStore_EnsureNode(t *testing.T) { func TestStateStore_EnsureNode(t *testing.T) {
@ -3135,11 +3337,13 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
// Restore the values into a new state store. // Restore the values into a new state store.
func() { func() {
s := testStateStore(t) s := testStateStore(t)
restore := s.Restore()
for _, entry := range dump { for _, entry := range dump {
if err := s.KVSRestore(entry); err != nil { if err := restore.KVS(entry); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
restore.Commit()
// Read the restored keys back out and verify they match. // Read the restored keys back out and verify they match.
idx, res, err := s.KVSList("") idx, res, err := s.KVSList("")
@ -3191,15 +3395,21 @@ func TestStateStore_KVS_Watches(t *testing.T) {
}) })
}) })
}) })
// Restore just fires off a top-level watch, so we should get hits on
// any prefix, including ones for keys that aren't in there.
verifyWatch(t, s.GetKVSWatch(""), func() { verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("b"), func() { verifyWatch(t, s.GetKVSWatch("b"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { verifyWatch(t, s.GetKVSWatch("/nope"), func() {
if err := s.KVSRestore(&structs.DirEntry{Key: "bbb"}); err != nil { restore := s.Restore()
if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
restore.Commit()
}) })
}) })
}) })
verifyWatch(t, s.GetKVSWatch(""), func() { verifyWatch(t, s.GetKVSWatch(""), func() {
verifyWatch(t, s.GetKVSWatch("a"), func() { verifyWatch(t, s.GetKVSWatch("a"), func() {
verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { verifyNoWatch(t, s.GetKVSWatch("/nope"), func() {
@ -3337,12 +3547,13 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
// Restore the values into a new state store. // Restore the values into a new state store.
func() { func() {
s := testStateStore(t) s := testStateStore(t)
restore := s.Restore()
for _, stone := range dump { for _, stone := range dump {
if err := s.TombstoneRestore(stone); err != nil { if err := restore.Tombstone(stone); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
restore.Commit()
// See if the stone works properly in a list query. // See if the stone works properly in a list query.
idx, _, err := s.KVSList("foo/bar") idx, _, err := s.KVSList("foo/bar")
@ -3777,11 +3988,13 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) {
// Restore the sessions into a new state store. // Restore the sessions into a new state store.
func() { func() {
s := testStateStore(t) s := testStateStore(t)
restore := s.Restore()
for _, session := range dump { for _, session := range dump {
if err := s.SessionRestore(session); err != nil { if err := restore.Session(session); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
restore.Commit()
// Read the restored sessions back out and verify that they // Read the restored sessions back out and verify that they
// match. // match.
@ -3859,14 +4072,16 @@ func TestStateStore_Session_Watches(t *testing.T) {
} }
}) })
verifyWatch(t, s.getTableWatch("sessions"), func() { verifyWatch(t, s.getTableWatch("sessions"), func() {
restore := s.Restore()
sess := &structs.Session{ sess := &structs.Session{
ID: session, ID: session,
Node: "node1", Node: "node1",
Behavior: structs.SessionKeysDelete, Behavior: structs.SessionKeysDelete,
} }
if err := s.SessionRestore(sess); err != nil { if err := restore.Session(sess); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
restore.Commit()
}) })
} }
@ -4456,11 +4671,13 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) {
// Restore the values into a new state store. // Restore the values into a new state store.
func() { func() {
s := testStateStore(t) s := testStateStore(t)
restore := s.Restore()
for _, acl := range dump { for _, acl := range dump {
if err := s.ACLRestore(acl); err != nil { if err := restore.ACL(acl); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
restore.Commit()
// Read the restored ACLs back out and verify that they match. // Read the restored ACLs back out and verify that they match.
idx, res, err := s.ACLList() idx, res, err := s.ACLList()
@ -4497,8 +4714,10 @@ func TestStateStore_ACL_Watches(t *testing.T) {
} }
}) })
verifyWatch(t, s.getTableWatch("acls"), func() { verifyWatch(t, s.getTableWatch("acls"), func() {
if err := s.ACLRestore(&structs.ACL{ID: "acl1"}); err != nil { restore := s.Restore()
if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
restore.Commit()
}) })
} }