mirror of
https://github.com/status-im/consul.git
synced 2025-02-19 17:14:37 +00:00
consul: Adding FSM support for KVS operations
This commit is contained in:
parent
9cfea72d2a
commit
f91e12fe30
@ -65,6 +65,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||||||
return c.decodeRegister(buf[1:], log.Index)
|
return c.decodeRegister(buf[1:], log.Index)
|
||||||
case structs.DeregisterRequestType:
|
case structs.DeregisterRequestType:
|
||||||
return c.applyDeregister(buf[1:], log.Index)
|
return c.applyDeregister(buf[1:], log.Index)
|
||||||
|
case structs.KVSRequestType:
|
||||||
|
return c.applyKVSOperation(buf[1:], log.Index)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||||
}
|
}
|
||||||
@ -131,6 +133,32 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.KVSRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
switch req.Op {
|
||||||
|
case structs.KVSSet:
|
||||||
|
return c.state.KVSSet(index, &req.DirEnt)
|
||||||
|
case structs.KVSDelete:
|
||||||
|
return c.state.KVSDelete(index, req.DirEnt.Key)
|
||||||
|
case structs.KVSDeleteTree:
|
||||||
|
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
|
||||||
|
case structs.KVSCAS:
|
||||||
|
act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
return act
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
|
||||||
|
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||||
@ -152,6 +180,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
c.state.Close()
|
||||||
c.state = state
|
c.state = state
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
|
@ -18,6 +18,7 @@ type MessageType uint8
|
|||||||
const (
|
const (
|
||||||
RegisterRequestType MessageType = iota
|
RegisterRequestType MessageType = iota
|
||||||
DeregisterRequestType
|
DeregisterRequestType
|
||||||
|
KVSRequestType
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
Loading…
x
Reference in New Issue
Block a user