mirror of https://github.com/status-im/consul.git
consul: Adding support for sessions to FSM
This commit is contained in:
parent
8772068163
commit
137d21fbac
|
@ -67,6 +67,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||
return c.applyDeregister(buf[1:], log.Index)
|
||||
case structs.KVSRequestType:
|
||||
return c.applyKVSOperation(buf[1:], log.Index)
|
||||
case structs.SessionRequestType:
|
||||
return c.applySessionOperation(buf[1:], log.Index)
|
||||
default:
|
||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||
}
|
||||
|
@ -152,6 +154,20 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
|||
} else {
|
||||
return act
|
||||
}
|
||||
case structs.KVSLock:
|
||||
act, err := c.state.KVSLock(index, &req.DirEnt)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return act
|
||||
}
|
||||
case structs.KVSUnlock:
|
||||
act, err := c.state.KVSUnlock(index, &req.DirEnt)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return act
|
||||
}
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
|
||||
|
@ -159,6 +175,23 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.SessionRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
switch req.Op {
|
||||
case structs.SessionCreate:
|
||||
return c.state.SessionCreate(index, &req.Session)
|
||||
case structs.SessionDestroy:
|
||||
return c.state.SessionDestroy(index, req.Session.ID)
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid Session operation '%s'", req.Op)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
|
@ -222,6 +255,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||
return err
|
||||
}
|
||||
|
||||
case structs.SessionRequestType:
|
||||
var req structs.Session
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.state.SessionRestore(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
||||
}
|
||||
|
@ -244,6 +286,25 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := s.persistNodes(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistSessions(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistKV(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all the nodes
|
||||
nodes := s.state.Nodes()
|
||||
|
||||
|
@ -258,7 +319,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
// Register the node itself
|
||||
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -268,7 +328,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
req.Service = srv
|
||||
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -280,16 +339,31 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
req.Check = check
|
||||
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Enable GC of the ndoes
|
||||
nodes = nil
|
||||
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
sessions, err := s.state.SessionList()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Dump the KVS entries
|
||||
for _, s := range sessions {
|
||||
sink.Write([]byte{byte(structs.SessionRequestType)})
|
||||
if err := encoder.Encode(s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
streamCh := make(chan interface{}, 256)
|
||||
errorCh := make(chan error)
|
||||
go func() {
|
||||
|
@ -298,25 +372,21 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
}
|
||||
}()
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case raw := <-streamCh:
|
||||
if raw == nil {
|
||||
break OUTER
|
||||
return nil
|
||||
}
|
||||
sink.Write([]byte{byte(structs.KVSRequestType)})
|
||||
if err := encoder.Encode(raw); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
case err := <-errorCh:
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ const (
|
|||
RegisterRequestType MessageType = iota
|
||||
DeregisterRequestType
|
||||
KVSRequestType
|
||||
SessionRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -348,6 +349,25 @@ type Session struct {
|
|||
CreateIndex uint64
|
||||
}
|
||||
|
||||
type SessionOp string
|
||||
|
||||
const (
|
||||
SessionCreate SessionOp = "create"
|
||||
SessionDestroy = "destroy"
|
||||
)
|
||||
|
||||
// SessionRequest is used to operate on sessions
|
||||
type SessionRequest struct {
|
||||
Datacenter string
|
||||
Op SessionOp // Which operation are we performing
|
||||
Session Session // Which session
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
func (r *SessionRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
func Decode(buf []byte, out interface{}) error {
|
||||
var handle codec.MsgpackHandle
|
||||
|
|
Loading…
Reference in New Issue