consul: store the peerstore

This commit is contained in:
Armon Dadgar 2013-12-09 15:29:01 -08:00
parent 500927cbb1
commit dbc9eeb8a7
1 changed files with 11 additions and 9 deletions

View File

@ -46,6 +46,7 @@ type Server struct {
// DC to protect operations that require strong consistency // DC to protect operations that require strong consistency
raft *raft.Raft raft *raft.Raft
raftLayer *RaftLayer raftLayer *RaftLayer
raftPeers raft.PeerStore
raftStore *raft.SQLiteStore raftStore *raft.SQLiteStore
raftTransport *raft.NetworkTransport raftTransport *raft.NetworkTransport
@ -104,6 +105,12 @@ func NewServer(config *Config) (*Server, error) {
return nil, fmt.Errorf("Failed to start RPC layer: %v", err) return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
} }
// Initialize the Raft server
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
// Start the Serf listeners to prevent a deadlock // Start the Serf listeners to prevent a deadlock
go s.lanEventHandler() go s.lanEventHandler()
go s.wanEventHandler() go s.wanEventHandler()
@ -125,12 +132,6 @@ func NewServer(config *Config) (*Server, error) {
return nil, fmt.Errorf("Failed to start wan serf: %v", err) return nil, fmt.Errorf("Failed to start wan serf: %v", err)
} }
// Initialize the Raft server
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
return s, nil return s, nil
} }
@ -183,14 +184,14 @@ func (s *Server) setupRaft() error {
s.raftTransport = trans s.raftTransport = trans
// Setup the peer store // Setup the peer store
peers := raft.NewJSONPeers(path, trans) s.raftPeers = raft.NewJSONPeers(path, trans)
// Create the FSM // Create the FSM
s.fsm = &consulFSM{server: s} s.fsm = &consulFSM{server: s}
// Setup the Raft store // Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots, s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store,
peers, trans) snapshots, s.raftPeers, trans)
if err != nil { if err != nil {
store.Close() store.Close()
trans.Close() trans.Close()
@ -202,6 +203,7 @@ func (s *Server) setupRaft() error {
// setupRPC is used to setup the RPC listener // setupRPC is used to setup the RPC listener
func (s *Server) setupRPC() error { func (s *Server) setupRPC() error {
// Register the handlers // Register the handlers
s.rpcServer.Register(&Status{server: s})
s.rpcServer.Register(&Raft{server: s}) s.rpcServer.Register(&Raft{server: s})
list, err := net.Listen("tcp", s.config.RPCAddr) list, err := net.Listen("tcp", s.config.RPCAddr)