Moves the FSM into its own package.

This will help make it clearer what happens when we add some registration
plumbing for the different operations and snapshots.
This commit is contained in:
James Phillips 2017-11-28 18:01:17 -08:00
parent e810697e06
commit 78292662d7
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
6 changed files with 100 additions and 47 deletions

View File

@ -1,4 +1,4 @@
package consul
package fsm
import (
"fmt"
@ -18,10 +18,10 @@ import (
// msgpackHandle is a shared handle for encoding/decoding msgpack payloads
var msgpackHandle = &codec.MsgpackHandle{}
// consulFSM implements a finite state machine that is used
// FSM implements a finite state machine that is used
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.
type consulFSM struct {
type FSM struct {
logOutput io.Writer
logger *log.Logger
path string
@ -50,14 +50,14 @@ type snapshotHeader struct {
LastIndex uint64
}
// NewFSM is used to construct a new FSM with a blank state
func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
// New is used to construct a new FSM with a blank state.
func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) {
stateNew, err := state.NewStateStore(gc)
if err != nil {
return nil, err
}
fsm := &consulFSM{
fsm := &FSM{
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
state: stateNew,
@ -67,13 +67,13 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
}
// State is used to return a handle to the current state
func (c *consulFSM) State() *state.Store {
func (c *FSM) State() *state.Store {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
return c.state
}
func (c *consulFSM) Apply(log *raft.Log) interface{} {
func (c *FSM) Apply(log *raft.Log) interface{} {
buf := log.Data
msgType := structs.MessageType(buf[0])
@ -116,7 +116,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
}
}
func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
var req structs.RegisterRequest
@ -132,7 +132,7 @@ func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
return nil
}
func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
@ -162,7 +162,7 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
return nil
}
func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
var req structs.KVSRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -209,7 +209,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
}
}
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -232,7 +232,7 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
}
}
func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} {
var req structs.ACLRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -266,7 +266,7 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
}
}
func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
var req structs.TombstoneRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -288,7 +288,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
// them in a single underlying transaction. This interface isn't 1:1 with the outer
// update interface that the coordinate endpoint exposes, so we made it single
// purpose and avoided the opcode convention.
func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
var updates structs.Coordinates
if err := structs.Decode(buf, &updates); err != nil {
panic(fmt.Errorf("failed to decode batch updates: %v", err))
@ -303,7 +303,7 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
// applyPreparedQueryOperation applies the given prepared query operation to the
// state store.
func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
var req structs.PreparedQueryRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -324,7 +324,7 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
}
}
func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
func (c *FSM) applyTxn(buf []byte, index uint64) interface{} {
var req structs.TxnRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -338,7 +338,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
}
}
func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
var req structs.AutopilotSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -356,7 +356,7 @@ func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
return c.state.AutopilotSetConfig(index, &req.Config)
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Since(start))
}(time.Now())
@ -366,7 +366,7 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
// Restore streams in the snapshot and replaces the current state store with a
// new one based on the snapshot if all goes OK during the restore.
func (c *consulFSM) Restore(old io.ReadCloser) error {
func (c *FSM) Restore(old io.ReadCloser) error {
defer old.Close()
// Create a new state store.

View File

@ -1,8 +1,9 @@
package consul
package fsm
import (
"bytes"
"fmt"
"math/rand"
"os"
"reflect"
"testing"
@ -15,6 +16,7 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
)
@ -53,9 +55,20 @@ func generateUUID() (ret string) {
return ret
}
func generateRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig()
coord := coordinate.NewCoordinate(config)
for i := range coord.Vec {
coord.Vec[i] = rand.NormFloat64()
}
coord.Error = rand.NormFloat64()
coord.Adjustment = rand.NormFloat64()
return coord
}
func TestFSM_RegisterNode(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -99,7 +112,7 @@ func TestFSM_RegisterNode(t *testing.T) {
func TestFSM_RegisterNode_Service(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -162,7 +175,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
func TestFSM_DeregisterService(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -224,7 +237,7 @@ func TestFSM_DeregisterService(t *testing.T) {
func TestFSM_DeregisterCheck(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -286,7 +299,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
func TestFSM_DeregisterNode(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -363,7 +376,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
func TestFSM_SnapshotRestore(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -459,7 +472,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Try to restore on a new FSM
fsm2, err := NewFSM(nil, os.Stderr)
fsm2, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -636,7 +649,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
func TestFSM_BadRestore(t *testing.T) {
t.Parallel()
// Create an FSM with some state.
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -674,7 +687,7 @@ func TestFSM_BadRestore(t *testing.T) {
func TestFSM_KVSDelete(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -720,7 +733,7 @@ func TestFSM_KVSDelete(t *testing.T) {
func TestFSM_KVSDeleteTree(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -767,7 +780,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -823,7 +836,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
func TestFSM_KVSCheckAndSet(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -880,7 +893,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
func TestFSM_CoordinateUpdate(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -921,7 +934,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
func TestFSM_SessionCreate_Destroy(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1001,7 +1014,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
func TestFSM_KVSLock(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1046,7 +1059,7 @@ func TestFSM_KVSLock(t *testing.T) {
func TestFSM_KVSUnlock(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1109,7 +1122,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
func TestFSM_ACL_CRUD(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1229,7 +1242,7 @@ func TestFSM_ACL_CRUD(t *testing.T) {
func TestFSM_PreparedQuery_CRUD(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1327,7 +1340,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {
func TestFSM_TombstoneReap(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1375,7 +1388,7 @@ func TestFSM_TombstoneReap(t *testing.T) {
func TestFSM_Txn(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1417,7 +1430,7 @@ func TestFSM_Txn(t *testing.T) {
func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1479,7 +1492,7 @@ func TestFSM_Autopilot(t *testing.T) {
func TestFSM_IgnoreUnknown(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -5,14 +5,25 @@ import (
"reflect"
"testing"
consulfsm "github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/raft"
)
func makeLog(buf []byte) *raft.Log {
return &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: buf,
}
}
// Testing for GH-300 and GH-279
func TestHealthCheckRace(t *testing.T) {
t.Parallel()
fsm, err := NewFSM(nil, os.Stderr)
fsm, err := consulfsm.New(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -77,6 +77,24 @@ func TestRPC_NoLeader_Retry(t *testing.T) {
}
}
type MockSink struct {
*bytes.Buffer
cancel bool
}
func (m *MockSink) ID() string {
return "Mock"
}
func (m *MockSink) Cancel() error {
m.cancel = true
return nil
}
func (m *MockSink) Close() error {
return nil
}
func TestRPC_blockingQuery(t *testing.T) {
t.Parallel()
dir, s := testServer(t)

View File

@ -18,6 +18,7 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
@ -122,7 +123,7 @@ type Server struct {
// fsm is the state machine used with Raft to provide
// strong consistency.
fsm *consulFSM
fsm *fsm.FSM
// Logger uses the provided LogOutput
logger *log.Logger
@ -447,7 +448,7 @@ func (s *Server) setupRaft() error {
// Create the FSM.
var err error
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
s.fsm, err = fsm.New(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return err
}
@ -554,7 +555,7 @@ func (s *Server) setupRaft() error {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm, err := NewFSM(s.tombstoneGC, s.config.LogOutput)
tmpFsm, err := fsm.New(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return fmt.Errorf("recovery failed to make temp FSM: %v", err)
}

View File

@ -1,6 +1,7 @@
package consul
import (
"fmt"
"os"
"strings"
"testing"
@ -9,9 +10,18 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func generateUUID() (ret string) {
var err error
if ret, err = uuid.GenerateUUID(); err != nil {
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
}
return ret
}
func TestInitializeSessionTimers(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)