consul: working on fsm state

This commit is contained in:
Armon Dadgar 2013-12-10 17:00:48 -08:00
parent 637137b997
commit 148607ac88
3 changed files with 111 additions and 7 deletions

View File

@ -6,11 +6,10 @@ import (
)
// consulFSM implements a finite state machine that is used
// along with Raft to provide strong consistency for various
// data that requires it. We implement this outside the Server
// to avoid exposing this outside the package.
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.
type consulFSM struct {
server *Server
state *StateStore
}
// consulSnapshot is used to provide a snapshot of the current
@ -20,6 +19,19 @@ type consulSnapshot struct {
fsm *consulFSM
}
// NewFSM is used to construct a new FSM with a blank state
func NewFSM() (*consulFSM, error) {
state, err := NewStateStore()
if err != nil {
return nil, err
}
fsm := &consulFSM{
state: state,
}
return fsm, nil
}
func (c *consulFSM) Apply([]byte) interface{} {
return nil
}

View File

@ -166,6 +166,13 @@ func (s *Server) setupRaft() error {
return err
}
// Create the FSM
var err error
s.fsm, err = NewFSM()
if err != nil {
return err
}
// Create the SQLite store for logs and stable storage
store, err := raft.NewSQLiteStore(path)
if err != nil {
@ -187,9 +194,6 @@ func (s *Server) setupRaft() error {
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
// Create the FSM
s.fsm = &consulFSM{server: s}
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store,
snapshots, s.raftPeers, trans)

88
consul/state_store.go Normal file
View File

@ -0,0 +1,88 @@
package consul
import (
"database/sql"
"fmt"
_ "github.com/mattn/go-sqlite3"
)
type namedQuery uint8
const (
queryInit namedQuery = iota
)
// The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
// high concurrency for read operations without blocking writes, and
// to provide write availability in the face of reads. The current
// implementation uses an in-memory SQLite database. This reduced the
// GC pressure on Go, and also gives us Multi-Version Concurrency Control
// for "free".
type StateStore struct {
db *sql.DB
prepared map[namedQuery]*sql.Stmt
}
// NewStateStore is used to create a new state store
func NewStateStore() (*StateStore, error) {
// Open the db
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
return nil, fmt.Errorf("failed to open db: %v", err)
}
s := &StateStore{
db: db,
prepared: make(map[namedQuery]*sql.Stmt),
}
// Ensure we can initialize
if err := s.initialize(); err != nil {
db.Close()
return nil, err
}
return s, nil
}
// Close is used to safely shutdown the state store
func (s *StateStore) Close() error {
return s.db.Close()
}
// initialize is used to setup the sqlite store for use
func (s *StateStore) initialize() error {
// Set the pragma first
pragmas := []string{
"pragma journal_mode=memory;",
}
for _, p := range pragmas {
if _, err := s.db.Exec(p); err != nil {
return fmt.Errorf("Failed to set '%s': %v", p, err)
}
}
// Create the tables
tables := []string{
`CREATE TABLE foo (idx integer primary key);`,
}
for _, t := range tables {
if _, err := s.db.Exec(t); err != nil {
return fmt.Errorf("Failed to call '%s': %v", t, err)
}
}
// Prepare the queries
queries := map[namedQuery]string{
queryInit: "SELECT 1",
}
for name, query := range queries {
stmt, err := s.db.Prepare(query)
if err != nil {
return fmt.Errorf("Failed to prepare '%s': %v", query, err)
}
s.prepared[name] = stmt
}
return nil
}