diff --git a/consul/fsm.go b/consul/fsm.go index 22854729fd..c2a1bac251 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -67,6 +67,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyDeregister(buf[1:], log.Index) case structs.KVSRequestType: return c.applyKVSOperation(buf[1:], log.Index) + case structs.SessionRequestType: + return c.applySessionOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -152,6 +154,20 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { } else { return act } + case structs.KVSLock: + act, err := c.state.KVSLock(index, &req.DirEnt) + if err != nil { + return err + } else { + return act + } + case structs.KVSUnlock: + act, err := c.state.KVSUnlock(index, &req.DirEnt) + if err != nil { + return err + } else { + return act + } default: c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op) return fmt.Errorf("Invalid KVS operation '%s'", req.Op) @@ -159,6 +175,23 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { return nil } +func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} { + var req structs.SessionRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + switch req.Op { + case structs.SessionCreate: + return c.state.SessionCreate(index, &req.Session) + case structs.SessionDestroy: + return c.state.SessionDestroy(index, req.Session.ID) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op) + return fmt.Errorf("Invalid Session operation '%s'", req.Op) + } + return nil +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) @@ -222,6 +255,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { return err } + case structs.SessionRequestType: + var req structs.Session + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.SessionRestore(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -244,6 +286,25 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { return err } + if err := s.persistNodes(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistSessions(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistKV(sink, encoder); err != nil { + sink.Cancel() + return err + } + return nil +} + +func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, + encoder *codec.Encoder) error { // Get all the nodes nodes := s.state.Nodes() @@ -258,7 +319,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { // Register the node itself sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { - sink.Cancel() return err } @@ -268,7 +328,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { req.Service = srv sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { - sink.Cancel() return err } } @@ -280,16 +339,31 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { req.Check = check sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { - sink.Cancel() return err } } } + return nil +} - // Enable GC of the ndoes - nodes = nil +func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + sessions, err := s.state.SessionList() + if err != nil { + return err + } - // Dump the KVS entries + for _, s := range sessions { + sink.Write([]byte{byte(structs.SessionRequestType)}) + if err := encoder.Encode(s); err != nil { + return err + } + } + return nil +} + +func (s *consulSnapshot) persistKV(sink raft.SnapshotSink, + encoder *codec.Encoder) error { streamCh := make(chan interface{}, 256) errorCh := make(chan error) go func() { @@ -298,25 +372,21 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { } }() -OUTER: for { select { case raw := <-streamCh: if raw == nil { - break OUTER + return nil } sink.Write([]byte{byte(structs.KVSRequestType)}) if err := encoder.Encode(raw); err != nil { - sink.Cancel() return err } case err := <-errorCh: - sink.Cancel() return err } } - return nil } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 9f91166127..10c423b036 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -19,6 +19,7 @@ const ( RegisterRequestType MessageType = iota DeregisterRequestType KVSRequestType + SessionRequestType ) const ( @@ -348,6 +349,25 @@ type Session struct { CreateIndex uint64 } +type SessionOp string + +const ( + SessionCreate SessionOp = "create" + SessionDestroy = "destroy" +) + +// SessionRequest is used to operate on sessions +type SessionRequest struct { + Datacenter string + Op SessionOp // Which operation are we performing + Session Session // Which session + WriteRequest +} + +func (r *SessionRequest) RequestDatacenter() string { + return r.Datacenter +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle