mirror of https://github.com/status-im/consul.git
Creates a registration mechanism for FSM commands.
This commit is contained in:
parent
78292662d7
commit
c8e763667f
|
@ -0,0 +1,263 @@
|
||||||
|
package fsm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerCommand(structs.RegisterRequestType, (*FSM).applyRegister)
|
||||||
|
registerCommand(structs.DeregisterRequestType, (*FSM).applyDeregister)
|
||||||
|
registerCommand(structs.KVSRequestType, (*FSM).applyKVSOperation)
|
||||||
|
registerCommand(structs.SessionRequestType, (*FSM).applySessionOperation)
|
||||||
|
registerCommand(structs.ACLRequestType, (*FSM).applyACLOperation)
|
||||||
|
registerCommand(structs.TombstoneRequestType, (*FSM).applyTombstoneOperation)
|
||||||
|
registerCommand(structs.CoordinateBatchUpdateType, (*FSM).applyCoordinateBatchUpdate)
|
||||||
|
registerCommand(structs.PreparedQueryRequestType, (*FSM).applyPreparedQueryOperation)
|
||||||
|
registerCommand(structs.TxnRequestType, (*FSM).applyTxn)
|
||||||
|
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
|
||||||
|
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
|
||||||
|
var req structs.RegisterRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply all updates in a single transaction
|
||||||
|
if err := c.state.EnsureRegistration(index, &req); err != nil {
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: EnsureRegistration failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
|
||||||
|
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
|
||||||
|
var req structs.DeregisterRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Either remove the service entry or the whole node. The precedence
|
||||||
|
// here is also baked into vetDeregisterWithACL() in acl.go, so if you
|
||||||
|
// make changes here, be sure to also adjust the code over there.
|
||||||
|
if req.ServiceID != "" {
|
||||||
|
if err := c.state.DeleteService(index, req.Node, req.ServiceID); err != nil {
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: DeleteNodeService failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if req.CheckID != "" {
|
||||||
|
if err := c.state.DeleteCheck(index, req.Node, req.CheckID); err != nil {
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: DeleteNodeCheck failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := c.state.DeleteNode(index, req.Node); err != nil {
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: DeleteNode failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.KVSRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
switch req.Op {
|
||||||
|
case api.KVSet:
|
||||||
|
return c.state.KVSSet(index, &req.DirEnt)
|
||||||
|
case api.KVDelete:
|
||||||
|
return c.state.KVSDelete(index, req.DirEnt.Key)
|
||||||
|
case api.KVDeleteCAS:
|
||||||
|
act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return act
|
||||||
|
case api.KVDeleteTree:
|
||||||
|
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
|
||||||
|
case api.KVCAS:
|
||||||
|
act, err := c.state.KVSSetCAS(index, &req.DirEnt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return act
|
||||||
|
case api.KVLock:
|
||||||
|
act, err := c.state.KVSLock(index, &req.DirEnt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return act
|
||||||
|
case api.KVUnlock:
|
||||||
|
act, err := c.state.KVSUnlock(index, &req.DirEnt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return act
|
||||||
|
default:
|
||||||
|
err := fmt.Errorf("Invalid KVS operation '%s'", req.Op)
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.SessionRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
switch req.Op {
|
||||||
|
case structs.SessionCreate:
|
||||||
|
if err := c.state.SessionCreate(index, &req.Session); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return req.Session.ID
|
||||||
|
case structs.SessionDestroy:
|
||||||
|
return c.state.SessionDestroy(index, req.Session.ID)
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
|
||||||
|
return fmt.Errorf("Invalid Session operation '%s'", req.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.ACLRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
switch req.Op {
|
||||||
|
case structs.ACLBootstrapInit:
|
||||||
|
enabled, err := c.state.ACLBootstrapInit(index)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return enabled
|
||||||
|
case structs.ACLBootstrapNow:
|
||||||
|
if err := c.state.ACLBootstrap(index, &req.ACL); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return &req.ACL
|
||||||
|
case structs.ACLForceSet, structs.ACLSet:
|
||||||
|
if err := c.state.ACLSet(index, &req.ACL); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return req.ACL.ID
|
||||||
|
case structs.ACLDelete:
|
||||||
|
return c.state.ACLDelete(index, req.ACL.ID)
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: Invalid ACL operation '%s'", req.Op)
|
||||||
|
return fmt.Errorf("Invalid ACL operation '%s'", req.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.TombstoneRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
switch req.Op {
|
||||||
|
case structs.TombstoneReap:
|
||||||
|
return c.state.ReapTombstones(req.ReapIndex)
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
|
||||||
|
return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies
|
||||||
|
// them in a single underlying transaction. This interface isn't 1:1 with the outer
|
||||||
|
// update interface that the coordinate endpoint exposes, so we made it single
|
||||||
|
// purpose and avoided the opcode convention.
|
||||||
|
func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
|
||||||
|
var updates structs.Coordinates
|
||||||
|
if err := structs.Decode(buf, &updates); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode batch updates: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
|
||||||
|
defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now())
|
||||||
|
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyPreparedQueryOperation applies the given prepared query operation to the
|
||||||
|
// state store.
|
||||||
|
func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.PreparedQueryRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(),
|
||||||
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||||
|
switch req.Op {
|
||||||
|
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
|
||||||
|
return c.state.PreparedQuerySet(index, req.Query)
|
||||||
|
case structs.PreparedQueryDelete:
|
||||||
|
return c.state.PreparedQueryDelete(index, req.Query.ID)
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op)
|
||||||
|
return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyTxn(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.TxnRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
|
||||||
|
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
|
||||||
|
results, errors := c.state.TxnRW(index, req.Ops)
|
||||||
|
return structs.TxnResponse{
|
||||||
|
Results: results,
|
||||||
|
Errors: errors,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.AutopilotSetConfigRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
|
||||||
|
defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now())
|
||||||
|
|
||||||
|
if req.CAS {
|
||||||
|
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return act
|
||||||
|
}
|
||||||
|
return c.state.AutopilotSetConfig(index, &req.Config)
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -10,7 +10,6 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
)
|
)
|
||||||
|
@ -18,6 +17,28 @@ import (
|
||||||
// msgpackHandle is a shared handle for encoding/decoding msgpack payloads
|
// msgpackHandle is a shared handle for encoding/decoding msgpack payloads
|
||||||
var msgpackHandle = &codec.MsgpackHandle{}
|
var msgpackHandle = &codec.MsgpackHandle{}
|
||||||
|
|
||||||
|
// command is a command method on the FSM.
|
||||||
|
type command func(buf []byte, index uint64) interface{}
|
||||||
|
|
||||||
|
// unboundCommand is a command method on the FSM, not yet bound to an FSM
|
||||||
|
// instance.
|
||||||
|
type unboundCommand func(c *FSM, buf []byte, index uint64) interface{}
|
||||||
|
|
||||||
|
// commands is a map from message type to unbound command.
|
||||||
|
var commands map[structs.MessageType]unboundCommand
|
||||||
|
|
||||||
|
// registerCommand registers a new command with the FSM, which should be done
|
||||||
|
// at package init() time.
|
||||||
|
func registerCommand(msg structs.MessageType, fn unboundCommand) {
|
||||||
|
if commands == nil {
|
||||||
|
commands = make(map[structs.MessageType]unboundCommand)
|
||||||
|
}
|
||||||
|
if commands[msg] != nil {
|
||||||
|
panic(fmt.Errorf("Message %d is already registered", msg))
|
||||||
|
}
|
||||||
|
commands[msg] = fn
|
||||||
|
}
|
||||||
|
|
||||||
// FSM implements a finite state machine that is used
|
// FSM implements a finite state machine that is used
|
||||||
// along with Raft to provide strong consistency. We implement
|
// along with Raft to provide strong consistency. We implement
|
||||||
// this outside the Server to avoid exposing this outside the package.
|
// this outside the Server to avoid exposing this outside the package.
|
||||||
|
@ -26,6 +47,10 @@ type FSM struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
path string
|
path string
|
||||||
|
|
||||||
|
// apply is built off the commands global and is used to route apply
|
||||||
|
// operations to their appropriate handlers.
|
||||||
|
apply map[structs.MessageType]command
|
||||||
|
|
||||||
// stateLock is only used to protect outside callers to State() from
|
// stateLock is only used to protect outside callers to State() from
|
||||||
// racing with Restore(), which is called by Raft (it puts in a totally
|
// racing with Restore(), which is called by Raft (it puts in a totally
|
||||||
// new state store). Everything internal here is synchronized by the
|
// new state store). Everything internal here is synchronized by the
|
||||||
|
@ -60,9 +85,19 @@ func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) {
|
||||||
fsm := &FSM{
|
fsm := &FSM{
|
||||||
logOutput: logOutput,
|
logOutput: logOutput,
|
||||||
logger: log.New(logOutput, "", log.LstdFlags),
|
logger: log.New(logOutput, "", log.LstdFlags),
|
||||||
|
apply: make(map[structs.MessageType]command),
|
||||||
state: stateNew,
|
state: stateNew,
|
||||||
gc: gc,
|
gc: gc,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build out the apply dispatch table based on the registered commands.
|
||||||
|
for msg, fn := range commands {
|
||||||
|
thisFn := fn
|
||||||
|
fsm.apply[msg] = func(buf []byte, index uint64) interface{} {
|
||||||
|
return thisFn(fsm, buf, index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return fsm, nil
|
return fsm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,275 +121,19 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
|
||||||
ignoreUnknown = true
|
ignoreUnknown = true
|
||||||
}
|
}
|
||||||
|
|
||||||
switch msgType {
|
// Apply based on the dispatch table, if possible.
|
||||||
case structs.RegisterRequestType:
|
if fn, ok := c.apply[msgType]; ok {
|
||||||
return c.applyRegister(buf[1:], log.Index)
|
return fn(buf[1:], log.Index)
|
||||||
case structs.DeregisterRequestType:
|
}
|
||||||
return c.applyDeregister(buf[1:], log.Index)
|
|
||||||
case structs.KVSRequestType:
|
// Otherwise, see if it's safe to ignore. If not, we have to panic so
|
||||||
return c.applyKVSOperation(buf[1:], log.Index)
|
// that we crash and our state doesn't diverge.
|
||||||
case structs.SessionRequestType:
|
|
||||||
return c.applySessionOperation(buf[1:], log.Index)
|
|
||||||
case structs.ACLRequestType:
|
|
||||||
return c.applyACLOperation(buf[1:], log.Index)
|
|
||||||
case structs.TombstoneRequestType:
|
|
||||||
return c.applyTombstoneOperation(buf[1:], log.Index)
|
|
||||||
case structs.CoordinateBatchUpdateType:
|
|
||||||
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
|
|
||||||
case structs.PreparedQueryRequestType:
|
|
||||||
return c.applyPreparedQueryOperation(buf[1:], log.Index)
|
|
||||||
case structs.TxnRequestType:
|
|
||||||
return c.applyTxn(buf[1:], log.Index)
|
|
||||||
case structs.AutopilotRequestType:
|
|
||||||
return c.applyAutopilotUpdate(buf[1:], log.Index)
|
|
||||||
default:
|
|
||||||
if ignoreUnknown {
|
if ignoreUnknown {
|
||||||
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
|
|
||||||
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
|
|
||||||
var req structs.RegisterRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply all updates in a single transaction
|
|
||||||
if err := c.state.EnsureRegistration(index, &req); err != nil {
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: EnsureRegistration failed: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
|
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
|
|
||||||
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
|
|
||||||
var req structs.DeregisterRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Either remove the service entry or the whole node. The precedence
|
|
||||||
// here is also baked into vetDeregisterWithACL() in acl.go, so if you
|
|
||||||
// make changes here, be sure to also adjust the code over there.
|
|
||||||
if req.ServiceID != "" {
|
|
||||||
if err := c.state.DeleteService(index, req.Node, req.ServiceID); err != nil {
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: DeleteNodeService failed: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else if req.CheckID != "" {
|
|
||||||
if err := c.state.DeleteCheck(index, req.Node, req.CheckID); err != nil {
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: DeleteNodeCheck failed: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if err := c.state.DeleteNode(index, req.Node); err != nil {
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: DeleteNode failed: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.KVSRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
switch req.Op {
|
|
||||||
case api.KVSet:
|
|
||||||
return c.state.KVSSet(index, &req.DirEnt)
|
|
||||||
case api.KVDelete:
|
|
||||||
return c.state.KVSDelete(index, req.DirEnt.Key)
|
|
||||||
case api.KVDeleteCAS:
|
|
||||||
act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return act
|
|
||||||
case api.KVDeleteTree:
|
|
||||||
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
|
|
||||||
case api.KVCAS:
|
|
||||||
act, err := c.state.KVSSetCAS(index, &req.DirEnt)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return act
|
|
||||||
case api.KVLock:
|
|
||||||
act, err := c.state.KVSLock(index, &req.DirEnt)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return act
|
|
||||||
case api.KVUnlock:
|
|
||||||
act, err := c.state.KVSUnlock(index, &req.DirEnt)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return act
|
|
||||||
default:
|
|
||||||
err := fmt.Errorf("Invalid KVS operation '%s'", req.Op)
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.SessionRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
switch req.Op {
|
|
||||||
case structs.SessionCreate:
|
|
||||||
if err := c.state.SessionCreate(index, &req.Session); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return req.Session.ID
|
|
||||||
case structs.SessionDestroy:
|
|
||||||
return c.state.SessionDestroy(index, req.Session.ID)
|
|
||||||
default:
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
|
|
||||||
return fmt.Errorf("Invalid Session operation '%s'", req.Op)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.ACLRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
switch req.Op {
|
|
||||||
case structs.ACLBootstrapInit:
|
|
||||||
enabled, err := c.state.ACLBootstrapInit(index)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return enabled
|
|
||||||
case structs.ACLBootstrapNow:
|
|
||||||
if err := c.state.ACLBootstrap(index, &req.ACL); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return &req.ACL
|
|
||||||
case structs.ACLForceSet, structs.ACLSet:
|
|
||||||
if err := c.state.ACLSet(index, &req.ACL); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return req.ACL.ID
|
|
||||||
case structs.ACLDelete:
|
|
||||||
return c.state.ACLDelete(index, req.ACL.ID)
|
|
||||||
default:
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: Invalid ACL operation '%s'", req.Op)
|
|
||||||
return fmt.Errorf("Invalid ACL operation '%s'", req.Op)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.TombstoneRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
switch req.Op {
|
|
||||||
case structs.TombstoneReap:
|
|
||||||
return c.state.ReapTombstones(req.ReapIndex)
|
|
||||||
default:
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
|
|
||||||
return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies
|
|
||||||
// them in a single underlying transaction. This interface isn't 1:1 with the outer
|
|
||||||
// update interface that the coordinate endpoint exposes, so we made it single
|
|
||||||
// purpose and avoided the opcode convention.
|
|
||||||
func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
|
|
||||||
var updates structs.Coordinates
|
|
||||||
if err := structs.Decode(buf, &updates); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode batch updates: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
|
|
||||||
defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now())
|
|
||||||
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// applyPreparedQueryOperation applies the given prepared query operation to the
|
|
||||||
// state store.
|
|
||||||
func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.PreparedQueryRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(),
|
|
||||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
||||||
switch req.Op {
|
|
||||||
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
|
|
||||||
return c.state.PreparedQuerySet(index, req.Query)
|
|
||||||
case structs.PreparedQueryDelete:
|
|
||||||
return c.state.PreparedQueryDelete(index, req.Query.ID)
|
|
||||||
default:
|
|
||||||
c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op)
|
|
||||||
return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyTxn(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.TxnRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
|
|
||||||
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
|
|
||||||
results, errors := c.state.TxnRW(index, req.Ops)
|
|
||||||
return structs.TxnResponse{
|
|
||||||
Results: results,
|
|
||||||
Errors: errors,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
|
|
||||||
var req structs.AutopilotSetConfigRequest
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
|
|
||||||
defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now())
|
|
||||||
|
|
||||||
if req.CAS {
|
|
||||||
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return act
|
|
||||||
}
|
|
||||||
return c.state.AutopilotSetConfig(index, &req.Config)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue