From e3ddf8fa00cefdd526276561d978d66bbe38c4e8 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 8 Jan 2014 10:27:37 -0800 Subject: [PATCH] Support multi-table transactions with MDBTable --- consul/mdb_table.go | 146 ++++++++++++++++++++++++++++++-------------- 1 file changed, 101 insertions(+), 45 deletions(-) diff --git a/consul/mdb_table.go b/consul/mdb_table.go index fb03fc6ce4..e1dea2f6fb 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -17,8 +17,7 @@ var ( /* An MDB table is a logical representation of a table, which is a generic row store. It provides a simple mechanism to store rows - using a "row id", but then accesses can be done using any number - of named indexes + using a row id, while maintaining any number of secondary indexes. */ type MDBTable struct { lastRowID uint64 // Last used rowID @@ -29,6 +28,9 @@ type MDBTable struct { Decoder func([]byte) interface{} } +// MDBTables is used for when we have a collection of tables +type MDBTables []*MDBTable + // An Index is named, and uses a series of column values to // map to the row-id containing the table type MDBIndex struct { @@ -51,7 +53,9 @@ type MDBTxn struct { // Abort is used to close the transaction func (t *MDBTxn) Abort() { - t.tx.Abort() + if t != nil && t.tx != nil { + t.tx.Abort() + } } // Commit is used to commit a transaction @@ -60,14 +64,18 @@ func (t *MDBTxn) Commit() error { } type RowID uint64 -type IndexFunc func([]string) string +type IndexFunc func(*MDBIndex, []string) string // DefaultIndexFunc is used if no IdxFunc is provided. It joins // the columns using '||' which is reasonably unlikely to occur. // We also prefix with a byte to ensure we never have a zero length // key -func DefaultIndexFunc(parts []string) string { - return "_" + strings.Join(parts, "||") +func DefaultIndexFunc(idx *MDBIndex, parts []string) string { + if len(parts) == 0 { + return "_" + } + prefix := "_" + strings.Join(parts, "||") + "||" + return prefix } // Init is used to initialize the MDBTable and ensure it's ready @@ -129,7 +137,7 @@ func (t *MDBTable) createTable() error { // restoreLastRowID is used to set the last rowID that we've used func (t *MDBTable) restoreLastRowID() error { - tx, err := t.StartTxn(true) + tx, err := t.StartTxn(true, nil) if err != nil { return err } @@ -159,23 +167,35 @@ func (t *MDBTable) nextRowID() uint64 { } // startTxn is used to start a transaction -func (t *MDBTable) StartTxn(readonly bool) (*MDBTxn, error) { +func (t *MDBTable) StartTxn(readonly bool, mdbTxn *MDBTxn) (*MDBTxn, error) { var txFlags uint = 0 + var tx *mdb.Txn + var err error + + // Ensure the modes agree + if mdbTxn != nil { + if mdbTxn.readonly != readonly { + return nil, fmt.Errorf("Cannot mix read/write transactions") + } + tx = mdbTxn.tx + goto EXTEND + } + if readonly { txFlags |= mdb.RDONLY } - tx, err := t.Env.BeginTxn(nil, txFlags) + tx, err = t.Env.BeginTxn(nil, txFlags) if err != nil { return nil, err } - mdbTxn := &MDBTxn{ + mdbTxn = &MDBTxn{ readonly: readonly, tx: tx, dbis: make(map[string]mdb.DBI), } - +EXTEND: dbi, err := tx.DBIOpen(t.Name, 0) if err != nil { tx.Abort() @@ -211,6 +231,22 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) { // Insert is used to insert or update an object func (t *MDBTable) Insert(obj interface{}) error { + // Start a new txn + tx, err := t.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if err := t.InsertTxn(tx, obj); err != nil { + return err + } + return tx.Commit() +} + +// Insert is used to insert or update an object within +// a given transaction +func (t *MDBTable) InsertTxn(tx *MDBTxn, obj interface{}) error { var n int // Construct the indexes keys indexes, err := t.objIndexKeys(obj) @@ -221,13 +257,6 @@ func (t *MDBTable) Insert(obj interface{}) error { // Encode the obj raw := t.Encoder(obj) - // Start a new txn - tx, err := t.StartTxn(false) - if err != nil { - return err - } - defer tx.Abort() - // Scan and check if this primary key already exists primaryDbi := tx.dbis[t.Indexes["id"].dbiName] _, err = tx.tx.Get(primaryDbi, indexes["id"]) @@ -235,7 +264,7 @@ func (t *MDBTable) Insert(obj interface{}) error { goto AFTER_DELETE } - // Delete the existing row{ + // Delete the existing row n, err = t.deleteWithIndex(tx, t.Indexes["id"], indexes["id"]) if err != nil { return err @@ -260,26 +289,30 @@ AFTER_DELETE: return err } } - - return tx.Commit() + return nil } // Get is used to lookup one or more rows. An index an appropriate // fields are specified. The fields can be a prefix of the index. func (t *MDBTable) Get(index string, parts ...string) ([]interface{}, error) { + // Start a readonly txn + tx, err := t.StartTxn(true, nil) + if err != nil { + return nil, err + } + defer tx.Abort() + return t.GetTxn(tx, index, parts...) +} + +// GetTxn is like Get but it operates within a specific transaction. +// This can be used for read that span multiple tables +func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interface{}, error) { // Get the associated index idx, key, err := t.getIndex(index, parts) if err != nil { return nil, err } - // Start a readonly txn - tx, err := t.StartTxn(true) - if err != nil { - return nil, err - } - defer tx.Abort() - // Accumulate the results var results []interface{} err = idx.iterate(tx, key, func(encRowId, res []byte) bool { @@ -306,7 +339,7 @@ func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, er } // Construct the key - key := []byte(idx.IdxFunc(parts)) + key := idx.keyFromParts(parts...) return idx, key, nil } @@ -314,27 +347,31 @@ func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, er // fields are specified. The fields can be a prefix of the index. // Returns the rows deleted or an error. func (t *MDBTable) Delete(index string, parts ...string) (num int, err error) { + // Start a write txn + tx, err := t.StartTxn(false, nil) + if err != nil { + return 0, err + } + defer tx.Abort() + + num, err = t.DeleteTxn(tx, index, parts...) + if err != nil { + return 0, err + } + return num, tx.Commit() +} + +// DeleteTxn is like Delete, but occurs in a specific transaction +// that can span multiple tables. +func (t *MDBTable) DeleteTxn(tx *MDBTxn, index string, parts ...string) (int, error) { // Get the associated index idx, key, err := t.getIndex(index, parts) if err != nil { return 0, err } - // Start a write txn - tx, err := t.StartTxn(false) - if err != nil { - return 0, err - } - defer tx.Abort() - // Delete with the index - num, err = t.deleteWithIndex(tx, idx, key) - if err != nil { - return 0, err - } - - // Attempt a commit - return num, tx.Commit() + return t.deleteWithIndex(tx, idx, key) } // deleteWithIndex deletes all associated rows while scanning @@ -451,8 +488,13 @@ func (i *MDBIndex) keyFromObject(obj interface{}) ([]byte, error) { } parts = append(parts, val) } - key := i.IdxFunc(parts) - return []byte(key), nil + key := i.keyFromParts(parts...) + return key, nil +} + +// keyFromParts returns the key from component parts +func (i *MDBIndex) keyFromParts(parts ...string) []byte { + return []byte(i.IdxFunc(i, parts)) } // iterate is used to iterate over keys matching the prefix, @@ -512,3 +554,17 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, } return nil } + +// StartTxn is used to create a transaction that spans a list of tables +func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) { + var tx *MDBTxn + for _, table := range t { + newTx, err := table.StartTxn(readonly, tx) + if err != nil { + tx.Abort() + return nil, err + } + tx = newTx + } + return tx, nil +}