mirror of https://github.com/status-im/consul.git
Nukes old state store's connection to FSM and RPC.
This commit is contained in:
parent
7729b66099
commit
6ba70be33c
|
@ -4,7 +4,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -26,7 +25,6 @@ type consulFSM struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
path string
|
path string
|
||||||
stateNew *state.StateStore
|
stateNew *state.StateStore
|
||||||
state *StateStore
|
|
||||||
gc *state.TombstoneGC
|
gc *state.TombstoneGC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +32,6 @@ type consulFSM struct {
|
||||||
// state in a way that can be accessed concurrently with operations
|
// state in a way that can be accessed concurrently with operations
|
||||||
// that may modify the live state.
|
// that may modify the live state.
|
||||||
type consulSnapshot struct {
|
type consulSnapshot struct {
|
||||||
state *StateSnapshot
|
|
||||||
stateNew *state.StateSnapshot
|
stateNew *state.StateSnapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,51 +43,26 @@ type snapshotHeader struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFSMPath is used to construct a new FSM with a blank state
|
// NewFSMPath is used to construct a new FSM with a blank state
|
||||||
func NewFSM(gc *state.TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
|
func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
|
||||||
// Create the state store.
|
|
||||||
stateNew, err := state.NewStateStore(gc)
|
stateNew, err := state.NewStateStore(gc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a temporary path for the state store
|
|
||||||
tmpPath, err := ioutil.TempDir(path, "state")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a state store
|
|
||||||
state, err := NewStateStorePath(gc, tmpPath, logOutput)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
fsm := &consulFSM{
|
fsm := &consulFSM{
|
||||||
logOutput: logOutput,
|
logOutput: logOutput,
|
||||||
logger: log.New(logOutput, "", log.LstdFlags),
|
logger: log.New(logOutput, "", log.LstdFlags),
|
||||||
path: path,
|
|
||||||
stateNew: stateNew,
|
stateNew: stateNew,
|
||||||
state: state,
|
|
||||||
gc: gc,
|
gc: gc,
|
||||||
}
|
}
|
||||||
return fsm, nil
|
return fsm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close is used to cleanup resources associated with the FSM
|
|
||||||
func (c *consulFSM) Close() error {
|
|
||||||
return c.state.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// StateNew is used to return a handle to the current state
|
// StateNew is used to return a handle to the current state
|
||||||
func (c *consulFSM) StateNew() *state.StateStore {
|
func (c *consulFSM) StateNew() *state.StateStore {
|
||||||
return c.stateNew
|
return c.stateNew
|
||||||
}
|
}
|
||||||
|
|
||||||
// State is used to return a handle to the current state
|
|
||||||
func (c *consulFSM) State() *StateStore {
|
|
||||||
return c.state
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
||||||
buf := log.Data
|
buf := log.Data
|
||||||
msgType := structs.MessageType(buf[0])
|
msgType := structs.MessageType(buf[0])
|
||||||
|
@ -282,30 +254,18 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
// Create a new snapshot
|
return &consulSnapshot{c.stateNew.Snapshot()}, nil
|
||||||
snap, err := c.state.Snapshot()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &consulSnapshot{snap, c.stateNew.Snapshot()}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulFSM) Restore(old io.ReadCloser) error {
|
func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||||
defer old.Close()
|
defer old.Close()
|
||||||
|
|
||||||
// Create a temporary path for the state store
|
|
||||||
tmpPath, err := ioutil.TempDir(c.path, "state")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new state store
|
// Create a new state store
|
||||||
store, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
|
stateNew, err := state.NewStateStore(c.gc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.state.Close()
|
c.stateNew = stateNew
|
||||||
c.state = store
|
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
dec := codec.NewDecoder(old, msgpackHandle)
|
dec := codec.NewDecoder(old, msgpackHandle)
|
||||||
|
@ -390,6 +350,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||||
|
|
||||||
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
|
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
|
||||||
|
|
||||||
// Register the nodes
|
// Register the nodes
|
||||||
encoder := codec.NewEncoder(sink, msgpackHandle)
|
encoder := codec.NewEncoder(sink, msgpackHandle)
|
||||||
|
|
||||||
|
@ -557,6 +518,5 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *consulSnapshot) Release() {
|
func (s *consulSnapshot) Release() {
|
||||||
s.state.Close()
|
|
||||||
s.stateNew.Close()
|
s.stateNew.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -38,16 +37,10 @@ func makeLog(buf []byte) *raft.Log {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_RegisterNode(t *testing.T) {
|
func TestFSM_RegisterNode(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -87,16 +80,10 @@ func TestFSM_RegisterNode(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_RegisterNode_Service(t *testing.T) {
|
func TestFSM_RegisterNode_Service(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -155,16 +142,10 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_DeregisterService(t *testing.T) {
|
func TestFSM_DeregisterService(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -222,16 +203,10 @@ func TestFSM_DeregisterService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_DeregisterCheck(t *testing.T) {
|
func TestFSM_DeregisterCheck(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -289,16 +264,10 @@ func TestFSM_DeregisterCheck(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_DeregisterNode(t *testing.T) {
|
func TestFSM_DeregisterNode(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -371,16 +340,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_SnapshotRestore(t *testing.T) {
|
func TestFSM_SnapshotRestore(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
// Add some state
|
// Add some state
|
||||||
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||||
|
@ -433,11 +396,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to restore on a new FSM
|
// Try to restore on a new FSM
|
||||||
fsm2, err := NewFSM(nil, path, os.Stderr)
|
fsm2, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer fsm2.Close()
|
|
||||||
|
|
||||||
// Do a restore
|
// Do a restore
|
||||||
if err := fsm2.Restore(sink); err != nil {
|
if err := fsm2.Restore(sink); err != nil {
|
||||||
|
@ -519,16 +481,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSSet(t *testing.T) {
|
func TestFSM_KVSSet(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.KVSRequest{
|
req := structs.KVSRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -559,16 +515,10 @@ func TestFSM_KVSSet(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSDelete(t *testing.T) {
|
func TestFSM_KVSDelete(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.KVSRequest{
|
req := structs.KVSRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -610,16 +560,10 @@ func TestFSM_KVSDelete(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSDeleteTree(t *testing.T) {
|
func TestFSM_KVSDeleteTree(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.KVSRequest{
|
req := structs.KVSRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -662,16 +606,10 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.KVSRequest{
|
req := structs.KVSRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -723,16 +661,10 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSCheckAndSet(t *testing.T) {
|
func TestFSM_KVSCheckAndSet(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
req := structs.KVSRequest{
|
req := structs.KVSRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -785,16 +717,10 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_SessionCreate_Destroy(t *testing.T) {
|
func TestFSM_SessionCreate_Destroy(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||||
fsm.stateNew.EnsureCheck(2, &structs.HealthCheck{
|
fsm.stateNew.EnsureCheck(2, &structs.HealthCheck{
|
||||||
|
@ -870,16 +796,10 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSLock(t *testing.T) {
|
func TestFSM_KVSLock(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||||
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
||||||
|
@ -920,16 +840,10 @@ func TestFSM_KVSLock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_KVSUnlock(t *testing.T) {
|
func TestFSM_KVSUnlock(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||||
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
||||||
|
@ -988,16 +902,10 @@ func TestFSM_KVSUnlock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_ACL_Set_Delete(t *testing.T) {
|
func TestFSM_ACL_Set_Delete(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
// Create a new ACL
|
// Create a new ACL
|
||||||
req := structs.ACLRequest{
|
req := structs.ACLRequest{
|
||||||
|
@ -1066,16 +974,10 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_TombstoneReap(t *testing.T) {
|
func TestFSM_TombstoneReap(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
// Create some tombstones
|
// Create some tombstones
|
||||||
fsm.stateNew.KVSSet(11, &structs.DirEntry{
|
fsm.stateNew.KVSSet(11, &structs.DirEntry{
|
||||||
|
@ -1117,16 +1019,10 @@ func TestFSM_TombstoneReap(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFSM_IgnoreUnknown(t *testing.T) {
|
func TestFSM_IgnoreUnknown(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(path)
|
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
|
|
||||||
// Create a new reap request
|
// Create a new reap request
|
||||||
type UnknownRequest struct {
|
type UnknownRequest struct {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -11,15 +10,10 @@ import (
|
||||||
|
|
||||||
// Testing for GH-300 and GH-279
|
// Testing for GH-300 and GH-279
|
||||||
func TestHealthCheckRace(t *testing.T) {
|
func TestHealthCheckRace(t *testing.T) {
|
||||||
path, err := ioutil.TempDir("", "fsm")
|
fsm, err := NewFSM(nil, os.Stderr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
fsm, err := NewFSM(nil, path, os.Stderr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer fsm.Close()
|
|
||||||
state := fsm.StateNew()
|
state := fsm.StateNew()
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
|
|
104
consul/rpc.go
104
consul/rpc.go
|
@ -297,108 +297,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
|
||||||
return future.Response(), nil
|
return future.Response(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// blockingRPC is used for queries that need to wait for a
|
// blockingRPCNew is used for queries that need to wait for a minimum index. This
|
||||||
// minimum index. This is used to block and wait for changes.
|
// is used to block and wait for changes.
|
||||||
func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
|
|
||||||
tables MDBTables, run func() error) error {
|
|
||||||
opts := blockingRPCOptions{
|
|
||||||
queryOpts: b,
|
|
||||||
queryMeta: m,
|
|
||||||
tables: tables,
|
|
||||||
run: run,
|
|
||||||
}
|
|
||||||
return s.blockingRPCOpt(&opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// blockingRPCOptions is used to parameterize blockingRPCOpt since
|
|
||||||
// it takes so many options. It should be preferred over blockingRPC.
|
|
||||||
type blockingRPCOptions struct {
|
|
||||||
queryOpts *structs.QueryOptions
|
|
||||||
queryMeta *structs.QueryMeta
|
|
||||||
tables MDBTables
|
|
||||||
kvWatch bool
|
|
||||||
kvPrefix string
|
|
||||||
run func() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// blockingRPCOpt is the replacement for blockingRPC as it allows
|
|
||||||
// for more parameterization easily. It should be preferred over blockingRPC.
|
|
||||||
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
|
|
||||||
var timeout *time.Timer
|
|
||||||
var notifyCh chan struct{}
|
|
||||||
var state *StateStore
|
|
||||||
|
|
||||||
// Fast path non-blocking
|
|
||||||
if opts.queryOpts.MinQueryIndex == 0 {
|
|
||||||
goto RUN_QUERY
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sanity check that we have tables to block on
|
|
||||||
if len(opts.tables) == 0 && !opts.kvWatch {
|
|
||||||
panic("no tables to block on")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restrict the max query time, and ensure there is always one
|
|
||||||
if opts.queryOpts.MaxQueryTime > maxQueryTime {
|
|
||||||
opts.queryOpts.MaxQueryTime = maxQueryTime
|
|
||||||
} else if opts.queryOpts.MaxQueryTime <= 0 {
|
|
||||||
opts.queryOpts.MaxQueryTime = defaultQueryTime
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply a small amount of jitter to the request
|
|
||||||
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)
|
|
||||||
|
|
||||||
// Setup a query timeout
|
|
||||||
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
|
|
||||||
|
|
||||||
// Setup the notify channel
|
|
||||||
notifyCh = make(chan struct{}, 1)
|
|
||||||
|
|
||||||
// Ensure we tear down any watchers on return
|
|
||||||
state = s.fsm.State()
|
|
||||||
defer func() {
|
|
||||||
timeout.Stop()
|
|
||||||
state.StopWatch(opts.tables, notifyCh)
|
|
||||||
if opts.kvWatch {
|
|
||||||
state.StopWatchKV(opts.kvPrefix, notifyCh)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
REGISTER_NOTIFY:
|
|
||||||
// Register the notification channel. This may be done
|
|
||||||
// multiple times if we have not reached the target wait index.
|
|
||||||
state.Watch(opts.tables, notifyCh)
|
|
||||||
if opts.kvWatch {
|
|
||||||
state.WatchKV(opts.kvPrefix, notifyCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
RUN_QUERY:
|
|
||||||
// Update the query meta data
|
|
||||||
s.setQueryMeta(opts.queryMeta)
|
|
||||||
|
|
||||||
// Check if query must be consistent
|
|
||||||
if opts.queryOpts.RequireConsistent {
|
|
||||||
if err := s.consistentRead(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run the query function
|
|
||||||
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
|
|
||||||
err := opts.run()
|
|
||||||
|
|
||||||
// Check for minimum query time
|
|
||||||
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
|
|
||||||
select {
|
|
||||||
case <-notifyCh:
|
|
||||||
goto REGISTER_NOTIFY
|
|
||||||
case <-timeout.C:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(slackpad)
|
|
||||||
func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
|
func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
|
||||||
watch state.Watch, run func() error) error {
|
watch state.Watch, run func() error) error {
|
||||||
var timeout *time.Timer
|
var timeout *time.Timer
|
||||||
|
|
|
@ -34,7 +34,6 @@ const (
|
||||||
serfLANSnapshot = "serf/local.snapshot"
|
serfLANSnapshot = "serf/local.snapshot"
|
||||||
serfWANSnapshot = "serf/remote.snapshot"
|
serfWANSnapshot = "serf/remote.snapshot"
|
||||||
raftState = "raft/"
|
raftState = "raft/"
|
||||||
tmpStatePath = "tmp/"
|
|
||||||
snapshotsRetained = 2
|
snapshotsRetained = 2
|
||||||
|
|
||||||
// serverRPCCache controls how long we keep an idle connection
|
// serverRPCCache controls how long we keep an idle connection
|
||||||
|
@ -317,18 +316,9 @@ func (s *Server) setupRaft() error {
|
||||||
s.config.RaftConfig.EnableSingleNode = true
|
s.config.RaftConfig.EnableSingleNode = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the base state path
|
|
||||||
statePath := filepath.Join(s.config.DataDir, tmpStatePath)
|
|
||||||
if err := os.RemoveAll(statePath); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := ensurePath(statePath, true); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the FSM
|
// Create the FSM
|
||||||
var err error
|
var err error
|
||||||
s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput)
|
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -491,11 +481,6 @@ func (s *Server) Shutdown() error {
|
||||||
// Close the connection pool
|
// Close the connection pool
|
||||||
s.connPool.Shutdown()
|
s.connPool.Shutdown()
|
||||||
|
|
||||||
// Close the fsm
|
|
||||||
if s.fsm != nil {
|
|
||||||
s.fsm.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue