mirror of https://github.com/status-im/consul.git
Merge pull request #2675 from hashicorp/f-fine-watch-redux
Cleans up state store restore behavior.
This commit is contained in:
commit
373f1a94ae
|
@ -318,29 +318,19 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
return &consulSnapshot{c.state.Snapshot()}, nil
|
return &consulSnapshot{c.state.Snapshot()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Restore streams in the snapshot and replaces the current state store with a
|
||||||
|
// new one based on the snapshot if all goes OK during the restore.
|
||||||
func (c *consulFSM) Restore(old io.ReadCloser) error {
|
func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||||
defer old.Close()
|
defer old.Close()
|
||||||
|
|
||||||
// Create a new state store
|
// Create a new state store.
|
||||||
stateNew, err := state.NewStateStore(c.gc)
|
stateNew, err := state.NewStateStore(c.gc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// External code might be calling State(), so we need to synchronize
|
|
||||||
// here to make sure we swap in the new state store atomically.
|
|
||||||
c.stateLock.Lock()
|
|
||||||
stateOld := c.state
|
|
||||||
c.state = stateNew
|
|
||||||
c.stateLock.Unlock()
|
|
||||||
|
|
||||||
// The old state store has been abandoned already since we've replaced
|
|
||||||
// it with an empty one, but we defer telling watchers about it until
|
|
||||||
// the restore is done, so they wake up once we have the latest data.
|
|
||||||
defer stateOld.Abandon()
|
|
||||||
|
|
||||||
// Set up a new restore transaction
|
// Set up a new restore transaction
|
||||||
restore := c.state.Restore()
|
restore := stateNew.Restore()
|
||||||
defer restore.Abort()
|
defer restore.Abort()
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
|
@ -443,6 +433,18 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
restore.Commit()
|
restore.Commit()
|
||||||
|
|
||||||
|
// External code might be calling State(), so we need to synchronize
|
||||||
|
// here to make sure we swap in the new state store atomically.
|
||||||
|
c.stateLock.Lock()
|
||||||
|
stateOld := c.state
|
||||||
|
c.state = stateNew
|
||||||
|
c.stateLock.Unlock()
|
||||||
|
|
||||||
|
// Signal that the old state store has been abandoned. This is required
|
||||||
|
// because we don't operate on it any more, we just throw it away, so
|
||||||
|
// blocking queries won't see any changes and need to be woken up.
|
||||||
|
stateOld.Abandon()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -592,37 +592,41 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSSet(t *testing.T) {
|
func TestFSM_BadRestore(t *testing.T) {
|
||||||
|
// Create an FSM with some state.
|
||||||
fsm, err := NewFSM(nil, os.Stderr)
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||||
|
abandonCh := fsm.state.AbandonCh()
|
||||||
|
|
||||||
req := structs.KVSRequest{
|
// Do a bad restore.
|
||||||
Datacenter: "dc1",
|
buf := bytes.NewBuffer([]byte("bad snapshot"))
|
||||||
Op: structs.KVSSet,
|
sink := &MockSink{buf, false}
|
||||||
DirEnt: structs.DirEntry{
|
if err := fsm.Restore(sink); err == nil {
|
||||||
Key: "/test/path",
|
|
||||||
Flags: 0,
|
|
||||||
Value: []byte("test"),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
resp := fsm.Apply(makeLog(buf))
|
|
||||||
if resp != nil {
|
// Verify the contents didn't get corrupted.
|
||||||
t.Fatalf("resp: %v", resp)
|
_, nodes, err := fsm.state.Nodes(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if len(nodes) != 1 {
|
||||||
|
t.Fatalf("bad: %v", nodes)
|
||||||
|
}
|
||||||
|
if nodes[0].Node != "foo" ||
|
||||||
|
nodes[0].Address != "127.0.0.1" ||
|
||||||
|
len(nodes[0].TaggedAddresses) != 0 {
|
||||||
|
t.Fatalf("bad: %v", nodes[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify key is set
|
// Verify the old state store didn't get abandoned.
|
||||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
select {
|
||||||
if err != nil {
|
case <-abandonCh:
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("bad")
|
||||||
}
|
default:
|
||||||
if d == nil {
|
|
||||||
t.Fatalf("missing")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -417,7 +417,16 @@ RUN_QUERY:
|
||||||
err := fn(ws, state)
|
err := fn(ws, state)
|
||||||
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
|
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
|
||||||
if expired := ws.Watch(timeout.C); !expired {
|
if expired := ws.Watch(timeout.C); !expired {
|
||||||
goto RUN_QUERY
|
// If a restore may have woken us up then bail out from
|
||||||
|
// the query immediately. This is slightly race-ey since
|
||||||
|
// this might have been interrupted for other reasons,
|
||||||
|
// but it's OK to kick it back to the caller in either
|
||||||
|
// case.
|
||||||
|
select {
|
||||||
|
case <-state.AbandonCh():
|
||||||
|
default:
|
||||||
|
goto RUN_QUERY
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1,12 +1,15 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/state"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -70,3 +73,93 @@ func TestRPC_NoLeader_Retry(t *testing.T) {
|
||||||
t.Fatalf("bad: %v", err)
|
t.Fatalf("bad: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRPC_blockingQuery(t *testing.T) {
|
||||||
|
dir, s := testServer(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer s.Shutdown()
|
||||||
|
|
||||||
|
// Perform a non-blocking query.
|
||||||
|
{
|
||||||
|
var opts structs.QueryOptions
|
||||||
|
var meta structs.QueryMeta
|
||||||
|
var calls int
|
||||||
|
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||||
|
calls++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if calls != 1 {
|
||||||
|
t.Fatalf("bad: %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform a blocking query that gets woken up and loops around once.
|
||||||
|
{
|
||||||
|
opts := structs.QueryOptions{
|
||||||
|
MinQueryIndex: 3,
|
||||||
|
}
|
||||||
|
var meta structs.QueryMeta
|
||||||
|
var calls int
|
||||||
|
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||||
|
if calls == 0 {
|
||||||
|
meta.Index = 3
|
||||||
|
|
||||||
|
fakeCh := make(chan struct{})
|
||||||
|
close(fakeCh)
|
||||||
|
ws.Add(fakeCh)
|
||||||
|
} else {
|
||||||
|
meta.Index = 4
|
||||||
|
}
|
||||||
|
calls++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if calls != 2 {
|
||||||
|
t.Fatalf("bad: %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform a query that blocks and gets interrupted when the state store
|
||||||
|
// is abandoned.
|
||||||
|
{
|
||||||
|
opts := structs.QueryOptions{
|
||||||
|
MinQueryIndex: 3,
|
||||||
|
}
|
||||||
|
var meta structs.QueryMeta
|
||||||
|
var calls int
|
||||||
|
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||||
|
if calls == 0 {
|
||||||
|
meta.Index = 3
|
||||||
|
|
||||||
|
snap, err := s.fsm.Snapshot()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer snap.Release()
|
||||||
|
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
sink := &MockSink{buf, false}
|
||||||
|
if err := snap.Persist(sink); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.fsm.Restore(sink); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
calls++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if calls != 1 {
|
||||||
|
t.Fatalf("bad: %d", calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue