mirror of https://github.com/status-im/consul.git
Govendor update go-memdb and go-immutable-radix to pick up changes for DeletePrefix
This commit is contained in:
parent
36acf8d6a4
commit
b841c99b87
|
@ -183,6 +183,31 @@ func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
|
|||
return nc
|
||||
}
|
||||
|
||||
// Visit all the nodes in the tree under n, and add their mutateChannels to the transaction
|
||||
// Returns the size of the subtree visited
|
||||
func (t *Txn) trackChannelsAndCount(n *Node) int {
|
||||
// Count only leaf nodes
|
||||
leaves := 0
|
||||
if n.leaf != nil {
|
||||
leaves = 1
|
||||
}
|
||||
// Mark this node as being mutated.
|
||||
if t.trackMutate {
|
||||
t.trackChannel(n.mutateCh)
|
||||
}
|
||||
|
||||
// Mark its leaf as being mutated, if appropriate.
|
||||
if t.trackMutate && n.leaf != nil {
|
||||
t.trackChannel(n.leaf.mutateCh)
|
||||
}
|
||||
|
||||
// Recurse on the children
|
||||
for _, e := range n.edges {
|
||||
leaves += t.trackChannelsAndCount(e.node)
|
||||
}
|
||||
return leaves
|
||||
}
|
||||
|
||||
// mergeChild is called to collapse the given node with its child. This is only
|
||||
// called when the given node is not a leaf and has a single edge.
|
||||
func (t *Txn) mergeChild(n *Node) {
|
||||
|
@ -357,6 +382,56 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
|
|||
return nc, leaf
|
||||
}
|
||||
|
||||
// delete does a recursive deletion
|
||||
func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) {
|
||||
// Check for key exhaustion
|
||||
if len(search) == 0 {
|
||||
nc := t.writeNode(n, true)
|
||||
if n.isLeaf() {
|
||||
nc.leaf = nil
|
||||
}
|
||||
nc.edges = nil
|
||||
return nc, t.trackChannelsAndCount(n)
|
||||
}
|
||||
|
||||
// Look for an edge
|
||||
label := search[0]
|
||||
idx, child := n.getEdge(label)
|
||||
// We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix
|
||||
// Need to do both so that we can delete prefixes that don't correspond to any node in the tree
|
||||
if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
// Consume the search prefix
|
||||
if len(child.prefix) > len(search) {
|
||||
search = []byte("")
|
||||
} else {
|
||||
search = search[len(child.prefix):]
|
||||
}
|
||||
newChild, numDeletions := t.deletePrefix(n, child, search)
|
||||
if newChild == nil {
|
||||
return nil, 0
|
||||
}
|
||||
// Copy this node. WATCH OUT - it's safe to pass "false" here because we
|
||||
// will only ADD a leaf via nc.mergeChild() if there isn't one due to
|
||||
// the !nc.isLeaf() check in the logic just below. This is pretty subtle,
|
||||
// so be careful if you change any of the logic here.
|
||||
|
||||
nc := t.writeNode(n, false)
|
||||
|
||||
// Delete the edge if the node has no edges
|
||||
if newChild.leaf == nil && len(newChild.edges) == 0 {
|
||||
nc.delEdge(label)
|
||||
if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
|
||||
t.mergeChild(nc)
|
||||
}
|
||||
} else {
|
||||
nc.edges[idx].node = newChild
|
||||
}
|
||||
return nc, numDeletions
|
||||
}
|
||||
|
||||
// Insert is used to add or update a given key. The return provides
|
||||
// the previous value and a bool indicating if any was set.
|
||||
func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) {
|
||||
|
@ -384,6 +459,19 @@ func (t *Txn) Delete(k []byte) (interface{}, bool) {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
// DeletePrefix is used to delete an entire subtree that matches the prefix
|
||||
// This will delete all nodes under that prefix
|
||||
func (t *Txn) DeletePrefix(prefix []byte) bool {
|
||||
newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix)
|
||||
if newRoot != nil {
|
||||
t.root = newRoot
|
||||
t.size = t.size - numDeletions
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
}
|
||||
|
||||
// Root returns the current root of the radix tree within this
|
||||
// transaction. The root is not safe across insert and delete operations,
|
||||
// but can be used to read the current state during a transaction.
|
||||
|
@ -524,6 +612,14 @@ func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) {
|
|||
return txn.Commit(), old, ok
|
||||
}
|
||||
|
||||
// DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree,
|
||||
// and a bool indicating if the prefix matched any nodes
|
||||
func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) {
|
||||
txn := t.Txn()
|
||||
ok := txn.DeletePrefix(k)
|
||||
return txn.Commit(), ok
|
||||
}
|
||||
|
||||
// Root returns the root node of the tree which can be used for richer
|
||||
// query operations.
|
||||
func (t *Tree) Root() *Node {
|
||||
|
|
|
@ -21,6 +21,11 @@ The database provides the following:
|
|||
a single field index, or more advanced compound field indexes. Certain types like
|
||||
UUID can be efficiently compressed from strings into byte indexes for reduced
|
||||
storage requirements.
|
||||
|
||||
* Watches - Callers can populate a watch set as part of a query, which can be used to
|
||||
detect when a modification has been made to the database which affects the query
|
||||
results. This lets callers easily watch for changes in the database in a very general
|
||||
way.
|
||||
|
||||
For the underlying immutable radix trees, see [go-immutable-radix](https://github.com/hashicorp/go-immutable-radix).
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package memdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
@ -249,6 +250,79 @@ func (s *StringMapFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
|
|||
return []byte(key), nil
|
||||
}
|
||||
|
||||
// UintFieldIndex is used to extract a uint field from an object using
|
||||
// reflection and builds an index on that field.
|
||||
type UintFieldIndex struct {
|
||||
Field string
|
||||
}
|
||||
|
||||
func (u *UintFieldIndex) FromObject(obj interface{}) (bool, []byte, error) {
|
||||
v := reflect.ValueOf(obj)
|
||||
v = reflect.Indirect(v) // Dereference the pointer if any
|
||||
|
||||
fv := v.FieldByName(u.Field)
|
||||
if !fv.IsValid() {
|
||||
return false, nil,
|
||||
fmt.Errorf("field '%s' for %#v is invalid", u.Field, obj)
|
||||
}
|
||||
|
||||
// Check the type
|
||||
k := fv.Kind()
|
||||
size, ok := IsUintType(k)
|
||||
if !ok {
|
||||
return false, nil, fmt.Errorf("field %q is of type %v; want a uint", u.Field, k)
|
||||
}
|
||||
|
||||
// Get the value and encode it
|
||||
val := fv.Uint()
|
||||
buf := make([]byte, size)
|
||||
binary.PutUvarint(buf, val)
|
||||
|
||||
return true, buf, nil
|
||||
}
|
||||
|
||||
func (u *UintFieldIndex) FromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("must provide only a single argument")
|
||||
}
|
||||
|
||||
v := reflect.ValueOf(args[0])
|
||||
if !v.IsValid() {
|
||||
return nil, fmt.Errorf("%#v is invalid", args[0])
|
||||
}
|
||||
|
||||
k := v.Kind()
|
||||
size, ok := IsUintType(k)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("arg is of type %v; want a uint", k)
|
||||
}
|
||||
|
||||
val := v.Uint()
|
||||
buf := make([]byte, size)
|
||||
binary.PutUvarint(buf, val)
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// IsUintType returns whether the passed type is a type of uint and the number
|
||||
// of bytes needed to encode the type.
|
||||
func IsUintType(k reflect.Kind) (size int, okay bool) {
|
||||
switch k {
|
||||
case reflect.Uint:
|
||||
return binary.MaxVarintLen64, true
|
||||
case reflect.Uint8:
|
||||
return 2, true
|
||||
case reflect.Uint16:
|
||||
return binary.MaxVarintLen16, true
|
||||
case reflect.Uint32:
|
||||
return binary.MaxVarintLen32, true
|
||||
case reflect.Uint64:
|
||||
return binary.MaxVarintLen64, true
|
||||
default:
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
|
||||
// UUIDFieldIndex is used to extract a field from an object
|
||||
// using reflection and builds an index on that field by treating
|
||||
// it as a UUID. This is an optimization to using a StringFieldIndex
|
||||
|
|
|
@ -13,8 +13,8 @@ import (
|
|||
// on values. The database makes use of immutable radix trees to provide
|
||||
// transactions and MVCC.
|
||||
type MemDB struct {
|
||||
schema *DBSchema
|
||||
root unsafe.Pointer // *iradix.Tree underneath
|
||||
schema *DBSchema
|
||||
root unsafe.Pointer // *iradix.Tree underneath
|
||||
primary bool
|
||||
|
||||
// There can only be a single writter at once
|
||||
|
@ -30,8 +30,8 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
|
|||
|
||||
// Create the MemDB
|
||||
db := &MemDB{
|
||||
schema: schema,
|
||||
root: unsafe.Pointer(iradix.New()),
|
||||
schema: schema,
|
||||
root: unsafe.Pointer(iradix.New()),
|
||||
primary: true,
|
||||
}
|
||||
if err := db.initialize(); err != nil {
|
||||
|
@ -65,8 +65,8 @@ func (db *MemDB) Txn(write bool) *Txn {
|
|||
// operations to the existing DB.
|
||||
func (db *MemDB) Snapshot() *MemDB {
|
||||
clone := &MemDB{
|
||||
schema: db.schema,
|
||||
root: unsafe.Pointer(db.getRoot()),
|
||||
schema: db.schema,
|
||||
root: unsafe.Pointer(db.getRoot()),
|
||||
primary: false,
|
||||
}
|
||||
return clone
|
||||
|
|
|
@ -117,14 +117,23 @@ func (txn *Txn) Commit() {
|
|||
// Commit each sub-transaction scoped to (table, index)
|
||||
for key, subTxn := range txn.modified {
|
||||
path := indexPath(key.Table, key.Index)
|
||||
final := subTxn.Commit()
|
||||
final := subTxn.CommitOnly()
|
||||
txn.rootTxn.Insert(path, final)
|
||||
}
|
||||
|
||||
// Update the root of the DB
|
||||
newRoot := txn.rootTxn.Commit()
|
||||
newRoot := txn.rootTxn.CommitOnly()
|
||||
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
|
||||
|
||||
// Now issue all of the mutation updates (this is safe to call
|
||||
// even if mutation tracking isn't enabled); we do this after
|
||||
// the root pointer is swapped so that waking responders will
|
||||
// see the new state.
|
||||
for _, subTxn := range txn.modified {
|
||||
subTxn.Notify()
|
||||
}
|
||||
txn.rootTxn.Notify()
|
||||
|
||||
// Clear the txn
|
||||
txn.rootTxn = nil
|
||||
txn.modified = nil
|
||||
|
@ -321,6 +330,96 @@ func (txn *Txn) Delete(table string, obj interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// DeletePrefix is used to delete an entire subtree based on a prefix.
|
||||
// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
|
||||
// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
|
||||
// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
|
||||
func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) {
|
||||
if !txn.write {
|
||||
return false, fmt.Errorf("cannot delete in read-only transaction")
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(prefix_index, "_prefix") {
|
||||
return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index)
|
||||
}
|
||||
|
||||
deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix")
|
||||
|
||||
// Get an iterator over all of the keys with the given prefix.
|
||||
entries, err := txn.Get(table, prefix_index, prefix)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
// Get the table schema
|
||||
tableSchema, ok := txn.db.schema.Tables[table]
|
||||
if !ok {
|
||||
return false, fmt.Errorf("invalid table '%s'", table)
|
||||
}
|
||||
|
||||
foundAny := false
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
if !foundAny {
|
||||
foundAny = true
|
||||
}
|
||||
// Get the primary ID of the object
|
||||
idSchema := tableSchema.Indexes[id]
|
||||
idIndexer := idSchema.Indexer.(SingleIndexer)
|
||||
ok, idVal, err := idIndexer.FromObject(entry)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to build primary index: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
return false, fmt.Errorf("object missing primary index")
|
||||
}
|
||||
// Remove the object from all the indexes except the given prefix index
|
||||
for name, indexSchema := range tableSchema.Indexes {
|
||||
if name == deletePrefixIndex {
|
||||
continue
|
||||
}
|
||||
indexTxn := txn.writableIndex(table, name)
|
||||
|
||||
// Handle the update by deleting from the index first
|
||||
var (
|
||||
ok bool
|
||||
vals [][]byte
|
||||
err error
|
||||
)
|
||||
switch indexer := indexSchema.Indexer.(type) {
|
||||
case SingleIndexer:
|
||||
var val []byte
|
||||
ok, val, err = indexer.FromObject(entry)
|
||||
vals = [][]byte{val}
|
||||
case MultiIndexer:
|
||||
ok, vals, err = indexer.FromObject(entry)
|
||||
}
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to build index '%s': %v", name, err)
|
||||
}
|
||||
|
||||
if ok {
|
||||
// Handle non-unique index by computing a unique index.
|
||||
// This is done by appending the primary key which must
|
||||
// be unique anyways.
|
||||
for _, val := range vals {
|
||||
if !indexSchema.Unique {
|
||||
val = append(val, idVal...)
|
||||
}
|
||||
indexTxn.Delete(val)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if foundAny {
|
||||
indexTxn := txn.writableIndex(table, deletePrefixIndex)
|
||||
ok = indexTxn.DeletePrefix([]byte(prefix))
|
||||
if !ok {
|
||||
panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// DeleteAll is used to delete all the objects in a given table
|
||||
// matching the constraints on the index
|
||||
func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
|
||||
|
|
|
@ -63,8 +63,8 @@
|
|||
{"checksumSHA1":"cdOCt0Yb+hdErz8NAQqayxPmRsY=","path":"github.com/hashicorp/errwrap","revision":"7554cd9344cec97297fa6649b055a8c98c2a1e55","revisionTime":"2014-10-28T05:47:10Z"},
|
||||
{"checksumSHA1":"nd3S1qkFv7zZxA9be0bw4nT0pe0=","path":"github.com/hashicorp/go-checkpoint","revision":"e4b2dc34c0f698ee04750bf2035d8b9384233e1b","revisionTime":"2015-10-22T18:15:14Z"},
|
||||
{"checksumSHA1":"b8F628srIitj5p7Y130xc9k0QWs=","path":"github.com/hashicorp/go-cleanhttp","revision":"3573b8b52aa7b37b9358d966a898feb387f62437","revisionTime":"2017-02-11T01:34:15Z"},
|
||||
{"checksumSHA1":"zvmksNyW6g+Fd/bywd4vcn8rp+M=","path":"github.com/hashicorp/go-immutable-radix","revision":"d0852f9e7b91ec9633735052bdab00bf802b353c","revisionTime":"2017-02-14T00:45:45Z"},
|
||||
{"checksumSHA1":"K8Fsgt1llTXP0EwqdBzvSGdKOKc=","path":"github.com/hashicorp/go-memdb","revision":"c01f56b44823e8ba697e23c18d12dca984b85aca","revisionTime":"2017-01-23T15:32:28Z"},
|
||||
{"checksumSHA1":"Cas2nprG6pWzf05A2F/OlnjUu2Y=","path":"github.com/hashicorp/go-immutable-radix","revision":"8aac2701530899b64bdea735a1de8da899815220","revisionTime":"2017-07-25T22:12:15Z"},
|
||||
{"checksumSHA1":"T65qvYBTy4rYks7oN+U0muEqtRw=","path":"github.com/hashicorp/go-memdb","revision":"2b2d6c35e14e7557ea1003e707d5e179fa315028","revisionTime":"2017-07-25T22:15:03Z"},
|
||||
{"checksumSHA1":"TNlVzNR1OaajcNi3CbQ3bGbaLGU=","path":"github.com/hashicorp/go-msgpack/codec","revision":"fa3f63826f7c23912c15263591e65d54d080b458","revisionTime":"2015-05-18T23:42:57Z"},
|
||||
{"checksumSHA1":"lrSl49G23l6NhfilxPM0XFs5rZo=","path":"github.com/hashicorp/go-multierror","revision":"d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5","revisionTime":"2015-09-16T20:57:42Z"},
|
||||
{"checksumSHA1":"ErJHGU6AVPZM9yoY/xV11TwSjQs=","path":"github.com/hashicorp/go-retryablehttp","revision":"6e85be8fee1dcaa02c0eaaac2df5a8fbecf94145","revisionTime":"2016-09-30T03:51:02Z"},
|
||||
|
|
Loading…
Reference in New Issue