From 10f3bdf4ffea8db0a1a4ff6dbb425243e79035cd Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 23 Jan 2017 23:41:18 -0800 Subject: [PATCH] Updates go-immutable-radix and go-memdb to get fine-grained watches. --- .../hashicorp/go-immutable-radix/iradix.go | 291 ++++++++++++++---- .../hashicorp/go-immutable-radix/iter.go | 14 +- .../hashicorp/go-immutable-radix/node.go | 33 +- .../hashicorp/go-immutable-radix/raw_iter.go | 78 +++++ vendor/github.com/hashicorp/go-memdb/memdb.go | 3 + .../github.com/hashicorp/go-memdb/schema.go | 2 +- vendor/github.com/hashicorp/go-memdb/txn.go | 45 ++- vendor/github.com/hashicorp/go-memdb/watch.go | 108 +++++++ .../hashicorp/go-memdb/watch_few.go | 116 +++++++ vendor/vendor.json | 12 +- 10 files changed, 622 insertions(+), 80 deletions(-) create mode 100644 vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go create mode 100644 vendor/github.com/hashicorp/go-memdb/watch.go create mode 100644 vendor/github.com/hashicorp/go-memdb/watch_few.go diff --git a/vendor/github.com/hashicorp/go-immutable-radix/iradix.go b/vendor/github.com/hashicorp/go-immutable-radix/iradix.go index 8d26fc95f4..1f63f769eb 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/iradix.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/iradix.go @@ -2,6 +2,7 @@ package iradix import ( "bytes" + "strings" "github.com/hashicorp/golang-lru/simplelru" ) @@ -11,7 +12,9 @@ const ( // cache used per transaction. This is used to cache the updates // to the nodes near the root, while the leaves do not need to be // cached. This is important for very large transactions to prevent - // the modified cache from growing to be enormous. + // the modified cache from growing to be enormous. This is also used + // to set the max size of the mutation notify maps since those should + // also be bounded in a similar way. defaultModifiedCache = 8192 ) @@ -27,7 +30,11 @@ type Tree struct { // New returns an empty Tree func New() *Tree { - t := &Tree{root: &Node{}} + t := &Tree{ + root: &Node{ + mutateCh: make(chan struct{}), + }, + } return t } @@ -40,75 +47,148 @@ func (t *Tree) Len() int { // atomically and returns a new tree when committed. A transaction // is not thread safe, and should only be used by a single goroutine. type Txn struct { - root *Node - size int - modified *simplelru.LRU + // root is the modified root for the transaction. + root *Node + + // snap is a snapshot of the root node for use if we have to run the + // slow notify algorithm. + snap *Node + + // size tracks the size of the tree as it is modified during the + // transaction. + size int + + // writable is a cache of writable nodes that have been created during + // the course of the transaction. This allows us to re-use the same + // nodes for further writes and avoid unnecessary copies of nodes that + // have never been exposed outside the transaction. This will only hold + // up to defaultModifiedCache number of entries. + writable *simplelru.LRU + + // trackChannels is used to hold channels that need to be notified to + // signal mutation of the tree. This will only hold up to + // defaultModifiedCache number of entries, after which we will set the + // trackOverflow flag, which will cause us to use a more expensive + // algorithm to perform the notifications. Mutation tracking is only + // performed if trackMutate is true. + trackChannels map[*chan struct{}]struct{} + trackOverflow bool + trackMutate bool } // Txn starts a new transaction that can be used to mutate the tree func (t *Tree) Txn() *Txn { txn := &Txn{ root: t.root, + snap: t.root, size: t.size, } return txn } -// writeNode returns a node to be modified, if the current -// node as already been modified during the course of -// the transaction, it is used in-place. -func (t *Txn) writeNode(n *Node) *Node { - // Ensure the modified set exists - if t.modified == nil { +// TrackMutate can be used to toggle if mutations are tracked. If this is enabled +// then notifications will be issued for affected internal nodes and leaves when +// the transaction is committed. +func (t *Txn) TrackMutate(track bool) { + t.trackMutate = track +} + +// trackChannel safely attempts to track the given mutation channel, setting the +// overflow flag if we can no longer track any more. This limits the amount of +// state that will accumulate during a transaction and we have a slower algorithm +// to switch to if we overflow. +func (t *Txn) trackChannel(ch *chan struct{}) { + // In overflow, make sure we don't store any more objects. + if t.trackOverflow { + return + } + + // Create the map on the fly when we need it. + if t.trackChannels == nil { + t.trackChannels = make(map[*chan struct{}]struct{}) + } + + // If this would overflow the state we reject it and set the flag (since + // we aren't tracking everything that's required any longer). + if len(t.trackChannels) >= defaultModifiedCache { + t.trackOverflow = true + return + } + + // Otherwise we are good to track it. + t.trackChannels[ch] = struct{}{} +} + +// writeNode returns a node to be modified, if the current node has already been +// modified during the course of the transaction, it is used in-place. Set +// forLeafUpdate to true if you are getting a write node to update the leaf, +// which will set leaf mutation tracking appropriately as well. +func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node { + // Ensure the writable set exists. + if t.writable == nil { lru, err := simplelru.NewLRU(defaultModifiedCache, nil) if err != nil { panic(err) } - t.modified = lru + t.writable = lru } - // If this node has already been modified, we can - // continue to use it during this transaction. - if _, ok := t.modified.Get(n); ok { + // If this node has already been modified, we can continue to use it + // during this transaction. If a node gets kicked out of cache then we + // *may* notify for its mutation if we end up copying the node again, + // but we don't make any guarantees about notifying for intermediate + // mutations that were never exposed outside of a transaction. + if _, ok := t.writable.Get(n); ok { return n } - // Copy the existing node - nc := new(Node) + // Mark this node as being mutated. + if t.trackMutate { + t.trackChannel(&(n.mutateCh)) + } + + // Mark its leaf as being mutated, if appropriate. + if t.trackMutate && forLeafUpdate && n.leaf != nil { + t.trackChannel(&(n.leaf.mutateCh)) + } + + // Copy the existing node. + nc := &Node{ + mutateCh: make(chan struct{}), + leaf: n.leaf, + } if n.prefix != nil { nc.prefix = make([]byte, len(n.prefix)) copy(nc.prefix, n.prefix) } - if n.leaf != nil { - nc.leaf = new(leafNode) - *nc.leaf = *n.leaf - } if len(n.edges) != 0 { nc.edges = make([]edge, len(n.edges)) copy(nc.edges, n.edges) } - // Mark this node as modified - t.modified.Add(nc, nil) + // Mark this node as writable. + t.writable.Add(nc, nil) return nc } // insert does a recursive insertion func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) { - // Handle key exhaution + // Handle key exhaustion if len(search) == 0 { - nc := t.writeNode(n) + var oldVal interface{} + didUpdate := false if n.isLeaf() { - old := nc.leaf.val - nc.leaf.val = v - return nc, old, true - } else { - nc.leaf = &leafNode{ - key: k, - val: v, - } - return nc, nil, false + oldVal = n.leaf.val + didUpdate = true } + + nc := t.writeNode(n, true) + nc.leaf = &leafNode{ + mutateCh: make(chan struct{}), + key: k, + val: v, + } + return nc, oldVal, didUpdate } // Look for the edge @@ -119,14 +199,16 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface e := edge{ label: search[0], node: &Node{ + mutateCh: make(chan struct{}), leaf: &leafNode{ - key: k, - val: v, + mutateCh: make(chan struct{}), + key: k, + val: v, }, prefix: search, }, } - nc := t.writeNode(n) + nc := t.writeNode(n, false) nc.addEdge(e) return nc, nil, false } @@ -137,7 +219,7 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface search = search[commonPrefix:] newChild, oldVal, didUpdate := t.insert(child, k, search, v) if newChild != nil { - nc := t.writeNode(n) + nc := t.writeNode(n, false) nc.edges[idx].node = newChild return nc, oldVal, didUpdate } @@ -145,9 +227,10 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface } // Split the node - nc := t.writeNode(n) + nc := t.writeNode(n, false) splitNode := &Node{ - prefix: search[:commonPrefix], + mutateCh: make(chan struct{}), + prefix: search[:commonPrefix], } nc.replaceEdge(edge{ label: search[0], @@ -155,7 +238,7 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface }) // Restore the existing child node - modChild := t.writeNode(child) + modChild := t.writeNode(child, false) splitNode.addEdge(edge{ label: modChild.prefix[commonPrefix], node: modChild, @@ -164,8 +247,9 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface // Create a new leaf node leaf := &leafNode{ - key: k, - val: v, + mutateCh: make(chan struct{}), + key: k, + val: v, } // If the new key is a subset, add to to this node @@ -179,8 +263,9 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface splitNode.addEdge(edge{ label: search[0], node: &Node{ - leaf: leaf, - prefix: search, + mutateCh: make(chan struct{}), + leaf: leaf, + prefix: search, }, }) return nc, nil, false @@ -188,14 +273,14 @@ func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface // delete does a recursive deletion func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) { - // Check for key exhaution + // Check for key exhaustion if len(search) == 0 { if !n.isLeaf() { return nil, nil } // Remove the leaf node - nc := t.writeNode(n) + nc := t.writeNode(n, true) nc.leaf = nil // Check if this node should be merged @@ -219,8 +304,11 @@ func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) { return nil, nil } - // Copy this node - nc := t.writeNode(n) + // Copy this node. WATCH OUT - it's safe to pass "false" here because we + // will only ADD a leaf via nc.mergeChilde() if there isn't one due to + // the !nc.isLeaf() check in the logic just below. This is pretty subtle, + // so be careful if you change any of the logic here. + nc := t.writeNode(n, false) // Delete the edge if the node has no edges if newChild.leaf == nil && len(newChild.edges) == 0 { @@ -274,10 +362,109 @@ func (t *Txn) Get(k []byte) (interface{}, bool) { return t.root.Get(k) } -// Commit is used to finalize the transaction and return a new tree +// GetWatch is used to lookup a specific key, returning +// the watch channel, value and if it was found +func (t *Txn) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) { + return t.root.GetWatch(k) +} + +// Commit is used to finalize the transaction and return a new tree. If mutation +// tracking is turned on then notifications will also be issued. func (t *Txn) Commit() *Tree { - t.modified = nil - return &Tree{t.root, t.size} + nt := t.commit() + if t.trackMutate { + t.notify() + } + return nt +} + +// commit is an internal helper for Commit(), useful for unit tests. +func (t *Txn) commit() *Tree { + nt := &Tree{t.root, t.size} + t.writable = nil + return nt +} + +// slowNotify does a complete comparison of the before and after trees in order +// to trigger notifications. This doesn't require any additional state but it +// is very expensive to compute. +func (t *Txn) slowNotify() { + snapIter := t.snap.rawIterator() + rootIter := t.root.rawIterator() + for snapIter.Front() != nil || rootIter.Front() != nil { + // If we've exhausted the nodes in the old snapshot, we know + // there's nothing remaining to notify. + if snapIter.Front() == nil { + return + } + snapElem := snapIter.Front() + + // If we've exhausted the nodes in the new root, we know we need + // to invalidate everything that remains in the old snapshot. We + // know from the loop condition there's something in the old + // snapshot. + if rootIter.Front() == nil { + close(snapElem.mutateCh) + if snapElem.isLeaf() { + close(snapElem.leaf.mutateCh) + } + snapIter.Next() + continue + } + + // Do one string compare so we can check the various conditions + // below without repeating the compare. + cmp := strings.Compare(snapIter.Path(), rootIter.Path()) + + // If the snapshot is behind the root, then we must have deleted + // this node during the transaction. + if cmp < 0 { + close(snapElem.mutateCh) + if snapElem.isLeaf() { + close(snapElem.leaf.mutateCh) + } + snapIter.Next() + continue + } + + // If the snapshot is ahead of the root, then we must have added + // this node during the transaction. + if cmp > 0 { + rootIter.Next() + continue + } + + // If we have the same path, then we need to see if we mutated a + // node and possibly the leaf. + rootElem := rootIter.Front() + if snapElem != rootElem { + close(snapElem.mutateCh) + if snapElem.leaf != nil && (snapElem.leaf != rootElem.leaf) { + close(snapElem.leaf.mutateCh) + } + } + snapIter.Next() + rootIter.Next() + } +} + +// notify is used along with TrackMutate to trigger notifications. This should +// only be done once a transaction is committed. +func (t *Txn) notify() { + // If we've overflowed the tracking state we can't use it in any way and + // need to do a full tree compare. + if t.trackOverflow { + t.slowNotify() + } else { + for ch := range t.trackChannels { + close(*ch) + } + } + + // Clean up the tracking state so that a re-notify is safe (will trigger + // the else clause above which will be a no-op). + t.trackChannels = nil + t.trackOverflow = false } // Insert is used to add or update a given key. The return provides diff --git a/vendor/github.com/hashicorp/go-immutable-radix/iter.go b/vendor/github.com/hashicorp/go-immutable-radix/iter.go index 75cbaa110f..9815e02538 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/iter.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/iter.go @@ -9,11 +9,13 @@ type Iterator struct { stack []edges } -// SeekPrefix is used to seek the iterator to a given prefix -func (i *Iterator) SeekPrefix(prefix []byte) { +// SeekPrefixWatch is used to seek the iterator to a given prefix +// and returns the watch channel of the finest granularity +func (i *Iterator) SeekPrefixWatch(prefix []byte) (watch <-chan struct{}) { // Wipe the stack i.stack = nil n := i.node + watch = n.mutateCh search := prefix for { // Check for key exhaution @@ -29,6 +31,9 @@ func (i *Iterator) SeekPrefix(prefix []byte) { return } + // Update to the finest granularity as the search makes progress + watch = n.mutateCh + // Consume the search prefix if bytes.HasPrefix(search, n.prefix) { search = search[len(n.prefix):] @@ -43,6 +48,11 @@ func (i *Iterator) SeekPrefix(prefix []byte) { } } +// SeekPrefix is used to seek the iterator to a given prefix +func (i *Iterator) SeekPrefix(prefix []byte) { + i.SeekPrefixWatch(prefix) +} + // Next returns the next node in order func (i *Iterator) Next() ([]byte, interface{}, bool) { // Initialize our stack if needed diff --git a/vendor/github.com/hashicorp/go-immutable-radix/node.go b/vendor/github.com/hashicorp/go-immutable-radix/node.go index fea6f63436..cf7137f93c 100644 --- a/vendor/github.com/hashicorp/go-immutable-radix/node.go +++ b/vendor/github.com/hashicorp/go-immutable-radix/node.go @@ -12,8 +12,9 @@ type WalkFn func(k []byte, v interface{}) bool // leafNode is used to represent a value type leafNode struct { - key []byte - val interface{} + mutateCh chan struct{} + key []byte + val interface{} } // edge is used to represent an edge node @@ -24,6 +25,9 @@ type edge struct { // Node is an immutable node in the radix tree type Node struct { + // mutateCh is closed if this node is modified + mutateCh chan struct{} + // leaf is used to store possible leaf leaf *leafNode @@ -105,13 +109,14 @@ func (n *Node) mergeChild() { } } -func (n *Node) Get(k []byte) (interface{}, bool) { +func (n *Node) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) { search := k + watch := n.mutateCh for { - // Check for key exhaution + // Check for key exhaustion if len(search) == 0 { if n.isLeaf() { - return n.leaf.val, true + return n.leaf.mutateCh, n.leaf.val, true } break } @@ -122,6 +127,9 @@ func (n *Node) Get(k []byte) (interface{}, bool) { break } + // Update to the finest granularity as the search makes progress + watch = n.mutateCh + // Consume the search prefix if bytes.HasPrefix(search, n.prefix) { search = search[len(n.prefix):] @@ -129,7 +137,12 @@ func (n *Node) Get(k []byte) (interface{}, bool) { break } } - return nil, false + return watch, nil, false +} + +func (n *Node) Get(k []byte) (interface{}, bool) { + _, val, ok := n.GetWatch(k) + return val, ok } // LongestPrefix is like Get, but instead of an @@ -204,6 +217,14 @@ func (n *Node) Iterator() *Iterator { return &Iterator{node: n} } +// rawIterator is used to return a raw iterator at the given node to walk the +// tree. +func (n *Node) rawIterator() *rawIterator { + iter := &rawIterator{node: n} + iter.Next() + return iter +} + // Walk is used to walk the tree func (n *Node) Walk(fn WalkFn) { recursiveWalk(n, fn) diff --git a/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go b/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go new file mode 100644 index 0000000000..04814c1323 --- /dev/null +++ b/vendor/github.com/hashicorp/go-immutable-radix/raw_iter.go @@ -0,0 +1,78 @@ +package iradix + +// rawIterator visits each of the nodes in the tree, even the ones that are not +// leaves. It keeps track of the effective path (what a leaf at a given node +// would be called), which is useful for comparing trees. +type rawIterator struct { + // node is the starting node in the tree for the iterator. + node *Node + + // stack keeps track of edges in the frontier. + stack []rawStackEntry + + // pos is the current position of the iterator. + pos *Node + + // path is the effective path of the current iterator position, + // regardless of whether the current node is a leaf. + path string +} + +// rawStackEntry is used to keep track of the cumulative common path as well as +// its associated edges in the frontier. +type rawStackEntry struct { + path string + edges edges +} + +// Front returns the current node that has been iterated to. +func (i *rawIterator) Front() *Node { + return i.pos +} + +// Path returns the effective path of the current node, even if it's not actually +// a leaf. +func (i *rawIterator) Path() string { + return i.path +} + +// Next advances the iterator to the next node. +func (i *rawIterator) Next() { + // Initialize our stack if needed. + if i.stack == nil && i.node != nil { + i.stack = []rawStackEntry{ + rawStackEntry{ + edges: edges{ + edge{node: i.node}, + }, + }, + } + } + + for len(i.stack) > 0 { + // Inspect the last element of the stack. + n := len(i.stack) + last := i.stack[n-1] + elem := last.edges[0].node + + // Update the stack. + if len(last.edges) > 1 { + i.stack[n-1].edges = last.edges[1:] + } else { + i.stack = i.stack[:n-1] + } + + // Push the edges onto the frontier. + if len(elem.edges) > 0 { + path := last.path + string(elem.prefix) + i.stack = append(i.stack, rawStackEntry{path, elem.edges}) + } + + i.pos = elem + i.path = last.path + string(elem.prefix) + return + } + + i.pos = nil + i.path = "" +} diff --git a/vendor/github.com/hashicorp/go-memdb/memdb.go b/vendor/github.com/hashicorp/go-memdb/memdb.go index 1d708517db..13817547be 100644 --- a/vendor/github.com/hashicorp/go-memdb/memdb.go +++ b/vendor/github.com/hashicorp/go-memdb/memdb.go @@ -15,6 +15,7 @@ import ( type MemDB struct { schema *DBSchema root unsafe.Pointer // *iradix.Tree underneath + primary bool // There can only be a single writter at once writer sync.Mutex @@ -31,6 +32,7 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) { db := &MemDB{ schema: schema, root: unsafe.Pointer(iradix.New()), + primary: true, } if err := db.initialize(); err != nil { return nil, err @@ -65,6 +67,7 @@ func (db *MemDB) Snapshot() *MemDB { clone := &MemDB{ schema: db.schema, root: unsafe.Pointer(db.getRoot()), + primary: false, } return clone } diff --git a/vendor/github.com/hashicorp/go-memdb/schema.go b/vendor/github.com/hashicorp/go-memdb/schema.go index 26d0fcb99f..d7210f91cd 100644 --- a/vendor/github.com/hashicorp/go-memdb/schema.go +++ b/vendor/github.com/hashicorp/go-memdb/schema.go @@ -38,7 +38,7 @@ func (s *TableSchema) Validate() error { return fmt.Errorf("missing table name") } if len(s.Indexes) == 0 { - return fmt.Errorf("missing table schemas for '%s'", s.Name) + return fmt.Errorf("missing table indexes for '%s'", s.Name) } if _, ok := s.Indexes["id"]; !ok { return fmt.Errorf("must have id index") diff --git a/vendor/github.com/hashicorp/go-memdb/txn.go b/vendor/github.com/hashicorp/go-memdb/txn.go index fa73c9a3f1..a069a9fd99 100644 --- a/vendor/github.com/hashicorp/go-memdb/txn.go +++ b/vendor/github.com/hashicorp/go-memdb/txn.go @@ -70,6 +70,11 @@ func (txn *Txn) writableIndex(table, index string) *iradix.Txn { raw, _ := txn.rootTxn.Get(path) indexTxn := raw.(*iradix.Tree).Txn() + // If we are the primary DB, enable mutation tracking. Snapshots should + // not notify, otherwise we will trigger watches on the primary DB when + // the writes will not be visible. + indexTxn.TrackMutate(txn.db.primary) + // Keep this open for the duration of the txn txn.modified[key] = indexTxn return indexTxn @@ -352,13 +357,13 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) return num, nil } -// First is used to return the first matching object for -// the given constraints on the index -func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) { +// FirstWatch is used to return the first matching object for +// the given constraints on the index along with the watch channel +func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) { // Get the index value indexSchema, val, err := txn.getIndexValue(table, index, args...) if err != nil { - return nil, err + return nil, nil, err } // Get the index itself @@ -366,18 +371,25 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er // Do an exact lookup if indexSchema.Unique && val != nil && indexSchema.Name == index { - obj, ok := indexTxn.Get(val) + watch, obj, ok := indexTxn.GetWatch(val) if !ok { - return nil, nil + return watch, nil, nil } - return obj, nil + return watch, obj, nil } // Handle non-unique index by using an iterator and getting the first value iter := indexTxn.Root().Iterator() - iter.SeekPrefix(val) + watch := iter.SeekPrefixWatch(val) _, value, _ := iter.Next() - return value, nil + return watch, value, nil +} + +// First is used to return the first matching object for +// the given constraints on the index +func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) { + _, val, err := txn.FirstWatch(table, index, args...) + return val, err } // LongestPrefix is used to fetch the longest prefix match for the given @@ -468,6 +480,7 @@ func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexS // ResultIterator is used to iterate over a list of results // from a Get query on a table. type ResultIterator interface { + WatchCh() <-chan struct{} Next() interface{} } @@ -488,11 +501,12 @@ func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, e indexIter := indexRoot.Iterator() // Seek the iterator to the appropriate sub-set - indexIter.SeekPrefix(val) + watchCh := indexIter.SeekPrefixWatch(val) // Create an iterator iter := &radixIterator{ - iter: indexIter, + iter: indexIter, + watchCh: watchCh, } return iter, nil } @@ -506,10 +520,15 @@ func (txn *Txn) Defer(fn func()) { } // radixIterator is used to wrap an underlying iradix iterator. -// This is much mroe efficient than a sliceIterator as we are not +// This is much more efficient than a sliceIterator as we are not // materializing the entire view. type radixIterator struct { - iter *iradix.Iterator + iter *iradix.Iterator + watchCh <-chan struct{} +} + +func (r *radixIterator) WatchCh() <-chan struct{} { + return r.watchCh } func (r *radixIterator) Next() interface{} { diff --git a/vendor/github.com/hashicorp/go-memdb/watch.go b/vendor/github.com/hashicorp/go-memdb/watch.go new file mode 100644 index 0000000000..7c4a3ba6ee --- /dev/null +++ b/vendor/github.com/hashicorp/go-memdb/watch.go @@ -0,0 +1,108 @@ +package memdb + +import "time" + +// WatchSet is a collection of watch channels. +type WatchSet map[<-chan struct{}]struct{} + +// NewWatchSet constructs a new watch set. +func NewWatchSet() WatchSet { + return make(map[<-chan struct{}]struct{}) +} + +// Add appends a watchCh to the WatchSet if non-nil. +func (w WatchSet) Add(watchCh <-chan struct{}) { + if w == nil { + return + } + + if _, ok := w[watchCh]; !ok { + w[watchCh] = struct{}{} + } +} + +// AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given +// softLimit hasn't been exceeded. Otherwise, it will watch the given alternate +// channel. It's expected that the altCh will be the same on many calls to this +// function, so you will exceed the soft limit a little bit if you hit this, but +// not by much. +// +// This is useful if you want to track individual items up to some limit, after +// which you watch a higher-level channel (usually a channel from start start of +// an iterator higher up in the radix tree) that will watch a superset of items. +func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) { + // This is safe for a nil WatchSet so we don't need to check that here. + if len(w) < softLimit { + w.Add(watchCh) + } else { + w.Add(altCh) + } +} + +// Watch is used to wait for either the watch set to trigger or a timeout. +// Returns true on timeout. +func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool { + if w == nil { + return false + } + + if n := len(w); n <= aFew { + idx := 0 + chunk := make([]<-chan struct{}, aFew) + for watchCh := range w { + chunk[idx] = watchCh + idx++ + } + return watchFew(chunk, timeoutCh) + } else { + return w.watchMany(timeoutCh) + } +} + +// watchMany is used if there are many watchers. +func (w WatchSet) watchMany(timeoutCh <-chan time.Time) bool { + // Make a fake timeout channel we can feed into watchFew to cancel all + // the blocking goroutines. + doneCh := make(chan time.Time) + defer close(doneCh) + + // Set up a goroutine for each watcher. + triggerCh := make(chan struct{}, 1) + watcher := func(chunk []<-chan struct{}) { + if timeout := watchFew(chunk, doneCh); !timeout { + select { + case triggerCh <- struct{}{}: + default: + } + } + } + + // Apportion the watch channels into chunks we can feed into the + // watchFew helper. + idx := 0 + chunk := make([]<-chan struct{}, aFew) + for watchCh := range w { + subIdx := idx % aFew + chunk[subIdx] = watchCh + idx++ + + // Fire off this chunk and start a fresh one. + if idx%aFew == 0 { + go watcher(chunk) + chunk = make([]<-chan struct{}, aFew) + } + } + + // Make sure to watch any residual channels in the last chunk. + if idx%aFew != 0 { + go watcher(chunk) + } + + // Wait for a channel to trigger or timeout. + select { + case <-triggerCh: + return false + case <-timeoutCh: + return true + } +} diff --git a/vendor/github.com/hashicorp/go-memdb/watch_few.go b/vendor/github.com/hashicorp/go-memdb/watch_few.go new file mode 100644 index 0000000000..f2bb19db17 --- /dev/null +++ b/vendor/github.com/hashicorp/go-memdb/watch_few.go @@ -0,0 +1,116 @@ +//go:generate sh -c "go run watch-gen/main.go >watch_few.go" +package memdb + +import( + "time" +) + +// aFew gives how many watchers this function is wired to support. You must +// always pass a full slice of this length, but unused channels can be nil. +const aFew = 32 + +// watchFew is used if there are only a few watchers as a performance +// optimization. +func watchFew(ch []<-chan struct{}, timeoutCh <-chan time.Time) bool { + select { + + case <-ch[0]: + return false + + case <-ch[1]: + return false + + case <-ch[2]: + return false + + case <-ch[3]: + return false + + case <-ch[4]: + return false + + case <-ch[5]: + return false + + case <-ch[6]: + return false + + case <-ch[7]: + return false + + case <-ch[8]: + return false + + case <-ch[9]: + return false + + case <-ch[10]: + return false + + case <-ch[11]: + return false + + case <-ch[12]: + return false + + case <-ch[13]: + return false + + case <-ch[14]: + return false + + case <-ch[15]: + return false + + case <-ch[16]: + return false + + case <-ch[17]: + return false + + case <-ch[18]: + return false + + case <-ch[19]: + return false + + case <-ch[20]: + return false + + case <-ch[21]: + return false + + case <-ch[22]: + return false + + case <-ch[23]: + return false + + case <-ch[24]: + return false + + case <-ch[25]: + return false + + case <-ch[26]: + return false + + case <-ch[27]: + return false + + case <-ch[28]: + return false + + case <-ch[29]: + return false + + case <-ch[30]: + return false + + case <-ch[31]: + return false + + case <-timeoutCh: + return true + } +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 47533cfa7a..7f732334c0 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -426,16 +426,16 @@ "revisionTime": "2016-04-07T17:41:26Z" }, { - "checksumSHA1": "qmE9mO0WW6ALLpUU81rXDyspP5M=", + "checksumSHA1": "jPxyofQxI1PRPq6LPc6VlcRn5fI=", "path": "github.com/hashicorp/go-immutable-radix", - "revision": "afc5a0dbb18abdf82c277a7bc01533e81fa1d6b8", - "revisionTime": "2016-06-09T02:05:29Z" + "revision": "76b5f4e390910df355bfb9b16b41899538594a05", + "revisionTime": "2017-01-13T02:29:29Z" }, { - "checksumSHA1": "ZpTDFeRvXFwIvSHRD8eDYHxaj4Y=", + "checksumSHA1": "K8Fsgt1llTXP0EwqdBzvSGdKOKc=", "path": "github.com/hashicorp/go-memdb", - "revision": "d2d2b77acab85aa635614ac17ea865969f56009e", - "revisionTime": "2017-01-07T16:22:14Z" + "revision": "c01f56b44823e8ba697e23c18d12dca984b85aca", + "revisionTime": "2017-01-23T15:32:28Z" }, { "checksumSHA1": "TNlVzNR1OaajcNi3CbQ3bGbaLGU=",