2020-06-16 13:00:28 -04:00

798 lines
22 KiB
Go

package memdb
import (
"bytes"
"fmt"
"strings"
"sync/atomic"
"unsafe"
iradix "github.com/hashicorp/go-immutable-radix"
)
const (
id = "id"
)
var (
// ErrNotFound is returned when the requested item is not found
ErrNotFound = fmt.Errorf("not found")
)
// tableIndex is a tuple of (Table, Index) used for lookups
type tableIndex struct {
Table string
Index string
}
// Txn is a transaction against a MemDB.
// This can be a read or write transaction.
type Txn struct {
db *MemDB
write bool
rootTxn *iradix.Txn
after []func()
// changes is used to track the changes performed during the transaction. If
// it is nil at transaction start then changes are not tracked.
changes Changes
modified map[tableIndex]*iradix.Txn
}
// TrackChanges enables change tracking for the transaction. If called at any
// point before commit, subsequent mutations will be recorded and can be
// retrieved using ChangeSet. Once this has been called on a transaction it
// can't be unset. As with other Txn methods it's not safe to call this from a
// different goroutine than the one making mutations or committing the
// transaction.
func (txn *Txn) TrackChanges() {
if txn.changes == nil {
txn.changes = make(Changes, 0, 1)
}
}
// readableIndex returns a transaction usable for reading the given
// index in a table. If a write transaction is in progress, we may need
// to use an existing modified txn.
func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
// Look for existing transaction
if txn.write && txn.modified != nil {
key := tableIndex{table, index}
exist, ok := txn.modified[key]
if ok {
return exist
}
}
// Create a read transaction
path := indexPath(table, index)
raw, _ := txn.rootTxn.Get(path)
indexTxn := raw.(*iradix.Tree).Txn()
return indexTxn
}
// writableIndex returns a transaction usable for modifying the
// given index in a table.
func (txn *Txn) writableIndex(table, index string) *iradix.Txn {
if txn.modified == nil {
txn.modified = make(map[tableIndex]*iradix.Txn)
}
// Look for existing transaction
key := tableIndex{table, index}
exist, ok := txn.modified[key]
if ok {
return exist
}
// Start a new transaction
path := indexPath(table, index)
raw, _ := txn.rootTxn.Get(path)
indexTxn := raw.(*iradix.Tree).Txn()
// If we are the primary DB, enable mutation tracking. Snapshots should
// not notify, otherwise we will trigger watches on the primary DB when
// the writes will not be visible.
indexTxn.TrackMutate(txn.db.primary)
// Keep this open for the duration of the txn
txn.modified[key] = indexTxn
return indexTxn
}
// Abort is used to cancel this transaction.
// This is a noop for read transactions.
func (txn *Txn) Abort() {
// Noop for a read transaction
if !txn.write {
return
}
// Check if already aborted or committed
if txn.rootTxn == nil {
return
}
// Clear the txn
txn.rootTxn = nil
txn.modified = nil
txn.changes = nil
// Release the writer lock since this is invalid
txn.db.writer.Unlock()
}
// Commit is used to finalize this transaction.
// This is a noop for read transactions.
func (txn *Txn) Commit() {
// Noop for a read transaction
if !txn.write {
return
}
// Check if already aborted or committed
if txn.rootTxn == nil {
return
}
// Commit each sub-transaction scoped to (table, index)
for key, subTxn := range txn.modified {
path := indexPath(key.Table, key.Index)
final := subTxn.CommitOnly()
txn.rootTxn.Insert(path, final)
}
// Update the root of the DB
newRoot := txn.rootTxn.CommitOnly()
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
// Now issue all of the mutation updates (this is safe to call
// even if mutation tracking isn't enabled); we do this after
// the root pointer is swapped so that waking responders will
// see the new state.
for _, subTxn := range txn.modified {
subTxn.Notify()
}
txn.rootTxn.Notify()
// Clear the txn
txn.rootTxn = nil
txn.modified = nil
// Release the writer lock since this is invalid
txn.db.writer.Unlock()
// Run the deferred functions, if any
for i := len(txn.after); i > 0; i-- {
fn := txn.after[i-1]
fn()
}
}
// Insert is used to add or update an object into the given table
func (txn *Txn) Insert(table string, obj interface{}) error {
if !txn.write {
return fmt.Errorf("cannot insert in read-only transaction")
}
// Get the table schema
tableSchema, ok := txn.db.schema.Tables[table]
if !ok {
return fmt.Errorf("invalid table '%s'", table)
}
// Get the primary ID of the object
idSchema := tableSchema.Indexes[id]
idIndexer := idSchema.Indexer.(SingleIndexer)
ok, idVal, err := idIndexer.FromObject(obj)
if err != nil {
return fmt.Errorf("failed to build primary index: %v", err)
}
if !ok {
return fmt.Errorf("object missing primary index")
}
// Lookup the object by ID first, to see if this is an update
idTxn := txn.writableIndex(table, id)
existing, update := idTxn.Get(idVal)
// On an update, there is an existing object with the given
// primary ID. We do the update by deleting the current object
// and inserting the new object.
for name, indexSchema := range tableSchema.Indexes {
indexTxn := txn.writableIndex(table, name)
// Determine the new index value
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(obj)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(obj)
}
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if ok && !indexSchema.Unique {
for i := range vals {
vals[i] = append(vals[i], idVal...)
}
}
// Handle the update by deleting from the index first
if update {
var (
okExist bool
valsExist [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var valExist []byte
okExist, valExist, err = indexer.FromObject(existing)
valsExist = [][]byte{valExist}
case MultiIndexer:
okExist, valsExist, err = indexer.FromObject(existing)
}
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
if okExist {
for i, valExist := range valsExist {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
if !indexSchema.Unique {
valExist = append(valExist, idVal...)
}
// If we are writing to the same index with the same value,
// we can avoid the delete as the insert will overwrite the
// value anyways.
if i >= len(vals) || !bytes.Equal(valExist, vals[i]) {
indexTxn.Delete(valExist)
}
}
}
}
// If there is no index value, either this is an error or an expected
// case and we can skip updating
if !ok {
if indexSchema.AllowMissing {
continue
} else {
return fmt.Errorf("missing value for index '%s'", name)
}
}
// Update the value of the index
for _, val := range vals {
indexTxn.Insert(val, obj)
}
}
if txn.changes != nil {
txn.changes = append(txn.changes, Change{
Table: table,
Before: existing, // might be nil on a create
After: obj,
primaryKey: idVal,
})
}
return nil
}
// Delete is used to delete a single object from the given table
// This object must already exist in the table
func (txn *Txn) Delete(table string, obj interface{}) error {
if !txn.write {
return fmt.Errorf("cannot delete in read-only transaction")
}
// Get the table schema
tableSchema, ok := txn.db.schema.Tables[table]
if !ok {
return fmt.Errorf("invalid table '%s'", table)
}
// Get the primary ID of the object
idSchema := tableSchema.Indexes[id]
idIndexer := idSchema.Indexer.(SingleIndexer)
ok, idVal, err := idIndexer.FromObject(obj)
if err != nil {
return fmt.Errorf("failed to build primary index: %v", err)
}
if !ok {
return fmt.Errorf("object missing primary index")
}
// Lookup the object by ID first, check fi we should continue
idTxn := txn.writableIndex(table, id)
existing, ok := idTxn.Get(idVal)
if !ok {
return ErrNotFound
}
// Remove the object from all the indexes
for name, indexSchema := range tableSchema.Indexes {
indexTxn := txn.writableIndex(table, name)
// Handle the update by deleting from the index first
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(existing)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(existing)
}
if err != nil {
return fmt.Errorf("failed to build index '%s': %v", name, err)
}
if ok {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
for _, val := range vals {
if !indexSchema.Unique {
val = append(val, idVal...)
}
indexTxn.Delete(val)
}
}
}
if txn.changes != nil {
txn.changes = append(txn.changes, Change{
Table: table,
Before: existing,
After: nil, // Now nil indicates deletion
primaryKey: idVal,
})
}
return nil
}
// DeletePrefix is used to delete an entire subtree based on a prefix.
// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) {
if !txn.write {
return false, fmt.Errorf("cannot delete in read-only transaction")
}
if !strings.HasSuffix(prefix_index, "_prefix") {
return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index)
}
deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix")
// Get an iterator over all of the keys with the given prefix.
entries, err := txn.Get(table, prefix_index, prefix)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Get the table schema
tableSchema, ok := txn.db.schema.Tables[table]
if !ok {
return false, fmt.Errorf("invalid table '%s'", table)
}
foundAny := false
for entry := entries.Next(); entry != nil; entry = entries.Next() {
if !foundAny {
foundAny = true
}
// Get the primary ID of the object
idSchema := tableSchema.Indexes[id]
idIndexer := idSchema.Indexer.(SingleIndexer)
ok, idVal, err := idIndexer.FromObject(entry)
if err != nil {
return false, fmt.Errorf("failed to build primary index: %v", err)
}
if !ok {
return false, fmt.Errorf("object missing primary index")
}
if txn.changes != nil {
// Record the deletion
idTxn := txn.writableIndex(table, id)
existing, ok := idTxn.Get(idVal)
if ok {
txn.changes = append(txn.changes, Change{
Table: table,
Before: existing,
After: nil, // Now nil indicates deletion
primaryKey: idVal,
})
}
}
// Remove the object from all the indexes except the given prefix index
for name, indexSchema := range tableSchema.Indexes {
if name == deletePrefixIndex {
continue
}
indexTxn := txn.writableIndex(table, name)
// Handle the update by deleting from the index first
var (
ok bool
vals [][]byte
err error
)
switch indexer := indexSchema.Indexer.(type) {
case SingleIndexer:
var val []byte
ok, val, err = indexer.FromObject(entry)
vals = [][]byte{val}
case MultiIndexer:
ok, vals, err = indexer.FromObject(entry)
}
if err != nil {
return false, fmt.Errorf("failed to build index '%s': %v", name, err)
}
if ok {
// Handle non-unique index by computing a unique index.
// This is done by appending the primary key which must
// be unique anyways.
for _, val := range vals {
if !indexSchema.Unique {
val = append(val, idVal...)
}
indexTxn.Delete(val)
}
}
}
}
if foundAny {
indexTxn := txn.writableIndex(table, deletePrefixIndex)
ok = indexTxn.DeletePrefix([]byte(prefix))
if !ok {
panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix))
}
return true, nil
}
return false, nil
}
// DeleteAll is used to delete all the objects in a given table
// matching the constraints on the index
func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
if !txn.write {
return 0, fmt.Errorf("cannot delete in read-only transaction")
}
// Get all the objects
iter, err := txn.Get(table, index, args...)
if err != nil {
return 0, err
}
// Put them into a slice so there are no safety concerns while actually
// performing the deletes
var objs []interface{}
for {
obj := iter.Next()
if obj == nil {
break
}
objs = append(objs, obj)
}
// Do the deletes
num := 0
for _, obj := range objs {
if err := txn.Delete(table, obj); err != nil {
return num, err
}
num++
}
return num, nil
}
// FirstWatch is used to return the first matching object for
// the given constraints on the index along with the watch channel
func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
// Get the index value
indexSchema, val, err := txn.getIndexValue(table, index, args...)
if err != nil {
return nil, nil, err
}
// Get the index itself
indexTxn := txn.readableIndex(table, indexSchema.Name)
// Do an exact lookup
if indexSchema.Unique && val != nil && indexSchema.Name == index {
watch, obj, ok := indexTxn.GetWatch(val)
if !ok {
return watch, nil, nil
}
return watch, obj, nil
}
// Handle non-unique index by using an iterator and getting the first value
iter := indexTxn.Root().Iterator()
watch := iter.SeekPrefixWatch(val)
_, value, _ := iter.Next()
return watch, value, nil
}
// First is used to return the first matching object for
// the given constraints on the index
func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
_, val, err := txn.FirstWatch(table, index, args...)
return val, err
}
// LongestPrefix is used to fetch the longest prefix match for the given
// constraints on the index. Note that this will not work with the memdb
// StringFieldIndex because it adds null terminators which prevent the
// algorithm from correctly finding a match (it will get to right before the
// null and fail to find a leaf node). This should only be used where the prefix
// given is capable of matching indexed entries directly, which typically only
// applies to a custom indexer. See the unit test for an example.
func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
// Enforce that this only works on prefix indexes.
if !strings.HasSuffix(index, "_prefix") {
return nil, fmt.Errorf("must use '%s_prefix' on index", index)
}
// Get the index value.
indexSchema, val, err := txn.getIndexValue(table, index, args...)
if err != nil {
return nil, err
}
// This algorithm only makes sense against a unique index, otherwise the
// index keys will have the IDs appended to them.
if !indexSchema.Unique {
return nil, fmt.Errorf("index '%s' is not unique", index)
}
// Find the longest prefix match with the given index.
indexTxn := txn.readableIndex(table, indexSchema.Name)
if _, value, ok := indexTxn.Root().LongestPrefix(val); ok {
return value, nil
}
return nil, nil
}
// getIndexValue is used to get the IndexSchema and the value
// used to scan the index given the parameters. This handles prefix based
// scans when the index has the "_prefix" suffix. The index must support
// prefix iteration.
func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexSchema, []byte, error) {
// Get the table schema
tableSchema, ok := txn.db.schema.Tables[table]
if !ok {
return nil, nil, fmt.Errorf("invalid table '%s'", table)
}
// Check for a prefix scan
prefixScan := false
if strings.HasSuffix(index, "_prefix") {
index = strings.TrimSuffix(index, "_prefix")
prefixScan = true
}
// Get the index schema
indexSchema, ok := tableSchema.Indexes[index]
if !ok {
return nil, nil, fmt.Errorf("invalid index '%s'", index)
}
// Hot-path for when there are no arguments
if len(args) == 0 {
return indexSchema, nil, nil
}
// Special case the prefix scanning
if prefixScan {
prefixIndexer, ok := indexSchema.Indexer.(PrefixIndexer)
if !ok {
return indexSchema, nil,
fmt.Errorf("index '%s' does not support prefix scanning", index)
}
val, err := prefixIndexer.PrefixFromArgs(args...)
if err != nil {
return indexSchema, nil, fmt.Errorf("index error: %v", err)
}
return indexSchema, val, err
}
// Get the exact match index
val, err := indexSchema.Indexer.FromArgs(args...)
if err != nil {
return indexSchema, nil, fmt.Errorf("index error: %v", err)
}
return indexSchema, val, err
}
// ResultIterator is used to iterate over a list of results
// from a Get query on a table.
type ResultIterator interface {
WatchCh() <-chan struct{}
Next() interface{}
}
// Get is used to construct a ResultIterator over all the
// rows that match the given constraints of an index.
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIterator(table, index, args...)
if err != nil {
return nil, err
}
// Seek the iterator to the appropriate sub-set
watchCh := indexIter.SeekPrefixWatch(val)
// Create an iterator
iter := &radixIterator{
iter: indexIter,
watchCh: watchCh,
}
return iter, nil
}
// LowerBound is used to construct a ResultIterator over all the the range of
// rows that have an index value greater than or equal to the provide args.
// Calling this then iterating until the rows are larger than required allows
// range scans within an index. It is not possible to watch the resulting
// iterator since the radix tree doesn't efficiently allow watching on lower
// bound changes. The WatchCh returned will be nill and so will block forever.
func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
indexIter, val, err := txn.getIndexIterator(table, index, args...)
if err != nil {
return nil, err
}
// Seek the iterator to the appropriate sub-set
indexIter.SeekLowerBound(val)
// Create an iterator
iter := &radixIterator{
iter: indexIter,
}
return iter, nil
}
// objectID is a tuple of table name and the raw internal id byte slice
// converted to a string. It's only converted to a string to make it comparable
// so this struct can be used as a map index.
type objectID struct {
Table string
IndexVal string
}
// mutInfo stores metadata about mutations to allow collapsing multiple
// mutations to the same object into one.
type mutInfo struct {
firstBefore interface{}
lastIdx int
}
// Changes returns the set of object changes that have been made in the
// transaction so far. If change tracking is not enabled it wil always return
// nil. It can be called before or after Commit. If it is before Commit it will
// return all changes made so far which may not be the same as the final
// Changes. After abort it will always return nil. As with other Txn methods
// it's not safe to call this from a different goroutine than the one making
// mutations or committing the transaction. Mutations will appear in the order
// they were performed in the transaction but multiple operations to the same
// object will be collapsed so only the effective overall change to that object
// is present. If transaction operations are dependent (e.g. copy object X to Y
// then delete X) this might mean the set of mutations is incomplete to verify
// history, but it is complete in that the net effect is preserved (Y got a new
// value, X got removed).
func (txn *Txn) Changes() Changes {
if txn.changes == nil {
return nil
}
// De-duplicate mutations by key so all take effect at the point of the last
// write but we keep the mutations in order.
dups := make(map[objectID]mutInfo)
for i, m := range txn.changes {
oid := objectID{
Table: m.Table,
IndexVal: string(m.primaryKey),
}
// Store the latest mutation index for each key value
mi, ok := dups[oid]
if !ok {
// First entry for key, store the before value
mi.firstBefore = m.Before
}
mi.lastIdx = i
dups[oid] = mi
}
if len(dups) == len(txn.changes) {
// No duplicates found, fast path return it as is
return txn.changes
}
// Need to remove the duplicates
cs := make(Changes, 0, len(dups))
for i, m := range txn.changes {
oid := objectID{
Table: m.Table,
IndexVal: string(m.primaryKey),
}
mi := dups[oid]
if mi.lastIdx == i {
// This was the latest value for this key copy it with the before value in
// case it's different. Note that m is not a pointer so we are not
// modifying the txn.changeSet here - it's already a copy.
m.Before = mi.firstBefore
cs = append(cs, m)
}
}
// Store the de-duped version in case this is called again
txn.changes = cs
return cs
}
func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*iradix.Iterator, []byte, error) {
// Get the index value to scan
indexSchema, val, err := txn.getIndexValue(table, index, args...)
if err != nil {
return nil, nil, err
}
// Get the index itself
indexTxn := txn.readableIndex(table, indexSchema.Name)
indexRoot := indexTxn.Root()
// Get an interator over the index
indexIter := indexRoot.Iterator()
return indexIter, val, nil
}
// Defer is used to push a new arbitrary function onto a stack which
// gets called when a transaction is committed and finished. Deferred
// functions are called in LIFO order, and only invoked at the end of
// write transactions.
func (txn *Txn) Defer(fn func()) {
txn.after = append(txn.after, fn)
}
// radixIterator is used to wrap an underlying iradix iterator.
// This is much more efficient than a sliceIterator as we are not
// materializing the entire view.
type radixIterator struct {
iter *iradix.Iterator
watchCh <-chan struct{}
}
func (r *radixIterator) WatchCh() <-chan struct{} {
return r.watchCh
}
func (r *radixIterator) Next() interface{} {
_, value, ok := r.iter.Next()
if !ok {
return nil
}
return value
}