From c8e763667f5f7ce5fe257b46f51e23d8e8b94de4 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 29 Nov 2017 12:43:27 -0800 Subject: [PATCH] Creates a registration mechanism for FSM commands. --- agent/consul/fsm/commands_oss.go | 263 ++++++ agent/consul/fsm/commands_oss_test.go | 1149 +++++++++++++++++++++++++ agent/consul/fsm/fsm.go | 311 +------ agent/consul/fsm/fsm_test.go | 1137 ------------------------ 4 files changed, 1457 insertions(+), 1403 deletions(-) create mode 100644 agent/consul/fsm/commands_oss.go create mode 100644 agent/consul/fsm/commands_oss_test.go diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go new file mode 100644 index 0000000000..2029f1723e --- /dev/null +++ b/agent/consul/fsm/commands_oss.go @@ -0,0 +1,263 @@ +package fsm + +import ( + "fmt" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" +) + +func init() { + registerCommand(structs.RegisterRequestType, (*FSM).applyRegister) + registerCommand(structs.DeregisterRequestType, (*FSM).applyDeregister) + registerCommand(structs.KVSRequestType, (*FSM).applyKVSOperation) + registerCommand(structs.SessionRequestType, (*FSM).applySessionOperation) + registerCommand(structs.ACLRequestType, (*FSM).applyACLOperation) + registerCommand(structs.TombstoneRequestType, (*FSM).applyTombstoneOperation) + registerCommand(structs.CoordinateBatchUpdateType, (*FSM).applyCoordinateBatchUpdate) + registerCommand(structs.PreparedQueryRequestType, (*FSM).applyPreparedQueryOperation) + registerCommand(structs.TxnRequestType, (*FSM).applyTxn) + registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate) +} + +func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) + defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now()) + var req structs.RegisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + // Apply all updates in a single transaction + if err := c.state.EnsureRegistration(index, &req); err != nil { + c.logger.Printf("[WARN] consul.fsm: EnsureRegistration failed: %v", err) + return err + } + return nil +} + +func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now()) + defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now()) + var req structs.DeregisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + // Either remove the service entry or the whole node. The precedence + // here is also baked into vetDeregisterWithACL() in acl.go, so if you + // make changes here, be sure to also adjust the code over there. + if req.ServiceID != "" { + if err := c.state.DeleteService(index, req.Node, req.ServiceID); err != nil { + c.logger.Printf("[WARN] consul.fsm: DeleteNodeService failed: %v", err) + return err + } + } else if req.CheckID != "" { + if err := c.state.DeleteCheck(index, req.Node, req.CheckID); err != nil { + c.logger.Printf("[WARN] consul.fsm: DeleteNodeCheck failed: %v", err) + return err + } + } else { + if err := c.state.DeleteNode(index, req.Node); err != nil { + c.logger.Printf("[WARN] consul.fsm: DeleteNode failed: %v", err) + return err + } + } + return nil +} + +func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} { + var req structs.KVSRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case api.KVSet: + return c.state.KVSSet(index, &req.DirEnt) + case api.KVDelete: + return c.state.KVSDelete(index, req.DirEnt.Key) + case api.KVDeleteCAS: + act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key) + if err != nil { + return err + } + return act + case api.KVDeleteTree: + return c.state.KVSDeleteTree(index, req.DirEnt.Key) + case api.KVCAS: + act, err := c.state.KVSSetCAS(index, &req.DirEnt) + if err != nil { + return err + } + return act + case api.KVLock: + act, err := c.state.KVSLock(index, &req.DirEnt) + if err != nil { + return err + } + return act + case api.KVUnlock: + act, err := c.state.KVSUnlock(index, &req.DirEnt) + if err != nil { + return err + } + return act + default: + err := fmt.Errorf("Invalid KVS operation '%s'", req.Op) + c.logger.Printf("[WARN] consul.fsm: %v", err) + return err + } +} + +func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} { + var req structs.SessionRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case structs.SessionCreate: + if err := c.state.SessionCreate(index, &req.Session); err != nil { + return err + } + return req.Session.ID + case structs.SessionDestroy: + return c.state.SessionDestroy(index, req.Session.ID) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op) + return fmt.Errorf("Invalid Session operation '%s'", req.Op) + } +} + +func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} { + var req structs.ACLRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case structs.ACLBootstrapInit: + enabled, err := c.state.ACLBootstrapInit(index) + if err != nil { + return err + } + return enabled + case structs.ACLBootstrapNow: + if err := c.state.ACLBootstrap(index, &req.ACL); err != nil { + return err + } + return &req.ACL + case structs.ACLForceSet, structs.ACLSet: + if err := c.state.ACLSet(index, &req.ACL); err != nil { + return err + } + return req.ACL.ID + case structs.ACLDelete: + return c.state.ACLDelete(index, req.ACL.ID) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid ACL operation '%s'", req.Op) + return fmt.Errorf("Invalid ACL operation '%s'", req.Op) + } +} + +func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { + var req structs.TombstoneRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case structs.TombstoneReap: + return c.state.ReapTombstones(req.ReapIndex) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op) + return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op) + } +} + +// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies +// them in a single underlying transaction. This interface isn't 1:1 with the outer +// update interface that the coordinate endpoint exposes, so we made it single +// purpose and avoided the opcode convention. +func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} { + var updates structs.Coordinates + if err := structs.Decode(buf, &updates); err != nil { + panic(fmt.Errorf("failed to decode batch updates: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now()) + defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now()) + if err := c.state.CoordinateBatchUpdate(index, updates); err != nil { + return err + } + return nil +} + +// applyPreparedQueryOperation applies the given prepared query operation to the +// state store. +func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} { + var req structs.PreparedQueryRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case structs.PreparedQueryCreate, structs.PreparedQueryUpdate: + return c.state.PreparedQuerySet(index, req.Query) + case structs.PreparedQueryDelete: + return c.state.PreparedQueryDelete(index, req.Query.ID) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op) + return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op) + } +} + +func (c *FSM) applyTxn(buf []byte, index uint64) interface{} { + var req structs.TxnRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now()) + defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now()) + results, errors := c.state.TxnRW(index, req.Ops) + return structs.TxnResponse{ + Results: results, + Errors: errors, + } +} + +func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { + var req structs.AutopilotSetConfigRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now()) + defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now()) + + if req.CAS { + act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config) + if err != nil { + return err + } + return act + } + return c.state.AutopilotSetConfig(index, &req.Config) +} diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go new file mode 100644 index 0000000000..b98d9ea5e3 --- /dev/null +++ b/agent/consul/fsm/commands_oss_test.go @@ -0,0 +1,1149 @@ +package fsm + +import ( + "fmt" + "math/rand" + "os" + "reflect" + "testing" + "time" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/serf/coordinate" + "github.com/pascaldekloe/goe/verify" +) + +func generateUUID() (ret string) { + var err error + if ret, err = uuid.GenerateUUID(); err != nil { + panic(fmt.Sprintf("Unable to generate a UUID, %v", err)) + } + return ret +} + +func generateRandomCoordinate() *coordinate.Coordinate { + config := coordinate.DefaultConfig() + coord := coordinate.NewCoordinate(config) + for i := range coord.Vec { + coord.Vec[i] = rand.NormFloat64() + } + coord.Error = rand.NormFloat64() + coord.Adjustment = rand.NormFloat64() + return coord +} + +func TestFSM_RegisterNode(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + _, node, err := fsm.state.GetNode("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if node == nil { + t.Fatalf("not found!") + } + if node.ModifyIndex != 1 { + t.Fatalf("bad index: %d", node.ModifyIndex) + } + + // Verify service registered + _, services, err := fsm.state.NodeServices(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(services.Services) != 0 { + t.Fatalf("Services: %v", services) + } +} + +func TestFSM_RegisterNode_Service(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tags: []string{"master"}, + Port: 8000, + }, + Check: &structs.HealthCheck{ + Node: "foo", + CheckID: "db", + Name: "db connectivity", + Status: api.HealthPassing, + ServiceID: "db", + }, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + _, node, err := fsm.state.GetNode("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if node == nil { + t.Fatalf("not found!") + } + + // Verify service registered + _, services, err := fsm.state.NodeServices(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if _, ok := services.Services["db"]; !ok { + t.Fatalf("not registered!") + } + + // Verify check + _, checks, err := fsm.state.NodeChecks(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if checks[0].CheckID != "db" { + t.Fatalf("not registered!") + } +} + +func TestFSM_DeregisterService(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tags: []string{"master"}, + Port: 8000, + }, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + dereg := structs.DeregisterRequest{ + Datacenter: "dc1", + Node: "foo", + ServiceID: "db", + } + buf, err = structs.Encode(structs.DeregisterRequestType, dereg) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + _, node, err := fsm.state.GetNode("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if node == nil { + t.Fatalf("not found!") + } + + // Verify service not registered + _, services, err := fsm.state.NodeServices(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if _, ok := services.Services["db"]; ok { + t.Fatalf("db registered!") + } +} + +func TestFSM_DeregisterCheck(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Check: &structs.HealthCheck{ + Node: "foo", + CheckID: "mem", + Name: "memory util", + Status: api.HealthPassing, + }, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + dereg := structs.DeregisterRequest{ + Datacenter: "dc1", + Node: "foo", + CheckID: "mem", + } + buf, err = structs.Encode(structs.DeregisterRequestType, dereg) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + _, node, err := fsm.state.GetNode("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if node == nil { + t.Fatalf("not found!") + } + + // Verify check not registered + _, checks, err := fsm.state.NodeChecks(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(checks) != 0 { + t.Fatalf("check registered!") + } +} + +func TestFSM_DeregisterNode(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tags: []string{"master"}, + Port: 8000, + }, + Check: &structs.HealthCheck{ + Node: "foo", + CheckID: "db", + Name: "db connectivity", + Status: api.HealthPassing, + ServiceID: "db", + }, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + dereg := structs.DeregisterRequest{ + Datacenter: "dc1", + Node: "foo", + } + buf, err = structs.Encode(structs.DeregisterRequestType, dereg) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are not registered + _, node, err := fsm.state.GetNode("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if node != nil { + t.Fatalf("found!") + } + + // Verify service not registered + _, services, err := fsm.state.NodeServices(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if services != nil { + t.Fatalf("Services: %v", services) + } + + // Verify checks not registered + _, checks, err := fsm.state.NodeChecks(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(checks) != 0 { + t.Fatalf("Services: %v", services) + } +} + +func TestFSM_KVSDelete(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Run the delete + req.Op = api.KVDelete + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is not set + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("key present") + } +} + +func TestFSM_KVSDeleteTree(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Run the delete tree + req.Op = api.KVDeleteTree + req.DirEnt.Key = "/test" + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is not set + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("key present") + } +} + +func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("key missing") + } + + // Run the check-and-set + req.Op = api.KVDeleteCAS + req.DirEnt.ModifyIndex = d.ModifyIndex + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp.(bool) != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is gone + _, d, err = fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("bad: %v", d) + } +} + +func TestFSM_KVSCheckAndSet(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("key missing") + } + + // Run the check-and-set + req.Op = api.KVCAS + req.DirEnt.ModifyIndex = d.ModifyIndex + req.DirEnt.Value = []byte("zip") + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp.(bool) != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is updated + _, d, err = fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } +} + +func TestFSM_KVSLock(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + session := &structs.Session{ID: generateUUID(), Node: "foo"} + fsm.state.SessionCreate(2, session) + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVLock, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Value: []byte("test"), + Session: session.ID, + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is locked + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } + if d.LockIndex != 1 { + t.Fatalf("bad: %v", *d) + } + if d.Session != session.ID { + t.Fatalf("bad: %v", *d) + } +} + +func TestFSM_KVSUnlock(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + session := &structs.Session{ID: generateUUID(), Node: "foo"} + fsm.state.SessionCreate(2, session) + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVLock, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Value: []byte("test"), + Session: session.ID, + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != true { + t.Fatalf("resp: %v", resp) + } + + req = structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVUnlock, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Value: []byte("test"), + Session: session.ID, + }, + } + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is unlocked + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } + if d.LockIndex != 1 { + t.Fatalf("bad: %v", *d) + } + if d.Session != "" { + t.Fatalf("bad: %v", *d) + } +} + +func TestFSM_CoordinateUpdate(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Register some nodes. + fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"}) + fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"}) + + // Write a batch of two coordinates. + updates := structs.Coordinates{ + &structs.Coordinate{ + Node: "node1", + Coord: generateRandomCoordinate(), + }, + &structs.Coordinate{ + Node: "node2", + Coord: generateRandomCoordinate(), + }, + } + buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Read back the two coordinates to make sure they got updated. + _, coords, err := fsm.state.Coordinates(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if !reflect.DeepEqual(coords, updates) { + t.Fatalf("bad: %#v", coords) + } +} + +func TestFSM_SessionCreate_Destroy(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + fsm.state.EnsureCheck(2, &structs.HealthCheck{ + Node: "foo", + CheckID: "web", + Status: api.HealthPassing, + }) + + // Create a new session + req := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + ID: generateUUID(), + Node: "foo", + Checks: []types.CheckID{"web"}, + }, + } + buf, err := structs.Encode(structs.SessionRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if err, ok := resp.(error); ok { + t.Fatalf("resp: %v", err) + } + + // Get the session + id := resp.(string) + _, session, err := fsm.state.SessionGet(nil, id) + if err != nil { + t.Fatalf("err: %v", err) + } + if session == nil { + t.Fatalf("missing") + } + + // Verify the session + if session.ID != id { + t.Fatalf("bad: %v", *session) + } + if session.Node != "foo" { + t.Fatalf("bad: %v", *session) + } + if session.Checks[0] != "web" { + t.Fatalf("bad: %v", *session) + } + + // Try to destroy + destroy := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionDestroy, + Session: structs.Session{ + ID: id, + }, + } + buf, err = structs.Encode(structs.SessionRequestType, destroy) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + _, session, err = fsm.state.SessionGet(nil, id) + if err != nil { + t.Fatalf("err: %v", err) + } + if session != nil { + t.Fatalf("should be destroyed") + } +} + +func TestFSM_ACL_CRUD(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a new ACL. + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + ID: generateUUID(), + Name: "User token", + Type: structs.ACLTypeClient, + }, + } + buf, err := structs.Encode(structs.ACLRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if err, ok := resp.(error); ok { + t.Fatalf("resp: %v", err) + } + + // Get the ACL. + id := resp.(string) + _, acl, err := fsm.state.ACLGet(nil, id) + if err != nil { + t.Fatalf("err: %v", err) + } + if acl == nil { + t.Fatalf("missing") + } + + // Verify the ACL. + if acl.ID != id { + t.Fatalf("bad: %v", *acl) + } + if acl.Name != "User token" { + t.Fatalf("bad: %v", *acl) + } + if acl.Type != structs.ACLTypeClient { + t.Fatalf("bad: %v", *acl) + } + + // Try to destroy. + destroy := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLDelete, + ACL: structs.ACL{ + ID: id, + }, + } + buf, err = structs.Encode(structs.ACLRequestType, destroy) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + _, acl, err = fsm.state.ACLGet(nil, id) + if err != nil { + t.Fatalf("err: %v", err) + } + if acl != nil { + t.Fatalf("should be destroyed") + } + + // Initialize bootstrap (should work since we haven't made a management + // token). + init := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLBootstrapInit, + } + buf, err = structs.Encode(structs.ACLRequestType, init) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if enabled, ok := resp.(bool); !ok || !enabled { + t.Fatalf("resp: %v", resp) + } + gotB, err := fsm.state.ACLGetBootstrap() + if err != nil { + t.Fatalf("err: %v", err) + } + wantB := &structs.ACLBootstrap{ + AllowBootstrap: true, + RaftIndex: gotB.RaftIndex, + } + verify.Values(t, "", gotB, wantB) + + // Do a bootstrap. + bootstrap := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLBootstrapNow, + ACL: structs.ACL{ + ID: generateUUID(), + Name: "Bootstrap Token", + Type: structs.ACLTypeManagement, + }, + } + buf, err = structs.Encode(structs.ACLRequestType, bootstrap) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + respACL, ok := resp.(*structs.ACL) + if !ok { + t.Fatalf("resp: %v", resp) + } + bootstrap.ACL.CreateIndex = respACL.CreateIndex + bootstrap.ACL.ModifyIndex = respACL.ModifyIndex + verify.Values(t, "", respACL, &bootstrap.ACL) +} + +func TestFSM_PreparedQuery_CRUD(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Register a service to query on. + fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) + + // Create a new query. + query := structs.PreparedQueryRequest{ + Op: structs.PreparedQueryCreate, + Query: &structs.PreparedQuery{ + ID: generateUUID(), + Service: structs.ServiceQuery{ + Service: "web", + }, + }, + } + { + buf, err := structs.Encode(structs.PreparedQueryRequestType, query) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + } + + // Verify it's in the state store. + { + _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) + if err != nil { + t.Fatalf("err: %s", err) + } + + actual.CreateIndex, actual.ModifyIndex = 0, 0 + if !reflect.DeepEqual(actual, query.Query) { + t.Fatalf("bad: %v", actual) + } + } + + // Make an update to the query. + query.Op = structs.PreparedQueryUpdate + query.Query.Name = "my-query" + { + buf, err := structs.Encode(structs.PreparedQueryRequestType, query) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + } + + // Verify the update. + { + _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) + if err != nil { + t.Fatalf("err: %s", err) + } + + actual.CreateIndex, actual.ModifyIndex = 0, 0 + if !reflect.DeepEqual(actual, query.Query) { + t.Fatalf("bad: %v", actual) + } + } + + // Delete the query. + query.Op = structs.PreparedQueryDelete + { + buf, err := structs.Encode(structs.PreparedQueryRequestType, query) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + } + + // Make sure it's gone. + { + _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) + if err != nil { + t.Fatalf("err: %s", err) + } + + if actual != nil { + t.Fatalf("bad: %v", actual) + } + } +} + +func TestFSM_TombstoneReap(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create some tombstones + fsm.state.KVSSet(11, &structs.DirEntry{ + Key: "/remove", + Value: []byte("foo"), + }) + fsm.state.KVSDelete(12, "/remove") + idx, _, err := fsm.state.KVSList(nil, "/remove") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 12 { + t.Fatalf("bad index: %d", idx) + } + + // Create a new reap request + req := structs.TombstoneRequest{ + Datacenter: "dc1", + Op: structs.TombstoneReap, + ReapIndex: 12, + } + buf, err := structs.Encode(structs.TombstoneRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if err, ok := resp.(error); ok { + t.Fatalf("resp: %v", err) + } + + // Verify the tombstones are gone + snap := fsm.state.Snapshot() + defer snap.Close() + stones, err := snap.Tombstones() + if err != nil { + t.Fatalf("err: %s", err) + } + if stones.Next() != nil { + t.Fatalf("unexpected extra tombstones") + } +} + +func TestFSM_Txn(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Set a key using a transaction. + req := structs.TxnRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + }, + }, + }, + } + buf, err := structs.Encode(structs.TxnRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(structs.TxnResponse); !ok { + t.Fatalf("bad response type: %T", resp) + } + + // Verify key is set directly in the state store. + _, d, err := fsm.state.KVSGet(nil, "/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } +} + +func TestFSM_Autopilot(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Set the autopilot config using a request. + req := structs.AutopilotSetConfigRequest{ + Datacenter: "dc1", + Config: structs.AutopilotConfig{ + CleanupDeadServers: true, + LastContactThreshold: 10 * time.Second, + MaxTrailingLogs: 300, + }, + } + buf, err := structs.Encode(structs.AutopilotRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + + // Verify key is set directly in the state store. + _, config, err := fsm.state.AutopilotConfig() + if err != nil { + t.Fatalf("err: %v", err) + } + if config.CleanupDeadServers != req.Config.CleanupDeadServers { + t.Fatalf("bad: %v", config.CleanupDeadServers) + } + if config.LastContactThreshold != req.Config.LastContactThreshold { + t.Fatalf("bad: %v", config.LastContactThreshold) + } + if config.MaxTrailingLogs != req.Config.MaxTrailingLogs { + t.Fatalf("bad: %v", config.MaxTrailingLogs) + } + + // Now use CAS and provide an old index + req.CAS = true + req.Config.CleanupDeadServers = false + req.Config.ModifyIndex = config.ModifyIndex - 1 + buf, err = structs.Encode(structs.AutopilotRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + + _, config, err = fsm.state.AutopilotConfig() + if err != nil { + t.Fatalf("err: %v", err) + } + if !config.CleanupDeadServers { + t.Fatalf("bad: %v", config.CleanupDeadServers) + } +} diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 6defe6ca84..ea71abb607 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -10,7 +10,6 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/raft" ) @@ -18,6 +17,28 @@ import ( // msgpackHandle is a shared handle for encoding/decoding msgpack payloads var msgpackHandle = &codec.MsgpackHandle{} +// command is a command method on the FSM. +type command func(buf []byte, index uint64) interface{} + +// unboundCommand is a command method on the FSM, not yet bound to an FSM +// instance. +type unboundCommand func(c *FSM, buf []byte, index uint64) interface{} + +// commands is a map from message type to unbound command. +var commands map[structs.MessageType]unboundCommand + +// registerCommand registers a new command with the FSM, which should be done +// at package init() time. +func registerCommand(msg structs.MessageType, fn unboundCommand) { + if commands == nil { + commands = make(map[structs.MessageType]unboundCommand) + } + if commands[msg] != nil { + panic(fmt.Errorf("Message %d is already registered", msg)) + } + commands[msg] = fn +} + // FSM implements a finite state machine that is used // along with Raft to provide strong consistency. We implement // this outside the Server to avoid exposing this outside the package. @@ -26,6 +47,10 @@ type FSM struct { logger *log.Logger path string + // apply is built off the commands global and is used to route apply + // operations to their appropriate handlers. + apply map[structs.MessageType]command + // stateLock is only used to protect outside callers to State() from // racing with Restore(), which is called by Raft (it puts in a totally // new state store). Everything internal here is synchronized by the @@ -60,9 +85,19 @@ func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) { fsm := &FSM{ logOutput: logOutput, logger: log.New(logOutput, "", log.LstdFlags), + apply: make(map[structs.MessageType]command), state: stateNew, gc: gc, } + + // Build out the apply dispatch table based on the registered commands. + for msg, fn := range commands { + thisFn := fn + fsm.apply[msg] = func(buf []byte, index uint64) interface{} { + return thisFn(fsm, buf, index) + } + } + return fsm, nil } @@ -86,274 +121,18 @@ func (c *FSM) Apply(log *raft.Log) interface{} { ignoreUnknown = true } - switch msgType { - case structs.RegisterRequestType: - return c.applyRegister(buf[1:], log.Index) - case structs.DeregisterRequestType: - return c.applyDeregister(buf[1:], log.Index) - case structs.KVSRequestType: - return c.applyKVSOperation(buf[1:], log.Index) - case structs.SessionRequestType: - return c.applySessionOperation(buf[1:], log.Index) - case structs.ACLRequestType: - return c.applyACLOperation(buf[1:], log.Index) - case structs.TombstoneRequestType: - return c.applyTombstoneOperation(buf[1:], log.Index) - case structs.CoordinateBatchUpdateType: - return c.applyCoordinateBatchUpdate(buf[1:], log.Index) - case structs.PreparedQueryRequestType: - return c.applyPreparedQueryOperation(buf[1:], log.Index) - case structs.TxnRequestType: - return c.applyTxn(buf[1:], log.Index) - case structs.AutopilotRequestType: - return c.applyAutopilotUpdate(buf[1:], log.Index) - default: - if ignoreUnknown { - c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) - return nil - } - panic(fmt.Errorf("failed to apply request: %#v", buf)) - } -} - -func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { - defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) - defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now()) - var req structs.RegisterRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) + // Apply based on the dispatch table, if possible. + if fn, ok := c.apply[msgType]; ok { + return fn(buf[1:], log.Index) } - // Apply all updates in a single transaction - if err := c.state.EnsureRegistration(index, &req); err != nil { - c.logger.Printf("[WARN] consul.fsm: EnsureRegistration failed: %v", err) - return err + // Otherwise, see if it's safe to ignore. If not, we have to panic so + // that we crash and our state doesn't diverge. + if ignoreUnknown { + c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) + return nil } - return nil -} - -func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} { - defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now()) - defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now()) - var req structs.DeregisterRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - - // Either remove the service entry or the whole node. The precedence - // here is also baked into vetDeregisterWithACL() in acl.go, so if you - // make changes here, be sure to also adjust the code over there. - if req.ServiceID != "" { - if err := c.state.DeleteService(index, req.Node, req.ServiceID); err != nil { - c.logger.Printf("[WARN] consul.fsm: DeleteNodeService failed: %v", err) - return err - } - } else if req.CheckID != "" { - if err := c.state.DeleteCheck(index, req.Node, req.CheckID); err != nil { - c.logger.Printf("[WARN] consul.fsm: DeleteNodeCheck failed: %v", err) - return err - } - } else { - if err := c.state.DeleteNode(index, req.Node); err != nil { - c.logger.Printf("[WARN] consul.fsm: DeleteNode failed: %v", err) - return err - } - } - return nil -} - -func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} { - var req structs.KVSRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - switch req.Op { - case api.KVSet: - return c.state.KVSSet(index, &req.DirEnt) - case api.KVDelete: - return c.state.KVSDelete(index, req.DirEnt.Key) - case api.KVDeleteCAS: - act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key) - if err != nil { - return err - } - return act - case api.KVDeleteTree: - return c.state.KVSDeleteTree(index, req.DirEnt.Key) - case api.KVCAS: - act, err := c.state.KVSSetCAS(index, &req.DirEnt) - if err != nil { - return err - } - return act - case api.KVLock: - act, err := c.state.KVSLock(index, &req.DirEnt) - if err != nil { - return err - } - return act - case api.KVUnlock: - act, err := c.state.KVSUnlock(index, &req.DirEnt) - if err != nil { - return err - } - return act - default: - err := fmt.Errorf("Invalid KVS operation '%s'", req.Op) - c.logger.Printf("[WARN] consul.fsm: %v", err) - return err - } -} - -func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} { - var req structs.SessionRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - switch req.Op { - case structs.SessionCreate: - if err := c.state.SessionCreate(index, &req.Session); err != nil { - return err - } - return req.Session.ID - case structs.SessionDestroy: - return c.state.SessionDestroy(index, req.Session.ID) - default: - c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op) - return fmt.Errorf("Invalid Session operation '%s'", req.Op) - } -} - -func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} { - var req structs.ACLRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - switch req.Op { - case structs.ACLBootstrapInit: - enabled, err := c.state.ACLBootstrapInit(index) - if err != nil { - return err - } - return enabled - case structs.ACLBootstrapNow: - if err := c.state.ACLBootstrap(index, &req.ACL); err != nil { - return err - } - return &req.ACL - case structs.ACLForceSet, structs.ACLSet: - if err := c.state.ACLSet(index, &req.ACL); err != nil { - return err - } - return req.ACL.ID - case structs.ACLDelete: - return c.state.ACLDelete(index, req.ACL.ID) - default: - c.logger.Printf("[WARN] consul.fsm: Invalid ACL operation '%s'", req.Op) - return fmt.Errorf("Invalid ACL operation '%s'", req.Op) - } -} - -func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { - var req structs.TombstoneRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - switch req.Op { - case structs.TombstoneReap: - return c.state.ReapTombstones(req.ReapIndex) - default: - c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op) - return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op) - } -} - -// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies -// them in a single underlying transaction. This interface isn't 1:1 with the outer -// update interface that the coordinate endpoint exposes, so we made it single -// purpose and avoided the opcode convention. -func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} { - var updates structs.Coordinates - if err := structs.Decode(buf, &updates); err != nil { - panic(fmt.Errorf("failed to decode batch updates: %v", err)) - } - defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now()) - defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now()) - if err := c.state.CoordinateBatchUpdate(index, updates); err != nil { - return err - } - return nil -} - -// applyPreparedQueryOperation applies the given prepared query operation to the -// state store. -func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} { - var req structs.PreparedQueryRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - - defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(), - []metrics.Label{{Name: "op", Value: string(req.Op)}}) - switch req.Op { - case structs.PreparedQueryCreate, structs.PreparedQueryUpdate: - return c.state.PreparedQuerySet(index, req.Query) - case structs.PreparedQueryDelete: - return c.state.PreparedQueryDelete(index, req.Query.ID) - default: - c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op) - return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op) - } -} - -func (c *FSM) applyTxn(buf []byte, index uint64) interface{} { - var req structs.TxnRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now()) - defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now()) - results, errors := c.state.TxnRW(index, req.Ops) - return structs.TxnResponse{ - Results: results, - Errors: errors, - } -} - -func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { - var req structs.AutopilotSetConfigRequest - if err := structs.Decode(buf, &req); err != nil { - panic(fmt.Errorf("failed to decode request: %v", err)) - } - defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now()) - defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now()) - - if req.CAS { - act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config) - if err != nil { - return err - } - return act - } - return c.state.AutopilotSetConfig(index, &req.Config) + panic(fmt.Errorf("failed to apply request: %#v", buf)) } func (c *FSM) Snapshot() (raft.FSMSnapshot, error) { diff --git a/agent/consul/fsm/fsm_test.go b/agent/consul/fsm/fsm_test.go index 6b8cf9a129..70a91d6798 100644 --- a/agent/consul/fsm/fsm_test.go +++ b/agent/consul/fsm/fsm_test.go @@ -2,8 +2,6 @@ package fsm import ( "bytes" - "fmt" - "math/rand" "os" "reflect" "testing" @@ -13,10 +11,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-uuid" "github.com/hashicorp/raft" - "github.com/hashicorp/serf/coordinate" "github.com/pascaldekloe/goe/verify" ) @@ -47,333 +42,6 @@ func makeLog(buf []byte) *raft.Log { } } -func generateUUID() (ret string) { - var err error - if ret, err = uuid.GenerateUUID(); err != nil { - panic(fmt.Sprintf("Unable to generate a UUID, %v", err)) - } - return ret -} - -func generateRandomCoordinate() *coordinate.Coordinate { - config := coordinate.DefaultConfig() - coord := coordinate.NewCoordinate(config) - for i := range coord.Vec { - coord.Vec[i] = rand.NormFloat64() - } - coord.Error = rand.NormFloat64() - coord.Adjustment = rand.NormFloat64() - return coord -} - -func TestFSM_RegisterNode(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - } - buf, err := structs.Encode(structs.RegisterRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify we are registered - _, node, err := fsm.state.GetNode("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if node == nil { - t.Fatalf("not found!") - } - if node.ModifyIndex != 1 { - t.Fatalf("bad index: %d", node.ModifyIndex) - } - - // Verify service registered - _, services, err := fsm.state.NodeServices(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(services.Services) != 0 { - t.Fatalf("Services: %v", services) - } -} - -func TestFSM_RegisterNode_Service(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - ID: "db", - Service: "db", - Tags: []string{"master"}, - Port: 8000, - }, - Check: &structs.HealthCheck{ - Node: "foo", - CheckID: "db", - Name: "db connectivity", - Status: api.HealthPassing, - ServiceID: "db", - }, - } - buf, err := structs.Encode(structs.RegisterRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify we are registered - _, node, err := fsm.state.GetNode("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if node == nil { - t.Fatalf("not found!") - } - - // Verify service registered - _, services, err := fsm.state.NodeServices(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if _, ok := services.Services["db"]; !ok { - t.Fatalf("not registered!") - } - - // Verify check - _, checks, err := fsm.state.NodeChecks(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if checks[0].CheckID != "db" { - t.Fatalf("not registered!") - } -} - -func TestFSM_DeregisterService(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - ID: "db", - Service: "db", - Tags: []string{"master"}, - Port: 8000, - }, - } - buf, err := structs.Encode(structs.RegisterRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - dereg := structs.DeregisterRequest{ - Datacenter: "dc1", - Node: "foo", - ServiceID: "db", - } - buf, err = structs.Encode(structs.DeregisterRequestType, dereg) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify we are registered - _, node, err := fsm.state.GetNode("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if node == nil { - t.Fatalf("not found!") - } - - // Verify service not registered - _, services, err := fsm.state.NodeServices(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if _, ok := services.Services["db"]; ok { - t.Fatalf("db registered!") - } -} - -func TestFSM_DeregisterCheck(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Check: &structs.HealthCheck{ - Node: "foo", - CheckID: "mem", - Name: "memory util", - Status: api.HealthPassing, - }, - } - buf, err := structs.Encode(structs.RegisterRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - dereg := structs.DeregisterRequest{ - Datacenter: "dc1", - Node: "foo", - CheckID: "mem", - } - buf, err = structs.Encode(structs.DeregisterRequestType, dereg) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify we are registered - _, node, err := fsm.state.GetNode("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if node == nil { - t.Fatalf("not found!") - } - - // Verify check not registered - _, checks, err := fsm.state.NodeChecks(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(checks) != 0 { - t.Fatalf("check registered!") - } -} - -func TestFSM_DeregisterNode(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - ID: "db", - Service: "db", - Tags: []string{"master"}, - Port: 8000, - }, - Check: &structs.HealthCheck{ - Node: "foo", - CheckID: "db", - Name: "db connectivity", - Status: api.HealthPassing, - ServiceID: "db", - }, - } - buf, err := structs.Encode(structs.RegisterRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - dereg := structs.DeregisterRequest{ - Datacenter: "dc1", - Node: "foo", - } - buf, err = structs.Encode(structs.DeregisterRequestType, dereg) - if err != nil { - t.Fatalf("err: %v", err) - } - - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify we are not registered - _, node, err := fsm.state.GetNode("foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if node != nil { - t.Fatalf("found!") - } - - // Verify service not registered - _, services, err := fsm.state.NodeServices(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if services != nil { - t.Fatalf("Services: %v", services) - } - - // Verify checks not registered - _, checks, err := fsm.state.NodeChecks(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(checks) != 0 { - t.Fatalf("Services: %v", services) - } -} - func TestFSM_SnapshotRestore(t *testing.T) { t.Parallel() fsm, err := New(nil, os.Stderr) @@ -685,811 +353,6 @@ func TestFSM_BadRestore(t *testing.T) { } } -func TestFSM_KVSDelete(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVSet, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Flags: 0, - Value: []byte("test"), - }, - } - buf, err := structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Run the delete - req.Op = api.KVDelete - buf, err = structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify key is not set - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d != nil { - t.Fatalf("key present") - } -} - -func TestFSM_KVSDeleteTree(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVSet, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Flags: 0, - Value: []byte("test"), - }, - } - buf, err := structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Run the delete tree - req.Op = api.KVDeleteTree - req.DirEnt.Key = "/test" - buf, err = structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify key is not set - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d != nil { - t.Fatalf("key present") - } -} - -func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVSet, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Flags: 0, - Value: []byte("test"), - }, - } - buf, err := structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify key is set - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d == nil { - t.Fatalf("key missing") - } - - // Run the check-and-set - req.Op = api.KVDeleteCAS - req.DirEnt.ModifyIndex = d.ModifyIndex - buf, err = structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp.(bool) != true { - t.Fatalf("resp: %v", resp) - } - - // Verify key is gone - _, d, err = fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d != nil { - t.Fatalf("bad: %v", d) - } -} - -func TestFSM_KVSCheckAndSet(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - req := structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVSet, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Flags: 0, - Value: []byte("test"), - }, - } - buf, err := structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Verify key is set - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d == nil { - t.Fatalf("key missing") - } - - // Run the check-and-set - req.Op = api.KVCAS - req.DirEnt.ModifyIndex = d.ModifyIndex - req.DirEnt.Value = []byte("zip") - buf, err = structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp.(bool) != true { - t.Fatalf("resp: %v", resp) - } - - // Verify key is updated - _, d, err = fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if string(d.Value) != "zip" { - t.Fatalf("bad: %v", d) - } -} - -func TestFSM_CoordinateUpdate(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Register some nodes. - fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"}) - fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"}) - - // Write a batch of two coordinates. - updates := structs.Coordinates{ - &structs.Coordinate{ - Node: "node1", - Coord: generateRandomCoordinate(), - }, - &structs.Coordinate{ - Node: "node2", - Coord: generateRandomCoordinate(), - }, - } - buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - // Read back the two coordinates to make sure they got updated. - _, coords, err := fsm.state.Coordinates(nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if !reflect.DeepEqual(coords, updates) { - t.Fatalf("bad: %#v", coords) - } -} - -func TestFSM_SessionCreate_Destroy(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) - fsm.state.EnsureCheck(2, &structs.HealthCheck{ - Node: "foo", - CheckID: "web", - Status: api.HealthPassing, - }) - - // Create a new session - req := structs.SessionRequest{ - Datacenter: "dc1", - Op: structs.SessionCreate, - Session: structs.Session{ - ID: generateUUID(), - Node: "foo", - Checks: []types.CheckID{"web"}, - }, - } - buf, err := structs.Encode(structs.SessionRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if err, ok := resp.(error); ok { - t.Fatalf("resp: %v", err) - } - - // Get the session - id := resp.(string) - _, session, err := fsm.state.SessionGet(nil, id) - if err != nil { - t.Fatalf("err: %v", err) - } - if session == nil { - t.Fatalf("missing") - } - - // Verify the session - if session.ID != id { - t.Fatalf("bad: %v", *session) - } - if session.Node != "foo" { - t.Fatalf("bad: %v", *session) - } - if session.Checks[0] != "web" { - t.Fatalf("bad: %v", *session) - } - - // Try to destroy - destroy := structs.SessionRequest{ - Datacenter: "dc1", - Op: structs.SessionDestroy, - Session: structs.Session{ - ID: id, - }, - } - buf, err = structs.Encode(structs.SessionRequestType, destroy) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - _, session, err = fsm.state.SessionGet(nil, id) - if err != nil { - t.Fatalf("err: %v", err) - } - if session != nil { - t.Fatalf("should be destroyed") - } -} - -func TestFSM_KVSLock(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) - session := &structs.Session{ID: generateUUID(), Node: "foo"} - fsm.state.SessionCreate(2, session) - - req := structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVLock, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Value: []byte("test"), - Session: session.ID, - }, - } - buf, err := structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != true { - t.Fatalf("resp: %v", resp) - } - - // Verify key is locked - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d == nil { - t.Fatalf("missing") - } - if d.LockIndex != 1 { - t.Fatalf("bad: %v", *d) - } - if d.Session != session.ID { - t.Fatalf("bad: %v", *d) - } -} - -func TestFSM_KVSUnlock(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) - session := &structs.Session{ID: generateUUID(), Node: "foo"} - fsm.state.SessionCreate(2, session) - - req := structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVLock, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Value: []byte("test"), - Session: session.ID, - }, - } - buf, err := structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != true { - t.Fatalf("resp: %v", resp) - } - - req = structs.KVSRequest{ - Datacenter: "dc1", - Op: api.KVUnlock, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Value: []byte("test"), - Session: session.ID, - }, - } - buf, err = structs.Encode(structs.KVSRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp != true { - t.Fatalf("resp: %v", resp) - } - - // Verify key is unlocked - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d == nil { - t.Fatalf("missing") - } - if d.LockIndex != 1 { - t.Fatalf("bad: %v", *d) - } - if d.Session != "" { - t.Fatalf("bad: %v", *d) - } -} - -func TestFSM_ACL_CRUD(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Create a new ACL. - req := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLSet, - ACL: structs.ACL{ - ID: generateUUID(), - Name: "User token", - Type: structs.ACLTypeClient, - }, - } - buf, err := structs.Encode(structs.ACLRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if err, ok := resp.(error); ok { - t.Fatalf("resp: %v", err) - } - - // Get the ACL. - id := resp.(string) - _, acl, err := fsm.state.ACLGet(nil, id) - if err != nil { - t.Fatalf("err: %v", err) - } - if acl == nil { - t.Fatalf("missing") - } - - // Verify the ACL. - if acl.ID != id { - t.Fatalf("bad: %v", *acl) - } - if acl.Name != "User token" { - t.Fatalf("bad: %v", *acl) - } - if acl.Type != structs.ACLTypeClient { - t.Fatalf("bad: %v", *acl) - } - - // Try to destroy. - destroy := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLDelete, - ACL: structs.ACL{ - ID: id, - }, - } - buf, err = structs.Encode(structs.ACLRequestType, destroy) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - - _, acl, err = fsm.state.ACLGet(nil, id) - if err != nil { - t.Fatalf("err: %v", err) - } - if acl != nil { - t.Fatalf("should be destroyed") - } - - // Initialize bootstrap (should work since we haven't made a management - // token). - init := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLBootstrapInit, - } - buf, err = structs.Encode(structs.ACLRequestType, init) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if enabled, ok := resp.(bool); !ok || !enabled { - t.Fatalf("resp: %v", resp) - } - gotB, err := fsm.state.ACLGetBootstrap() - if err != nil { - t.Fatalf("err: %v", err) - } - wantB := &structs.ACLBootstrap{ - AllowBootstrap: true, - RaftIndex: gotB.RaftIndex, - } - verify.Values(t, "", gotB, wantB) - - // Do a bootstrap. - bootstrap := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLBootstrapNow, - ACL: structs.ACL{ - ID: generateUUID(), - Name: "Bootstrap Token", - Type: structs.ACLTypeManagement, - }, - } - buf, err = structs.Encode(structs.ACLRequestType, bootstrap) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - respACL, ok := resp.(*structs.ACL) - if !ok { - t.Fatalf("resp: %v", resp) - } - bootstrap.ACL.CreateIndex = respACL.CreateIndex - bootstrap.ACL.ModifyIndex = respACL.ModifyIndex - verify.Values(t, "", respACL, &bootstrap.ACL) -} - -func TestFSM_PreparedQuery_CRUD(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Register a service to query on. - fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) - fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) - - // Create a new query. - query := structs.PreparedQueryRequest{ - Op: structs.PreparedQueryCreate, - Query: &structs.PreparedQuery{ - ID: generateUUID(), - Service: structs.ServiceQuery{ - Service: "web", - }, - }, - } - { - buf, err := structs.Encode(structs.PreparedQueryRequestType, query) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - } - - // Verify it's in the state store. - { - _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) - if err != nil { - t.Fatalf("err: %s", err) - } - - actual.CreateIndex, actual.ModifyIndex = 0, 0 - if !reflect.DeepEqual(actual, query.Query) { - t.Fatalf("bad: %v", actual) - } - } - - // Make an update to the query. - query.Op = structs.PreparedQueryUpdate - query.Query.Name = "my-query" - { - buf, err := structs.Encode(structs.PreparedQueryRequestType, query) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - } - - // Verify the update. - { - _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) - if err != nil { - t.Fatalf("err: %s", err) - } - - actual.CreateIndex, actual.ModifyIndex = 0, 0 - if !reflect.DeepEqual(actual, query.Query) { - t.Fatalf("bad: %v", actual) - } - } - - // Delete the query. - query.Op = structs.PreparedQueryDelete - { - buf, err := structs.Encode(structs.PreparedQueryRequestType, query) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if resp != nil { - t.Fatalf("resp: %v", resp) - } - } - - // Make sure it's gone. - { - _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) - if err != nil { - t.Fatalf("err: %s", err) - } - - if actual != nil { - t.Fatalf("bad: %v", actual) - } - } -} - -func TestFSM_TombstoneReap(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Create some tombstones - fsm.state.KVSSet(11, &structs.DirEntry{ - Key: "/remove", - Value: []byte("foo"), - }) - fsm.state.KVSDelete(12, "/remove") - idx, _, err := fsm.state.KVSList(nil, "/remove") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 12 { - t.Fatalf("bad index: %d", idx) - } - - // Create a new reap request - req := structs.TombstoneRequest{ - Datacenter: "dc1", - Op: structs.TombstoneReap, - ReapIndex: 12, - } - buf, err := structs.Encode(structs.TombstoneRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if err, ok := resp.(error); ok { - t.Fatalf("resp: %v", err) - } - - // Verify the tombstones are gone - snap := fsm.state.Snapshot() - defer snap.Close() - stones, err := snap.Tombstones() - if err != nil { - t.Fatalf("err: %s", err) - } - if stones.Next() != nil { - t.Fatalf("unexpected extra tombstones") - } -} - -func TestFSM_Txn(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Set a key using a transaction. - req := structs.TxnRequest{ - Datacenter: "dc1", - Ops: structs.TxnOps{ - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: api.KVSet, - DirEnt: structs.DirEntry{ - Key: "/test/path", - Flags: 0, - Value: []byte("test"), - }, - }, - }, - }, - } - buf, err := structs.Encode(structs.TxnRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if _, ok := resp.(structs.TxnResponse); !ok { - t.Fatalf("bad response type: %T", resp) - } - - // Verify key is set directly in the state store. - _, d, err := fsm.state.KVSGet(nil, "/test/path") - if err != nil { - t.Fatalf("err: %v", err) - } - if d == nil { - t.Fatalf("missing") - } -} - -func TestFSM_Autopilot(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Set the autopilot config using a request. - req := structs.AutopilotSetConfigRequest{ - Datacenter: "dc1", - Config: structs.AutopilotConfig{ - CleanupDeadServers: true, - LastContactThreshold: 10 * time.Second, - MaxTrailingLogs: 300, - }, - } - buf, err := structs.Encode(structs.AutopilotRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp := fsm.Apply(makeLog(buf)) - if _, ok := resp.(error); ok { - t.Fatalf("bad: %v", resp) - } - - // Verify key is set directly in the state store. - _, config, err := fsm.state.AutopilotConfig() - if err != nil { - t.Fatalf("err: %v", err) - } - if config.CleanupDeadServers != req.Config.CleanupDeadServers { - t.Fatalf("bad: %v", config.CleanupDeadServers) - } - if config.LastContactThreshold != req.Config.LastContactThreshold { - t.Fatalf("bad: %v", config.LastContactThreshold) - } - if config.MaxTrailingLogs != req.Config.MaxTrailingLogs { - t.Fatalf("bad: %v", config.MaxTrailingLogs) - } - - // Now use CAS and provide an old index - req.CAS = true - req.Config.CleanupDeadServers = false - req.Config.ModifyIndex = config.ModifyIndex - 1 - buf, err = structs.Encode(structs.AutopilotRequestType, req) - if err != nil { - t.Fatalf("err: %v", err) - } - resp = fsm.Apply(makeLog(buf)) - if _, ok := resp.(error); ok { - t.Fatalf("bad: %v", resp) - } - - _, config, err = fsm.state.AutopilotConfig() - if err != nil { - t.Fatalf("err: %v", err) - } - if !config.CleanupDeadServers { - t.Fatalf("bad: %v", config.CleanupDeadServers) - } -} - func TestFSM_IgnoreUnknown(t *testing.T) { t.Parallel() fsm, err := New(nil, os.Stderr)