consul: Adding SessionCreate and SessionRestore

This commit is contained in:
Armon Dadgar 2014-05-08 16:54:04 -07:00
parent 26ee0e3c76
commit 836e1b5a10
3 changed files with 152 additions and 17 deletions

View File

@ -734,6 +734,19 @@ func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error {
return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0)
}
// SetMaxLastIndexTxn is used to set the last index within a transaction
// if it exceeds the current maximum
func (t *MDBTable) SetMaxLastIndexTxn(tx *MDBTxn, index uint64) error {
current, err := t.LastIndexTxn(tx)
if err != nil {
return err
}
if index > current {
return t.SetLastIndexTxn(tx, index)
}
return nil
}
// StartTxn is used to create a transaction that spans a list of tables
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
var tx *MDBTxn

View File

@ -31,18 +31,18 @@ const (
// implementation uses the Lightning Memory-Mapped Database (MDB).
// This gives us Multi-Version Concurrency Control for "free"
type StateStore struct {
logger *log.Logger
path string
env *mdb.Env
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
sessionTable *MDBTable
sessionChecksTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
logger *log.Logger
path string
env *mdb.Env
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
sessionTable *MDBTable
sessionCheckTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
}
// StateSnapshot is used to provide a point-in-time snapshot
@ -253,7 +253,7 @@ func (s *StateStore) initialize() error {
},
}
s.sessionChecksTable = &MDBTable{
s.sessionCheckTable = &MDBTable{
Name: dbSessionChecks,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
@ -272,7 +272,7 @@ func (s *StateStore) initialize() error {
// Store the set of tables
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
s.kvsTable, s.sessionTable, s.sessionChecksTable}
s.kvsTable, s.sessionTable, s.sessionCheckTable}
for _, table := range s.tables {
table.Env = s.env
table.Encoder = encoder
@ -1091,6 +1091,127 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
return true, tx.Commit()
}
// SessionCreate is used to create a new session. The
// ID will be populated on a successful return
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
// Assign the create index
session.CreateIndex = index
// Start the transaction
tables := MDBTables{s.nodeTable, s.checkTable,
s.sessionTable, s.sessionCheckTable}
tx, err := tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
// Verify that the node exists
res, err := s.nodeTable.GetTxn(tx, "id", session.Node)
if err != nil {
return err
}
if len(res) == 0 {
return fmt.Errorf("Missing node registration")
}
// Verify that the checks exist and are not critical
for _, checkId := range session.Checks {
res, err := s.checkTable.GetTxn(tx, "id", session.Node, checkId)
if err != nil {
return err
}
if len(res) == 0 {
return fmt.Errorf("Missing check '%s' registration", checkId)
}
chk := res[0].(*structs.HealthCheck)
if chk.Status == structs.HealthCritical {
return fmt.Errorf("Check '%s' is in %s state", checkId, chk.Status)
}
}
// Generate a new session ID, verify uniqueness
session.ID = generateUUID()
for {
res, err = s.sessionTable.GetTxn(tx, "id", session.ID)
if err != nil {
return err
}
// Quit if this ID is unique
if len(res) == 0 {
break
}
}
// Insert the session
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
return err
}
// Insert the check mappings
sCheck := sessionCheck{Node: session.Node, Session: session.ID}
for _, checkID := range session.Checks {
sCheck.CheckID = checkID
if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil {
return err
}
}
// Trigger the update notifications
if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.sessionTable].Notify()
if err := s.sessionCheckTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.sessionCheckTable].Notify()
return tx.Commit()
}
// SessionRestore is used to restore a session. It should only be used when
// doing a restore, otherwise SessionCreate should be used.
func (s *StateStore) SessionRestore(session *structs.Session) error {
// Start the transaction
tables := MDBTables{s.nodeTable, s.checkTable,
s.sessionTable, s.sessionCheckTable}
tx, err := tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
// Insert the session
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
return err
}
// Insert the check mappings
sCheck := sessionCheck{Node: session.Node, Session: session.ID}
for _, checkID := range session.Checks {
sCheck.CheckID = checkID
if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil {
return err
}
}
// Trigger the update notifications
index := session.CreateIndex
if err := s.sessionTable.SetMaxLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.sessionTable].Notify()
if err := s.sessionCheckTable.SetMaxLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.sessionCheckTable].Notify()
return tx.Commit()
}
// Snapshot is used to create a point in time snapshot
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
// Begin a new txn on all tables

View File

@ -338,9 +338,10 @@ type IndexedKeyList struct {
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
ID string
Node string
Checks []string
ID string
Node string
Checks []string
CreateIndex uint64
}
// Decode is used to decode a MsgPack encoded object