consul: Refactor txn handling in state store

This commit is contained in:
Armon Dadgar 2014-08-22 12:27:12 -07:00
parent 9e13633af8
commit 40ae65b277
1 changed files with 30 additions and 10 deletions

View File

@ -377,13 +377,20 @@ func (s *StateStore) QueryTables(q string) MDBTables {
// EnsureNode is used to ensure a given node exists, with the provided address // EnsureNode is used to ensure a given node exists, with the provided address
func (s *StateStore) EnsureNode(index uint64, node structs.Node) error { func (s *StateStore) EnsureNode(index uint64, node structs.Node) error {
// Start a new txn
tx, err := s.nodeTable.StartTxn(false, nil) tx, err := s.nodeTable.StartTxn(false, nil)
if err != nil { if err != nil {
return err return err
} }
defer tx.Abort() defer tx.Abort()
if err := s.ensureNodeTxn(index, node, tx); err != nil {
return err
}
return tx.Commit()
}
// ensureNodeTxn is used to ensure a given node exists, with the provided address
// within a given txn
func (s *StateStore) ensureNodeTxn(index uint64, node structs.Node, tx *MDBTxn) error {
if err := s.nodeTable.InsertTxn(tx, node); err != nil { if err := s.nodeTable.InsertTxn(tx, node); err != nil {
return err return err
} }
@ -391,7 +398,7 @@ func (s *StateStore) EnsureNode(index uint64, node structs.Node) error {
return err return err
} }
tx.Defer(func() { s.watch[s.nodeTable].Notify() }) tx.Defer(func() { s.watch[s.nodeTable].Notify() })
return tx.Commit() return nil
} }
// GetNode returns all the address of the known and if it was found // GetNode returns all the address of the known and if it was found
@ -428,7 +435,14 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe
panic(fmt.Errorf("Failed to start txn: %v", err)) panic(fmt.Errorf("Failed to start txn: %v", err))
} }
defer tx.Abort() defer tx.Abort()
if err := s.ensureServiceTxn(index, node, ns, tx); err != nil {
return nil
}
return tx.Commit()
}
// ensureServiceTxn is used to ensure a given node exposes a service in a transaction
func (s *StateStore) ensureServiceTxn(index uint64, node string, ns *structs.NodeService, tx *MDBTxn) error {
// Ensure the node exists // Ensure the node exists
res, err := s.nodeTable.GetTxn(tx, "id", node) res, err := s.nodeTable.GetTxn(tx, "id", node)
if err != nil { if err != nil {
@ -455,7 +469,7 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe
return err return err
} }
tx.Defer(func() { s.watch[s.serviceTable].Notify() }) tx.Defer(func() { s.watch[s.serviceTable].Notify() })
return tx.Commit() return nil
} }
// NodeServices is used to return all the services of a given node // NodeServices is used to return all the services of a given node
@ -699,17 +713,23 @@ func (s *StateStore) parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interf
// EnsureCheck is used to create a check or updates it's state // EnsureCheck is used to create a check or updates it's state
func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error { func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error {
// Ensure we have a status
if check.Status == "" {
check.Status = structs.HealthUnknown
}
// Start the txn
tx, err := s.tables.StartTxn(false) tx, err := s.tables.StartTxn(false)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err)) panic(fmt.Errorf("Failed to start txn: %v", err))
} }
defer tx.Abort() defer tx.Abort()
if err := s.ensureCheckTxn(index, check, tx); err != nil {
return err
}
return tx.Commit()
}
// ensureCheckTxn is used to create a check or updates it's state in a transaction
func (s *StateStore) ensureCheckTxn(index uint64, check *structs.HealthCheck, tx *MDBTxn) error {
// Ensure we have a status
if check.Status == "" {
check.Status = structs.HealthUnknown
}
// Ensure the node exists // Ensure the node exists
res, err := s.nodeTable.GetTxn(tx, "id", check.Node) res, err := s.nodeTable.GetTxn(tx, "id", check.Node)
@ -750,7 +770,7 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
return err return err
} }
tx.Defer(func() { s.watch[s.checkTable].Notify() }) tx.Defer(func() { s.watch[s.checkTable].Notify() })
return tx.Commit() return nil
} }
// DeleteNodeCheck is used to delete a node health check // DeleteNodeCheck is used to delete a node health check