From 78292662d7e78a14120f7b0dcc1ed4bfed362bd4 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 28 Nov 2017 18:01:17 -0800 Subject: [PATCH] Moves the FSM into its own package. This will help make it clearer what happens when we add some registration plumbing for the different operations and snapshots. --- agent/consul/{ => fsm}/fsm.go | 40 ++++++++++---------- agent/consul/{ => fsm}/fsm_test.go | 59 ++++++++++++++++++------------ agent/consul/issue_test.go | 13 ++++++- agent/consul/rpc_test.go | 18 +++++++++ agent/consul/server.go | 7 ++-- agent/consul/session_ttl_test.go | 10 +++++ 6 files changed, 100 insertions(+), 47 deletions(-) rename agent/consul/{ => fsm}/fsm.go (94%) rename agent/consul/{ => fsm}/fsm_test.go (96%) diff --git a/agent/consul/fsm.go b/agent/consul/fsm/fsm.go similarity index 94% rename from agent/consul/fsm.go rename to agent/consul/fsm/fsm.go index 4c67ddae4e..6defe6ca84 100644 --- a/agent/consul/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -1,4 +1,4 @@ -package consul +package fsm import ( "fmt" @@ -18,10 +18,10 @@ import ( // msgpackHandle is a shared handle for encoding/decoding msgpack payloads var msgpackHandle = &codec.MsgpackHandle{} -// consulFSM implements a finite state machine that is used +// FSM implements a finite state machine that is used // along with Raft to provide strong consistency. We implement // this outside the Server to avoid exposing this outside the package. -type consulFSM struct { +type FSM struct { logOutput io.Writer logger *log.Logger path string @@ -50,14 +50,14 @@ type snapshotHeader struct { LastIndex uint64 } -// NewFSM is used to construct a new FSM with a blank state -func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) { +// New is used to construct a new FSM with a blank state. +func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) { stateNew, err := state.NewStateStore(gc) if err != nil { return nil, err } - fsm := &consulFSM{ + fsm := &FSM{ logOutput: logOutput, logger: log.New(logOutput, "", log.LstdFlags), state: stateNew, @@ -67,13 +67,13 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) { } // State is used to return a handle to the current state -func (c *consulFSM) State() *state.Store { +func (c *FSM) State() *state.Store { c.stateLock.RLock() defer c.stateLock.RUnlock() return c.state } -func (c *consulFSM) Apply(log *raft.Log) interface{} { +func (c *FSM) Apply(log *raft.Log) interface{} { buf := log.Data msgType := structs.MessageType(buf[0]) @@ -116,7 +116,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { } } -func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} { +func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now()) var req structs.RegisterRequest @@ -132,7 +132,7 @@ func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} { return nil } -func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { +func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now()) defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now()) var req structs.DeregisterRequest @@ -162,7 +162,7 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { return nil } -func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { +func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} { var req structs.KVSRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -209,7 +209,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { } } -func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} { +func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} { var req structs.SessionRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -232,7 +232,7 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} } } -func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { +func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} { var req structs.ACLRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -266,7 +266,7 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { } } -func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { +func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { var req structs.TombstoneRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -288,7 +288,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{ // them in a single underlying transaction. This interface isn't 1:1 with the outer // update interface that the coordinate endpoint exposes, so we made it single // purpose and avoided the opcode convention. -func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} { +func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} { var updates structs.Coordinates if err := structs.Decode(buf, &updates); err != nil { panic(fmt.Errorf("failed to decode batch updates: %v", err)) @@ -303,7 +303,7 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa // applyPreparedQueryOperation applies the given prepared query operation to the // state store. -func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} { +func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} { var req structs.PreparedQueryRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -324,7 +324,7 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf } } -func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} { +func (c *FSM) applyTxn(buf []byte, index uint64) interface{} { var req structs.TxnRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -338,7 +338,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} { } } -func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { +func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { var req structs.AutopilotSetConfigRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -356,7 +356,7 @@ func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { return c.state.AutopilotSetConfig(index, &req.Config) } -func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { +func (c *FSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Since(start)) }(time.Now()) @@ -366,7 +366,7 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { // Restore streams in the snapshot and replaces the current state store with a // new one based on the snapshot if all goes OK during the restore. -func (c *consulFSM) Restore(old io.ReadCloser) error { +func (c *FSM) Restore(old io.ReadCloser) error { defer old.Close() // Create a new state store. diff --git a/agent/consul/fsm_test.go b/agent/consul/fsm/fsm_test.go similarity index 96% rename from agent/consul/fsm_test.go rename to agent/consul/fsm/fsm_test.go index 1e38557903..6b8cf9a129 100644 --- a/agent/consul/fsm_test.go +++ b/agent/consul/fsm/fsm_test.go @@ -1,8 +1,9 @@ -package consul +package fsm import ( "bytes" "fmt" + "math/rand" "os" "reflect" "testing" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/consul/types" "github.com/hashicorp/go-uuid" "github.com/hashicorp/raft" + "github.com/hashicorp/serf/coordinate" "github.com/pascaldekloe/goe/verify" ) @@ -53,9 +55,20 @@ func generateUUID() (ret string) { return ret } +func generateRandomCoordinate() *coordinate.Coordinate { + config := coordinate.DefaultConfig() + coord := coordinate.NewCoordinate(config) + for i := range coord.Vec { + coord.Vec[i] = rand.NormFloat64() + } + coord.Error = rand.NormFloat64() + coord.Adjustment = rand.NormFloat64() + return coord +} + func TestFSM_RegisterNode(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -99,7 +112,7 @@ func TestFSM_RegisterNode(t *testing.T) { func TestFSM_RegisterNode_Service(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -162,7 +175,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { func TestFSM_DeregisterService(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -224,7 +237,7 @@ func TestFSM_DeregisterService(t *testing.T) { func TestFSM_DeregisterCheck(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -286,7 +299,7 @@ func TestFSM_DeregisterCheck(t *testing.T) { func TestFSM_DeregisterNode(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -363,7 +376,7 @@ func TestFSM_DeregisterNode(t *testing.T) { func TestFSM_SnapshotRestore(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -459,7 +472,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Try to restore on a new FSM - fsm2, err := NewFSM(nil, os.Stderr) + fsm2, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -636,7 +649,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { func TestFSM_BadRestore(t *testing.T) { t.Parallel() // Create an FSM with some state. - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -674,7 +687,7 @@ func TestFSM_BadRestore(t *testing.T) { func TestFSM_KVSDelete(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -720,7 +733,7 @@ func TestFSM_KVSDelete(t *testing.T) { func TestFSM_KVSDeleteTree(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -767,7 +780,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) { func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -823,7 +836,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { func TestFSM_KVSCheckAndSet(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -880,7 +893,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { func TestFSM_CoordinateUpdate(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -921,7 +934,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) { func TestFSM_SessionCreate_Destroy(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1001,7 +1014,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) { func TestFSM_KVSLock(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1046,7 +1059,7 @@ func TestFSM_KVSLock(t *testing.T) { func TestFSM_KVSUnlock(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1109,7 +1122,7 @@ func TestFSM_KVSUnlock(t *testing.T) { func TestFSM_ACL_CRUD(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1229,7 +1242,7 @@ func TestFSM_ACL_CRUD(t *testing.T) { func TestFSM_PreparedQuery_CRUD(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1327,7 +1340,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) { func TestFSM_TombstoneReap(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1375,7 +1388,7 @@ func TestFSM_TombstoneReap(t *testing.T) { func TestFSM_Txn(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1417,7 +1430,7 @@ func TestFSM_Txn(t *testing.T) { func TestFSM_Autopilot(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -1479,7 +1492,7 @@ func TestFSM_Autopilot(t *testing.T) { func TestFSM_IgnoreUnknown(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } diff --git a/agent/consul/issue_test.go b/agent/consul/issue_test.go index 9f7581ed0b..f514642ab9 100644 --- a/agent/consul/issue_test.go +++ b/agent/consul/issue_test.go @@ -5,14 +5,25 @@ import ( "reflect" "testing" + consulfsm "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/raft" ) +func makeLog(buf []byte) *raft.Log { + return &raft.Log{ + Index: 1, + Term: 1, + Type: raft.LogCommand, + Data: buf, + } +} + // Testing for GH-300 and GH-279 func TestHealthCheckRace(t *testing.T) { t.Parallel() - fsm, err := NewFSM(nil, os.Stderr) + fsm, err := consulfsm.New(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 2551878313..7baa3f2359 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -77,6 +77,24 @@ func TestRPC_NoLeader_Retry(t *testing.T) { } } +type MockSink struct { + *bytes.Buffer + cancel bool +} + +func (m *MockSink) ID() string { + return "Mock" +} + +func (m *MockSink) Cancel() error { + m.cancel = true + return nil +} + +func (m *MockSink) Close() error { + return nil +} + func TestRPC_blockingQuery(t *testing.T) { t.Parallel() dir, s := testServer(t) diff --git a/agent/consul/server.go b/agent/consul/server.go index 5edc49294b..bb64e96f7e 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -18,6 +18,7 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" @@ -122,7 +123,7 @@ type Server struct { // fsm is the state machine used with Raft to provide // strong consistency. - fsm *consulFSM + fsm *fsm.FSM // Logger uses the provided LogOutput logger *log.Logger @@ -447,7 +448,7 @@ func (s *Server) setupRaft() error { // Create the FSM. var err error - s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput) + s.fsm, err = fsm.New(s.tombstoneGC, s.config.LogOutput) if err != nil { return err } @@ -554,7 +555,7 @@ func (s *Server) setupRaft() error { return fmt.Errorf("recovery failed to parse peers.json: %v", err) } - tmpFsm, err := NewFSM(s.tombstoneGC, s.config.LogOutput) + tmpFsm, err := fsm.New(s.tombstoneGC, s.config.LogOutput) if err != nil { return fmt.Errorf("recovery failed to make temp FSM: %v", err) } diff --git a/agent/consul/session_ttl_test.go b/agent/consul/session_ttl_test.go index 2bc7dc7aae..5ede76462e 100644 --- a/agent/consul/session_ttl_test.go +++ b/agent/consul/session_ttl_test.go @@ -1,6 +1,7 @@ package consul import ( + "fmt" "os" "strings" "testing" @@ -9,9 +10,18 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/go-uuid" "github.com/hashicorp/net-rpc-msgpackrpc" ) +func generateUUID() (ret string) { + var err error + if ret, err = uuid.GenerateUUID(); err != nil { + panic(fmt.Sprintf("Unable to generate a UUID, %v", err)) + } + return ret +} + func TestInitializeSessionTimers(t *testing.T) { t.Parallel() dir1, s1 := testServer(t)