Support multi-table transactions with MDBTable

This commit is contained in:
Armon Dadgar 2014-01-08 10:27:37 -08:00
parent ee81a5744a
commit e3ddf8fa00
1 changed files with 101 additions and 45 deletions

View File

@ -17,8 +17,7 @@ var (
/*
An MDB table is a logical representation of a table, which is a
generic row store. It provides a simple mechanism to store rows
using a "row id", but then accesses can be done using any number
of named indexes
using a row id, while maintaining any number of secondary indexes.
*/
type MDBTable struct {
lastRowID uint64 // Last used rowID
@ -29,6 +28,9 @@ type MDBTable struct {
Decoder func([]byte) interface{}
}
// MDBTables is used for when we have a collection of tables
type MDBTables []*MDBTable
// An Index is named, and uses a series of column values to
// map to the row-id containing the table
type MDBIndex struct {
@ -51,7 +53,9 @@ type MDBTxn struct {
// Abort is used to close the transaction
func (t *MDBTxn) Abort() {
t.tx.Abort()
if t != nil && t.tx != nil {
t.tx.Abort()
}
}
// Commit is used to commit a transaction
@ -60,14 +64,18 @@ func (t *MDBTxn) Commit() error {
}
type RowID uint64
type IndexFunc func([]string) string
type IndexFunc func(*MDBIndex, []string) string
// DefaultIndexFunc is used if no IdxFunc is provided. It joins
// the columns using '||' which is reasonably unlikely to occur.
// We also prefix with a byte to ensure we never have a zero length
// key
func DefaultIndexFunc(parts []string) string {
return "_" + strings.Join(parts, "||")
func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
if len(parts) == 0 {
return "_"
}
prefix := "_" + strings.Join(parts, "||") + "||"
return prefix
}
// Init is used to initialize the MDBTable and ensure it's ready
@ -129,7 +137,7 @@ func (t *MDBTable) createTable() error {
// restoreLastRowID is used to set the last rowID that we've used
func (t *MDBTable) restoreLastRowID() error {
tx, err := t.StartTxn(true)
tx, err := t.StartTxn(true, nil)
if err != nil {
return err
}
@ -159,23 +167,35 @@ func (t *MDBTable) nextRowID() uint64 {
}
// startTxn is used to start a transaction
func (t *MDBTable) StartTxn(readonly bool) (*MDBTxn, error) {
func (t *MDBTable) StartTxn(readonly bool, mdbTxn *MDBTxn) (*MDBTxn, error) {
var txFlags uint = 0
var tx *mdb.Txn
var err error
// Ensure the modes agree
if mdbTxn != nil {
if mdbTxn.readonly != readonly {
return nil, fmt.Errorf("Cannot mix read/write transactions")
}
tx = mdbTxn.tx
goto EXTEND
}
if readonly {
txFlags |= mdb.RDONLY
}
tx, err := t.Env.BeginTxn(nil, txFlags)
tx, err = t.Env.BeginTxn(nil, txFlags)
if err != nil {
return nil, err
}
mdbTxn := &MDBTxn{
mdbTxn = &MDBTxn{
readonly: readonly,
tx: tx,
dbis: make(map[string]mdb.DBI),
}
EXTEND:
dbi, err := tx.DBIOpen(t.Name, 0)
if err != nil {
tx.Abort()
@ -211,6 +231,22 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
// Insert is used to insert or update an object
func (t *MDBTable) Insert(obj interface{}) error {
// Start a new txn
tx, err := t.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if err := t.InsertTxn(tx, obj); err != nil {
return err
}
return tx.Commit()
}
// Insert is used to insert or update an object within
// a given transaction
func (t *MDBTable) InsertTxn(tx *MDBTxn, obj interface{}) error {
var n int
// Construct the indexes keys
indexes, err := t.objIndexKeys(obj)
@ -221,13 +257,6 @@ func (t *MDBTable) Insert(obj interface{}) error {
// Encode the obj
raw := t.Encoder(obj)
// Start a new txn
tx, err := t.StartTxn(false)
if err != nil {
return err
}
defer tx.Abort()
// Scan and check if this primary key already exists
primaryDbi := tx.dbis[t.Indexes["id"].dbiName]
_, err = tx.tx.Get(primaryDbi, indexes["id"])
@ -235,7 +264,7 @@ func (t *MDBTable) Insert(obj interface{}) error {
goto AFTER_DELETE
}
// Delete the existing row{
// Delete the existing row
n, err = t.deleteWithIndex(tx, t.Indexes["id"], indexes["id"])
if err != nil {
return err
@ -260,26 +289,30 @@ AFTER_DELETE:
return err
}
}
return tx.Commit()
return nil
}
// Get is used to lookup one or more rows. An index an appropriate
// fields are specified. The fields can be a prefix of the index.
func (t *MDBTable) Get(index string, parts ...string) ([]interface{}, error) {
// Start a readonly txn
tx, err := t.StartTxn(true, nil)
if err != nil {
return nil, err
}
defer tx.Abort()
return t.GetTxn(tx, index, parts...)
}
// GetTxn is like Get but it operates within a specific transaction.
// This can be used for read that span multiple tables
func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interface{}, error) {
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return nil, err
}
// Start a readonly txn
tx, err := t.StartTxn(true)
if err != nil {
return nil, err
}
defer tx.Abort()
// Accumulate the results
var results []interface{}
err = idx.iterate(tx, key, func(encRowId, res []byte) bool {
@ -306,7 +339,7 @@ func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, er
}
// Construct the key
key := []byte(idx.IdxFunc(parts))
key := idx.keyFromParts(parts...)
return idx, key, nil
}
@ -314,27 +347,31 @@ func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, er
// fields are specified. The fields can be a prefix of the index.
// Returns the rows deleted or an error.
func (t *MDBTable) Delete(index string, parts ...string) (num int, err error) {
// Start a write txn
tx, err := t.StartTxn(false, nil)
if err != nil {
return 0, err
}
defer tx.Abort()
num, err = t.DeleteTxn(tx, index, parts...)
if err != nil {
return 0, err
}
return num, tx.Commit()
}
// DeleteTxn is like Delete, but occurs in a specific transaction
// that can span multiple tables.
func (t *MDBTable) DeleteTxn(tx *MDBTxn, index string, parts ...string) (int, error) {
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return 0, err
}
// Start a write txn
tx, err := t.StartTxn(false)
if err != nil {
return 0, err
}
defer tx.Abort()
// Delete with the index
num, err = t.deleteWithIndex(tx, idx, key)
if err != nil {
return 0, err
}
// Attempt a commit
return num, tx.Commit()
return t.deleteWithIndex(tx, idx, key)
}
// deleteWithIndex deletes all associated rows while scanning
@ -451,8 +488,13 @@ func (i *MDBIndex) keyFromObject(obj interface{}) ([]byte, error) {
}
parts = append(parts, val)
}
key := i.IdxFunc(parts)
return []byte(key), nil
key := i.keyFromParts(parts...)
return key, nil
}
// keyFromParts returns the key from component parts
func (i *MDBIndex) keyFromParts(parts ...string) []byte {
return []byte(i.IdxFunc(i, parts))
}
// iterate is used to iterate over keys matching the prefix,
@ -512,3 +554,17 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
}
return nil
}
// StartTxn is used to create a transaction that spans a list of tables
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
var tx *MDBTxn
for _, table := range t {
newTx, err := table.StartTxn(readonly, tx)
if err != nil {
tx.Abort()
return nil, err
}
tx = newTx
}
return tx, nil
}