Adds a facility to notify when restores occur.

This commit is contained in:
James Phillips 2017-01-24 10:38:03 -08:00
parent 1d39ddbd4b
commit 68e90d0f24
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
5 changed files with 65 additions and 0 deletions

View File

@ -330,9 +330,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
c.stateLock.Lock()
stateOld := c.state
c.state = stateNew
c.stateLock.Unlock()
// The old state store has been abandoned already since we've replaced
// it with an empty one, but we defer telling watchers about it until
// the restore is done, so they wake up one we have the latest data.
defer stateOld.Abandon()
// Set up a new restore transaction
restore := c.state.Restore()
defer restore.Abort()

View File

@ -563,6 +563,33 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if !reflect.DeepEqual(queries[0], &query) {
t.Fatalf("bad: %#v", queries[0])
}
// Snapshot
snap, err = fsm2.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
// Persist
buf = bytes.NewBuffer(nil)
sink = &MockSink{buf, false}
if err := snap.Persist(sink); err != nil {
t.Fatalf("err: %v", err)
}
// Try to restore on the old FSM and make sure it abandons the old state
// store.
abandonCh := fsm.state.AbandonCh()
if err := fsm.Restore(sink); err != nil {
t.Fatalf("err: %v", err)
}
select {
case <-abandonCh:
default:
t.Fatalf("bad")
}
}
func TestFSM_KVSSet(t *testing.T) {

View File

@ -469,6 +469,10 @@ RUN_QUERY:
var ws memdb.WatchSet
if queryOpts.MinQueryIndex > 0 {
ws = memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
// whole state store is abandoned.
ws.Add(s.fsm.State().AbandonCh())
}
// Block up to the timeout if we didn't see anything fresh.

View File

@ -50,6 +50,10 @@ type StateStore struct {
schema *memdb.DBSchema
db *memdb.MemDB
// abandonCh is used to signal watchers that this state store has been
// abandoned (usually during a restore). This is only ever closed.
abandonCh chan struct{}
// tableWatches holds all the full table watches, indexed by table name.
tableWatches map[string]*FullTableWatch
@ -118,6 +122,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
s := &StateStore{
schema: schema,
db: db,
abandonCh: make(chan struct{}),
tableWatches: tableWatches,
kvsWatch: NewPrefixWatchManager(),
kvsGraveyard: NewGraveyard(gc),
@ -175,6 +180,18 @@ func (s *StateRestore) Commit() {
s.tx.Commit()
}
// AbandonCh returns a channel you can wait on to know if the state store was
// abandoned.
func (s *StateStore) AbandonCh() <-chan struct{} {
return s.abandonCh
}
// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *StateStore) Abandon() {
close(s.abandonCh)
}
// maxIndex is a helper used to retrieve the highest known index
// amongst a set of tables in the db.
func (s *StateStore) maxIndex(tables ...string) uint64 {

View File

@ -164,6 +164,17 @@ func TestStateStore_Restore_Abort(t *testing.T) {
}
}
func TestStateStore_Abandon(t *testing.T) {
s := testStateStore(t)
abandonCh := s.AbandonCh()
s.Abandon()
select {
case <-abandonCh:
default:
t.Fatalf("bad")
}
}
func TestStateStore_maxIndex(t *testing.T) {
s := testStateStore(t)