From f3a95bf9fed4573c9ca174dc70f4a26ed0b48982 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 19 Oct 2015 23:06:59 -0700 Subject: [PATCH] Puts all restore operations into a single transaction and optimizes watches. --- consul/fsm.go | 60 ++++---- consul/state/state_store.go | 247 ++++++++++++++++++------------- consul/state/state_store_test.go | 237 +++++++++++++++++++++++++++-- 3 files changed, 408 insertions(+), 136 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index bb78e5dd76..89cfd5725f 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -78,7 +78,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { switch msgType { case structs.RegisterRequestType: - return c.decodeRegister(buf[1:], log.Index) + return c.applyRegister(buf[1:], log.Index) case structs.DeregisterRequestType: return c.applyDeregister(buf[1:], log.Index) case structs.KVSRequestType: @@ -99,18 +99,15 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { } } -func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} { +func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) var req structs.RegisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - return c.applyRegister(&req, index) -} -func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { // Apply all updates in a single transaction - defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) - if err := c.state.EnsureRegistration(index, req); err != nil { + if err := c.state.EnsureRegistration(index, &req); err != nil { c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err) return err } @@ -267,6 +264,10 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } c.state = stateNew + // Set up a new restore transaction + restore := c.state.Restore() + defer restore.Abort() + // Create a decoder dec := codec.NewDecoder(old, msgpackHandle) @@ -294,32 +295,16 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(&req); err != nil { return err } - c.applyRegister(&req, header.LastIndex) + if err := restore.Registration(header.LastIndex, &req); err != nil { + return err + } case structs.KVSRequestType: var req structs.DirEntry if err := dec.Decode(&req); err != nil { return err } - if err := c.state.KVSRestore(&req); err != nil { - return err - } - - case structs.SessionRequestType: - var req structs.Session - if err := dec.Decode(&req); err != nil { - return err - } - if err := c.state.SessionRestore(&req); err != nil { - return err - } - - case structs.ACLRequestType: - var req structs.ACL - if err := dec.Decode(&req); err != nil { - return err - } - if err := c.state.ACLRestore(&req); err != nil { + if err := restore.KVS(&req); err != nil { return err } @@ -336,7 +321,25 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { Key: req.Key, Index: req.ModifyIndex, } - if err := c.state.TombstoneRestore(stone); err != nil { + if err := restore.Tombstone(stone); err != nil { + return err + } + + case structs.SessionRequestType: + var req structs.Session + if err := dec.Decode(&req); err != nil { + return err + } + if err := restore.Session(&req); err != nil { + return err + } + + case structs.ACLRequestType: + var req structs.ACL + if err := dec.Decode(&req); err != nil { + return err + } + if err := restore.ACL(&req); err != nil { return err } @@ -345,6 +348,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } } + restore.Commit() return nil } diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 5dac6675df..402cf64cec 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -57,6 +57,14 @@ type StateSnapshot struct { lastIndex uint64 } +// StateRestore is used to efficiently manage restoring a large amount of +// data to a state store. +type StateRestore struct { + store *StateStore + tx *memdb.Txn + watches *DumbWatchManager +} + // IndexEntry keeps a record of the last index per-table. type IndexEntry struct { Key string @@ -188,6 +196,109 @@ func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) { return iter, nil } +// Restore is used to efficiently manage restoring a large amount of data into +// the state store. It works by doing all the restores inside of a single +// transaction. +func (s *StateStore) Restore() *StateRestore { + tx := s.db.Txn(true) + watches := NewDumbWatchManager(s.tableWatches) + return &StateRestore{s, tx, watches} +} + +// Abort abandons the changes made by a restore. This or Commit should always be +// called. +func (s *StateRestore) Abort() { + s.tx.Abort() +} + +// Commit commits the changes made by a restore. This or Abort should always be +// called. +func (s *StateRestore) Commit() { + // Fire off a single KVS watch instead of a zillion prefix ones, and use + // a dumb watch manager to single-fire all the full table watches. + s.tx.Defer(func() { s.store.kvsWatch.Notify("", true) }) + s.tx.Defer(func() { s.watches.Notify() }) + + s.tx.Commit() +} + +// Registration is used to make sure a node, service, and check registration is +// performed within a single transaction to avoid race conditions on state +// updates. +func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error { + if err := s.store.ensureRegistrationTxn(s.tx, idx, s.watches, req); err != nil { + return err + } + return nil +} + +// KVS is used when restoring from a snapshot. Use KVSSet for general inserts. +func (s *StateRestore) KVS(entry *structs.DirEntry) error { + if err := s.tx.Insert("kvs", entry); err != nil { + return fmt.Errorf("failed inserting kvs entry: %s", err) + } + + if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + // We have a single top-level KVS watch trigger instead of doing + // tons of prefix watches. + return nil +} + +// Tombstone is used when restoring from a snapshot. For general inserts, use +// Graveyard.InsertTxn. +func (s *StateRestore) Tombstone(stone *Tombstone) error { + if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil { + return fmt.Errorf("failed restoring tombstone: %s", err) + } + return nil +} + +// Session is used when restoring from a snapshot. For general inserts, use +// SessionCreate. +func (s *StateRestore) Session(sess *structs.Session) error { + // Insert the session. + if err := s.tx.Insert("sessions", sess); err != nil { + return fmt.Errorf("failed inserting session: %s", err) + } + + // Insert the check mappings. + for _, checkID := range sess.Checks { + mapping := &sessionCheck{ + Node: sess.Node, + CheckID: checkID, + Session: sess.ID, + } + if err := s.tx.Insert("session_checks", mapping); err != nil { + return fmt.Errorf("failed inserting session check mapping: %s", err) + } + } + + // Update the index. + if err := indexUpdateMaxTxn(s.tx, sess.ModifyIndex, "sessions"); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + s.watches.Arm("sessions") + return nil +} + +// ACL is used when restoring from a snapshot. For general inserts, use ACLSet. +func (s *StateRestore) ACL(acl *structs.ACL) error { + if err := s.tx.Insert("acls", acl); err != nil { + return fmt.Errorf("failed restoring acl: %s", err) + } + + if err := indexUpdateMaxTxn(s.tx, acl.ModifyIndex, "acls"); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + s.watches.Arm("acls") + return nil +} + // maxIndex is a helper used to retrieve the highest known index // amongst a set of tables in the db. func (s *StateStore) maxIndex(tables ...string) uint64 { @@ -311,32 +422,46 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest tx := s.db.Txn(true) defer tx.Abort() + watches := NewDumbWatchManager(s.tableWatches) + if err := s.ensureRegistrationTxn(tx, idx, watches, req); err != nil { + return err + } + + tx.Defer(func() { watches.Notify() }) + tx.Commit() + return nil +} + +// ensureRegistrationTxn is used to make sure a node, service, and check +// registration is performed within a single transaction to avoid race +// conditions on state updates. +func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, + req *structs.RegisterRequest) error { // Add the node. node := &structs.Node{Node: req.Node, Address: req.Address} - if err := s.ensureNodeTxn(tx, idx, node); err != nil { + if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil { return fmt.Errorf("failed inserting node: %s", err) } // Add the service, if any. if req.Service != nil { - if err := s.ensureServiceTxn(tx, idx, req.Node, req.Service); err != nil { + if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil { return fmt.Errorf("failed inserting service: %s", err) } } // Add the checks, if any. if req.Check != nil { - if err := s.ensureCheckTxn(tx, idx, req.Check); err != nil { + if err := s.ensureCheckTxn(tx, idx, watches, req.Check); err != nil { return fmt.Errorf("failed inserting check: %s", err) } } for _, check := range req.Checks { - if err := s.ensureCheckTxn(tx, idx, check); err != nil { + if err := s.ensureCheckTxn(tx, idx, watches, check); err != nil { return fmt.Errorf("failed inserting check: %s", err) } } - tx.Commit() return nil } @@ -346,10 +471,12 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { defer tx.Abort() // Call the node upsert - if err := s.ensureNodeTxn(tx, idx, node); err != nil { + watches := NewDumbWatchManager(s.tableWatches) + if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil { return err } + tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } @@ -357,7 +484,8 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. -func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { +func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, + node *structs.Node) error { // Check for an existing node existing, err := tx.First("nodes", "id", node.Node) if err != nil { @@ -381,7 +509,7 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["nodes"].Notify() }) + watches.Arm("nodes") return nil } @@ -527,17 +655,20 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer defer tx.Abort() // Call the service registration upsert - if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { + watches := NewDumbWatchManager(s.tableWatches) + if err := s.ensureServiceTxn(tx, idx, watches, node, svc); err != nil { return err } + tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. -func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { +func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, + node string, svc *structs.NodeService) error { // Check for existing service existing, err := tx.First("services", "id", node, svc.ID) if err != nil { @@ -572,7 +703,7 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["services"].Notify() }) + watches.Arm("services") return nil } @@ -819,10 +950,12 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { defer tx.Abort() // Call the check registration - if err := s.ensureCheckTxn(tx, idx, hc); err != nil { + watches := NewDumbWatchManager(s.tableWatches) + if err := s.ensureCheckTxn(tx, idx, watches, hc); err != nil { return err } + tx.Defer(func() { watches.Notify() }) tx.Commit() return nil } @@ -830,7 +963,8 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. -func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { +func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, + hc *structs.HealthCheck) error { // Check if we have an existing health check existing, err := tx.First("checks", "id", hc.Node, hc.CheckID) if err != nil { @@ -906,7 +1040,7 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt return fmt.Errorf("failed updating index: %s", err) } - tx.Defer(func() { s.tableWatches["checks"].Notify() }) + watches.Arm("checks") return nil } @@ -1680,39 +1814,6 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error return true, nil } -// KVSRestore is used when restoring from a snapshot. Use KVSSet for general -// inserts. -func (s *StateStore) KVSRestore(entry *structs.DirEntry) error { - tx := s.db.Txn(true) - defer tx.Abort() - - if err := tx.Insert("kvs", entry); err != nil { - return fmt.Errorf("failed inserting kvs entry: %s", err) - } - - if err := indexUpdateMaxTxn(tx, entry.ModifyIndex, "kvs"); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) }) - tx.Commit() - return nil -} - -// Tombstone is used when restoring from a snapshot. For general inserts, use -// Graveyard.InsertTxn. -func (s *StateStore) TombstoneRestore(stone *Tombstone) error { - tx := s.db.Txn(true) - defer tx.Abort() - - if err := s.kvsGraveyard.RestoreTxn(tx, stone); err != nil { - return fmt.Errorf("failed restoring tombstone: %s", err) - } - - tx.Commit() - return nil -} - // SessionCreate is used to register a new session in the state store. func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error { tx := s.db.Txn(true) @@ -1990,39 +2091,6 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa return nil } -// SessionRestore is used when restoring from a snapshot. For general inserts, -// use SessionCreate. -func (s *StateStore) SessionRestore(sess *structs.Session) error { - tx := s.db.Txn(true) - defer tx.Abort() - - // Insert the session. - if err := tx.Insert("sessions", sess); err != nil { - return fmt.Errorf("failed inserting session: %s", err) - } - - // Insert the check mappings. - for _, checkID := range sess.Checks { - mapping := &sessionCheck{ - Node: sess.Node, - CheckID: checkID, - Session: sess.ID, - } - if err := tx.Insert("session_checks", mapping); err != nil { - return fmt.Errorf("failed inserting session check mapping: %s", err) - } - } - - // Update the index. - if err := indexUpdateMaxTxn(tx, sess.ModifyIndex, "sessions"); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Defer(func() { s.tableWatches["sessions"].Notify() }) - tx.Commit() - return nil -} - // ACLSet is used to insert an ACL rule into the state store. func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error { tx := s.db.Txn(true) @@ -2163,22 +2231,3 @@ func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error tx.Defer(func() { s.tableWatches["acls"].Notify() }) return nil } - -// ACLRestore is used when restoring from a snapshot. For general inserts, use -// ACLSet. -func (s *StateStore) ACLRestore(acl *structs.ACL) error { - tx := s.db.Txn(true) - defer tx.Abort() - - if err := tx.Insert("acls", acl); err != nil { - return fmt.Errorf("failed restoring acl: %s", err) - } - - if err := indexUpdateMaxTxn(tx, acl.ModifyIndex, "acls"); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Defer(func() { s.tableWatches["acls"].Notify() }) - tx.Commit() - return nil -} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 7b76ea2c1b..ae2b82dd69 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -121,6 +121,36 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) { } } +func TestStateStore_Restore_Abort(t *testing.T) { + s := testStateStore(t) + + // The detailed restore functions are tested below, this just checks + // that abort works. + restore := s.Restore() + entry := &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + ModifyIndex: 5, + }, + } + if err := restore.KVS(entry); err != nil { + t.Fatalf("err: %s", err) + } + restore.Abort() + + idx, entries, err := s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 0 { + t.Fatalf("bad index: %d", idx) + } + if len(entries) != 0 { + t.Fatalf("bad: %#v", entries) + } +} + func TestStateStore_maxIndex(t *testing.T) { s := testStateStore(t) @@ -487,6 +517,144 @@ func TestStateStore_EnsureRegistration(t *testing.T) { }() } +func TestStateStore_EnsureRegistration_Restore(t *testing.T) { + s := testStateStore(t) + + // Start with just a node. + req := &structs.RegisterRequest{ + Node: "node1", + Address: "1.2.3.4", + } + restore := s.Restore() + if err := restore.Registration(1, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + + // Retrieve the node and verify its contents. + verifyNode := func(created, modified uint64) { + _, out, err := s.GetNode("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + if out.Node != "node1" || out.Address != "1.2.3.4" || + out.CreateIndex != created || out.ModifyIndex != modified { + t.Fatalf("bad node returned: %#v", out) + } + } + verifyNode(1, 1) + + // Add in a service definition. + req.Service = &structs.NodeService{ + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + } + restore = s.Restore() + if err := restore.Registration(2, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + + // Verify that the service got registered. + verifyService := func(created, modified uint64) { + idx, out, err := s.NodeServices("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != modified { + t.Fatalf("bad index: %d", idx) + } + if len(out.Services) != 1 { + t.Fatalf("bad: %#v", out.Services) + } + s := out.Services["redis1"] + if s.ID != "redis1" || s.Service != "redis" || + s.Address != "1.1.1.1" || s.Port != 8080 || + s.CreateIndex != created || s.ModifyIndex != modified { + t.Fatalf("bad service returned: %#v", s) + } + } + verifyNode(1, 2) + verifyService(2, 2) + + // Add in a top-level check. + req.Check = &structs.HealthCheck{ + Node: "node1", + CheckID: "check1", + Name: "check", + } + restore = s.Restore() + if err := restore.Registration(3, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + + // Verify that the check got registered. + verifyCheck := func(created, modified uint64) { + idx, out, err := s.NodeChecks("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != modified { + t.Fatalf("bad index: %d", idx) + } + if len(out) != 1 { + t.Fatalf("bad: %#v", out) + } + c := out[0] + if c.Node != "node1" || c.CheckID != "check1" || c.Name != "check" || + c.CreateIndex != created || c.ModifyIndex != modified { + t.Fatalf("bad check returned: %#v", c) + } + } + verifyNode(1, 3) + verifyService(2, 3) + verifyCheck(3, 3) + + // Add in another check via the slice. + req.Checks = structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Name: "check", + }, + } + restore = s.Restore() + if err := restore.Registration(4, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + + // Verify that the additional check got registered. + verifyNode(1, 4) + verifyService(2, 4) + func() { + idx, out, err := s.NodeChecks("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 4 { + t.Fatalf("bad index: %d", idx) + } + if len(out) != 2 { + t.Fatalf("bad: %#v", out) + } + c1 := out[0] + if c1.Node != "node1" || c1.CheckID != "check1" || c1.Name != "check" || + c1.CreateIndex != 3 || c1.ModifyIndex != 4 { + t.Fatalf("bad check returned: %#v", c1) + } + + c2 := out[1] + if c2.Node != "node1" || c2.CheckID != "check2" || c2.Name != "check" || + c2.CreateIndex != 4 || c2.ModifyIndex != 4 { + t.Fatalf("bad check returned: %#v", c2) + } + }() +} + func TestStateStore_EnsureRegistration_Watches(t *testing.T) { s := testStateStore(t) @@ -505,6 +673,18 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) { }) }) }) + // The nodes watch should fire for this one. + verifyWatch(t, s.getTableWatch("nodes"), func() { + verifyNoWatch(t, s.getTableWatch("services"), func() { + verifyNoWatch(t, s.getTableWatch("checks"), func() { + restore := s.Restore() + if err := restore.Registration(1, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + }) + }) + }) // With a service definition added it should fire nodes and // services. @@ -523,6 +703,17 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) { }) }) }) + verifyWatch(t, s.getTableWatch("nodes"), func() { + verifyWatch(t, s.getTableWatch("services"), func() { + verifyNoWatch(t, s.getTableWatch("checks"), func() { + restore := s.Restore() + if err := restore.Registration(2, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + }) + }) + }) // Now with a check it should hit all three. req.Check = &structs.HealthCheck{ @@ -539,6 +730,17 @@ func TestStateStore_EnsureRegistration_Watches(t *testing.T) { }) }) }) + verifyWatch(t, s.getTableWatch("nodes"), func() { + verifyWatch(t, s.getTableWatch("services"), func() { + verifyWatch(t, s.getTableWatch("checks"), func() { + restore := s.Restore() + if err := restore.Registration(3, req); err != nil { + t.Fatalf("err: %s", err) + } + restore.Commit() + }) + }) + }) } func TestStateStore_EnsureNode(t *testing.T) { @@ -3135,11 +3337,13 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { // Restore the values into a new state store. func() { s := testStateStore(t) + restore := s.Restore() for _, entry := range dump { - if err := s.KVSRestore(entry); err != nil { + if err := restore.KVS(entry); err != nil { t.Fatalf("err: %s", err) } } + restore.Commit() // Read the restored keys back out and verify they match. idx, res, err := s.KVSList("") @@ -3191,15 +3395,21 @@ func TestStateStore_KVS_Watches(t *testing.T) { }) }) }) + + // Restore just fires off a top-level watch, so we should get hits on + // any prefix, including ones for keys that aren't in there. verifyWatch(t, s.GetKVSWatch(""), func() { verifyWatch(t, s.GetKVSWatch("b"), func() { - verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { - if err := s.KVSRestore(&structs.DirEntry{Key: "bbb"}); err != nil { + verifyWatch(t, s.GetKVSWatch("/nope"), func() { + restore := s.Restore() + if err := restore.KVS(&structs.DirEntry{Key: "bbb"}); err != nil { t.Fatalf("err: %s", err) } + restore.Commit() }) }) }) + verifyWatch(t, s.GetKVSWatch(""), func() { verifyWatch(t, s.GetKVSWatch("a"), func() { verifyNoWatch(t, s.GetKVSWatch("/nope"), func() { @@ -3337,12 +3547,13 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { // Restore the values into a new state store. func() { s := testStateStore(t) - + restore := s.Restore() for _, stone := range dump { - if err := s.TombstoneRestore(stone); err != nil { + if err := restore.Tombstone(stone); err != nil { t.Fatalf("err: %s", err) } } + restore.Commit() // See if the stone works properly in a list query. idx, _, err := s.KVSList("foo/bar") @@ -3777,11 +3988,13 @@ func TestStateStore_Session_Snapshot_Restore(t *testing.T) { // Restore the sessions into a new state store. func() { s := testStateStore(t) + restore := s.Restore() for _, session := range dump { - if err := s.SessionRestore(session); err != nil { + if err := restore.Session(session); err != nil { t.Fatalf("err: %s", err) } } + restore.Commit() // Read the restored sessions back out and verify that they // match. @@ -3859,14 +4072,16 @@ func TestStateStore_Session_Watches(t *testing.T) { } }) verifyWatch(t, s.getTableWatch("sessions"), func() { + restore := s.Restore() sess := &structs.Session{ ID: session, Node: "node1", Behavior: structs.SessionKeysDelete, } - if err := s.SessionRestore(sess); err != nil { + if err := restore.Session(sess); err != nil { t.Fatalf("err: %s", err) } + restore.Commit() }) } @@ -4456,11 +4671,13 @@ func TestStateStore_ACL_Snapshot_Restore(t *testing.T) { // Restore the values into a new state store. func() { s := testStateStore(t) + restore := s.Restore() for _, acl := range dump { - if err := s.ACLRestore(acl); err != nil { + if err := restore.ACL(acl); err != nil { t.Fatalf("err: %s", err) } } + restore.Commit() // Read the restored ACLs back out and verify that they match. idx, res, err := s.ACLList() @@ -4497,8 +4714,10 @@ func TestStateStore_ACL_Watches(t *testing.T) { } }) verifyWatch(t, s.getTableWatch("acls"), func() { - if err := s.ACLRestore(&structs.ACL{ID: "acl1"}); err != nil { + restore := s.Restore() + if err := restore.ACL(&structs.ACL{ID: "acl1"}); err != nil { t.Fatalf("err: %s", err) } + restore.Commit() }) }