mirror of https://github.com/status-im/consul.git
Adds in basic query template lookups and vendors newly-updated memdb as well as improved iradix tree.
This commit is contained in:
parent
07514214e1
commit
328d138466
|
@ -121,11 +121,11 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/go-immutable-radix",
|
||||
"Rev": "12e90058b2897552deea141eff51bb7a07a09e63"
|
||||
"Rev": "8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/go-memdb",
|
||||
"Rev": "31949d523ade8a236956c6f1761e9dcf902d1638"
|
||||
"Rev": "98f52f52d7a476958fa9da671354d270c50661a7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/hashicorp/go-msgpack/codec",
|
||||
|
|
|
@ -33,18 +33,6 @@ func toPreparedQuery(wrapped interface{}) *structs.PreparedQuery {
|
|||
return wrapped.(*queryWrapper).PreparedQuery
|
||||
}
|
||||
|
||||
// isQueryWild returns the wild-ness of a query. See isWild for details.
|
||||
func isQueryWild(query *structs.PreparedQuery) bool {
|
||||
return query != nil && prepared_query.IsTemplate(query) && query.Name == ""
|
||||
}
|
||||
|
||||
// isWrappedWild is used to determine if the given wrapped query is a wild one,
|
||||
// which means it has an empty Name and it's a template. See the comments for
|
||||
// "wild" in schema.go for more details and to see where this is used.
|
||||
func isWrappedWild(obj interface{}) (bool, error) {
|
||||
return isQueryWild(toPreparedQuery(obj)), nil
|
||||
}
|
||||
|
||||
// PreparedQueries is used to pull all the prepared queries from the snapshot.
|
||||
func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) {
|
||||
queries, err := s.tx.Get("prepared-queries", "id")
|
||||
|
@ -123,7 +111,9 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
|
|||
}
|
||||
|
||||
// Verify that the query name doesn't already exist, or that we are
|
||||
// updating the same instance that has this name.
|
||||
// updating the same instance that has this name. If this is a template
|
||||
// and the name is empty then we make sure there's not an empty template
|
||||
// already registered.
|
||||
if query.Name != "" {
|
||||
wrapped, err := tx.First("prepared-queries", "name", query.Name)
|
||||
if err != nil {
|
||||
|
@ -133,18 +123,14 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
|
|||
if other != nil && (existing == nil || existing.ID != other.ID) {
|
||||
return fmt.Errorf("name '%s' aliases an existing query name", query.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Similarly, if this is the wild query make sure there isn't another
|
||||
// one, or that we are updating the same one.
|
||||
if isQueryWild(query) {
|
||||
wrapped, err := tx.First("prepared-queries", "wild", true)
|
||||
} else if prepared_query.IsTemplate(query) {
|
||||
wrapped, err := tx.First("prepared-queries", "template", query.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
other := toPreparedQuery(wrapped)
|
||||
if other != nil && (existing == nil || existing.ID != other.ID) {
|
||||
return fmt.Errorf("a prepared query template already exists with an empty name")
|
||||
return fmt.Errorf("name '%s' aliases an existing query template name", query.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -311,27 +297,22 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
|
|||
}
|
||||
}
|
||||
|
||||
// Then try by name. We use a prefix match but check to make sure that
|
||||
// the query's name matches the whole prefix for a non-template query.
|
||||
// Templates are allowed to use the partial match. It's more efficient
|
||||
// to combine the two lookups here, even though the logic is a little
|
||||
// less clear.
|
||||
// Next, look for an exact name match. This is the common case for static
|
||||
// prepared queries, and could also apply to templates.
|
||||
{
|
||||
wrapped, err := tx.First("prepared-queries", "name_prefix", queryIDOrName)
|
||||
wrapped, err := tx.First("prepared-queries", "name", queryIDOrName)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
if wrapped != nil {
|
||||
query := toPreparedQuery(wrapped)
|
||||
if query.Name == queryIDOrName || prepared_query.IsTemplate(query) {
|
||||
return prep(wrapped)
|
||||
}
|
||||
return prep(wrapped)
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, see if there's a wild template we can use.
|
||||
// Next, look for the longest prefix match among the prepared query
|
||||
// templates.
|
||||
{
|
||||
wrapped, err := tx.First("prepared-queries", "wild", true)
|
||||
wrapped, err := tx.LongestPrefix("prepared-queries", "template_prefix", queryIDOrName)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/consul/prepared_query"
|
||||
)
|
||||
|
||||
// PreparedQueryIndex is a custom memdb indexer used to manage index prepared
|
||||
// query templates. None of the built-in indexers do what we need, and our
|
||||
// use case is pretty specific so it's better to put the logic here.
|
||||
type PreparedQueryIndex struct {
|
||||
}
|
||||
|
||||
// FromObject is used to compute the index key when inserting or updating an
|
||||
// object.
|
||||
func (*PreparedQueryIndex) FromObject(obj interface{}) (bool, []byte, error) {
|
||||
wrapped, ok := obj.(*queryWrapper)
|
||||
if !ok {
|
||||
return false, nil, fmt.Errorf("invalid object given to index as prepared query")
|
||||
}
|
||||
|
||||
query := toPreparedQuery(wrapped)
|
||||
if !prepared_query.IsTemplate(query) {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
// Always prepend a null so that we can represent even an empty name.
|
||||
out := "\x00" + strings.ToLower(query.Name)
|
||||
return true, []byte(out), nil
|
||||
}
|
||||
|
||||
// FromArgs is used when querying for an exact match. Since we don't add any
|
||||
// suffix we can just call the prefix version.
|
||||
func (p *PreparedQueryIndex) FromArgs(args ...interface{}) ([]byte, error) {
|
||||
return p.PrefixFromArgs(args...)
|
||||
}
|
||||
|
||||
// PrefixFromArgs is used when doing a prefix scan for an object.
|
||||
func (*PreparedQueryIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("must provide only a single argument")
|
||||
}
|
||||
arg, ok := args[0].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
|
||||
}
|
||||
arg = "\x00" + strings.ToLower(arg)
|
||||
return []byte(arg), nil
|
||||
}
|
|
@ -525,9 +525,12 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
|
|||
},
|
||||
&structs.PreparedQuery{
|
||||
ID: testUUID(),
|
||||
Name: "bob",
|
||||
Name: "bob-",
|
||||
Template: structs.QueryTemplateOptions{
|
||||
Type: structs.QueryTemplateTypeNamePrefixMatch,
|
||||
},
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "mongodb",
|
||||
Service: "${name.suffix}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -571,9 +574,12 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
|
|||
},
|
||||
&structs.PreparedQuery{
|
||||
ID: queries[1].ID,
|
||||
Name: "bob",
|
||||
Name: "bob-",
|
||||
Template: structs.QueryTemplateOptions{
|
||||
Type: structs.QueryTemplateTypeNamePrefixMatch,
|
||||
},
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "mongodb",
|
||||
Service: "${name.suffix}",
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 5,
|
||||
|
@ -612,6 +618,19 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
|
|||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Fatalf("bad: %v", actual)
|
||||
}
|
||||
|
||||
// Make sure the second query, which is a template, was compiled
|
||||
// and can be resolved.
|
||||
_, query, err := s.PreparedQueryResolve("bob-backwards-is-bob")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if query == nil {
|
||||
t.Fatalf("should have resolved the query")
|
||||
}
|
||||
if query.Service.Service != "backwards-is-bob" {
|
||||
t.Fatalf("bad: %s", query.Service.Service)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
|
@ -390,20 +390,11 @@ func preparedQueriesTableSchema() *memdb.TableSchema {
|
|||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
// This is a bit of an oddball. It's an important feature
|
||||
// of prepared query templates to be able to define a
|
||||
// single template that matches any query. Unfortunately,
|
||||
// we can't index an empty Name field. This index lets us
|
||||
// keep track of whether there is any wild template in
|
||||
// existence, so there will be one "true" in here if that
|
||||
// exists, and everything else will be "false".
|
||||
"wild": &memdb.IndexSchema{
|
||||
Name: "wild",
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.ConditionalIndex{
|
||||
Conditional: isWrappedWild,
|
||||
},
|
||||
"template": &memdb.IndexSchema{
|
||||
Name: "template",
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &PreparedQueryIndex{},
|
||||
},
|
||||
"session": &memdb.IndexSchema{
|
||||
Name: "session",
|
||||
|
|
|
@ -41,8 +41,15 @@ func (n *Node) isLeaf() bool {
|
|||
}
|
||||
|
||||
func (n *Node) addEdge(e edge) {
|
||||
num := len(n.edges)
|
||||
idx := sort.Search(num, func(i int) bool {
|
||||
return n.edges[i].label >= e.label
|
||||
})
|
||||
n.edges = append(n.edges, e)
|
||||
n.edges.Sort()
|
||||
if idx != num {
|
||||
copy(n.edges[idx+1:], n.edges[idx:num])
|
||||
n.edges[idx] = e
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Node) replaceEdge(e edge) {
|
||||
|
|
|
@ -291,10 +291,6 @@ func (c *CompoundIndex) FromArgs(args ...interface{}) ([]byte, error) {
|
|||
if len(args) != len(c.Indexes) {
|
||||
return nil, fmt.Errorf("less arguments than index fields")
|
||||
}
|
||||
return c.PrefixFromArgs(args...)
|
||||
}
|
||||
|
||||
func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
|
||||
var out []byte
|
||||
for i, arg := range args {
|
||||
val, err := c.Indexes[i].FromArgs(arg)
|
||||
|
@ -305,3 +301,30 @@ func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
|
|||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *CompoundIndex) PrefixFromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) > len(c.Indexes) {
|
||||
return nil, fmt.Errorf("more arguments than index fields")
|
||||
}
|
||||
var out []byte
|
||||
for i, arg := range args {
|
||||
if i+1 < len(args) {
|
||||
val, err := c.Indexes[i].FromArgs(arg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sub-index %d error: %v", i, err)
|
||||
}
|
||||
out = append(out, val...)
|
||||
} else {
|
||||
prefixIndexer, ok := c.Indexes[i].(PrefixIndexer)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("sub-index %d does not support prefix scanning", i)
|
||||
}
|
||||
val, err := prefixIndexer.PrefixFromArgs(arg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sub-index %d error: %v", i, err)
|
||||
}
|
||||
out = append(out, val...)
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package memdb
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/hashicorp/go-immutable-radix"
|
||||
)
|
||||
|
@ -12,7 +14,7 @@ import (
|
|||
// transactions and MVCC.
|
||||
type MemDB struct {
|
||||
schema *DBSchema
|
||||
root *iradix.Tree
|
||||
root unsafe.Pointer // *iradix.Tree underneath
|
||||
|
||||
// There can only be a single writter at once
|
||||
writer sync.Mutex
|
||||
|
@ -28,7 +30,7 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
|
|||
// Create the MemDB
|
||||
db := &MemDB{
|
||||
schema: schema,
|
||||
root: iradix.New(),
|
||||
root: unsafe.Pointer(iradix.New()),
|
||||
}
|
||||
if err := db.initialize(); err != nil {
|
||||
return nil, err
|
||||
|
@ -36,6 +38,12 @@ func NewMemDB(schema *DBSchema) (*MemDB, error) {
|
|||
return db, nil
|
||||
}
|
||||
|
||||
// getRoot is used to do an atomic load of the root pointer
|
||||
func (db *MemDB) getRoot() *iradix.Tree {
|
||||
root := (*iradix.Tree)(atomic.LoadPointer(&db.root))
|
||||
return root
|
||||
}
|
||||
|
||||
// Txn is used to start a new transaction, in either read or write mode.
|
||||
// There can only be a single concurrent writer, but any number of readers.
|
||||
func (db *MemDB) Txn(write bool) *Txn {
|
||||
|
@ -45,7 +53,7 @@ func (db *MemDB) Txn(write bool) *Txn {
|
|||
txn := &Txn{
|
||||
db: db,
|
||||
write: write,
|
||||
rootTxn: db.root.Txn(),
|
||||
rootTxn: db.getRoot().Txn(),
|
||||
}
|
||||
return txn
|
||||
}
|
||||
|
@ -56,20 +64,22 @@ func (db *MemDB) Txn(write bool) *Txn {
|
|||
func (db *MemDB) Snapshot() *MemDB {
|
||||
clone := &MemDB{
|
||||
schema: db.schema,
|
||||
root: db.root,
|
||||
root: unsafe.Pointer(db.getRoot()),
|
||||
}
|
||||
return clone
|
||||
}
|
||||
|
||||
// initialize is used to setup the DB for use after creation
|
||||
func (db *MemDB) initialize() error {
|
||||
root := db.getRoot()
|
||||
for tName, tableSchema := range db.schema.Tables {
|
||||
for iName, _ := range tableSchema.Indexes {
|
||||
index := iradix.New()
|
||||
path := indexPath(tName, iName)
|
||||
db.root, _, _ = db.root.Insert(path, index)
|
||||
root, _, _ = root.Insert(path, index)
|
||||
}
|
||||
}
|
||||
db.root = unsafe.Pointer(root)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
package memdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/hashicorp/go-immutable-radix"
|
||||
)
|
||||
|
@ -10,6 +13,7 @@ import (
|
|||
const (
|
||||
id = "id"
|
||||
)
|
||||
|
||||
// tableIndex is a tuple of (Table, Index) used for lookups
|
||||
type tableIndex struct {
|
||||
Table string
|
||||
|
@ -113,7 +117,8 @@ func (txn *Txn) Commit() {
|
|||
}
|
||||
|
||||
// Update the root of the DB
|
||||
txn.db.root = txn.rootTxn.Commit()
|
||||
newRoot := txn.rootTxn.Commit()
|
||||
atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
|
||||
|
||||
// Clear the txn
|
||||
txn.rootTxn = nil
|
||||
|
@ -161,28 +166,44 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
|
|||
for name, indexSchema := range tableSchema.Indexes {
|
||||
indexTxn := txn.writableIndex(table, name)
|
||||
|
||||
// Handle the update by deleting from the index first
|
||||
if update {
|
||||
ok, val, err := indexSchema.Indexer.FromObject(existing)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build index '%s': %v", name, err)
|
||||
}
|
||||
if ok {
|
||||
// Handle non-unique index by computing a unique index.
|
||||
// This is done by appending the primary key which must
|
||||
// be unique anyways.
|
||||
if !indexSchema.Unique {
|
||||
val = append(val, idVal...)
|
||||
}
|
||||
indexTxn.Delete(val)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the insert after the update
|
||||
// Determine the new index value
|
||||
ok, val, err := indexSchema.Indexer.FromObject(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build index '%s': %v", name, err)
|
||||
}
|
||||
|
||||
// Handle non-unique index by computing a unique index.
|
||||
// This is done by appending the primary key which must
|
||||
// be unique anyways.
|
||||
if ok && !indexSchema.Unique {
|
||||
val = append(val, idVal...)
|
||||
}
|
||||
|
||||
// Handle the update by deleting from the index first
|
||||
if update {
|
||||
okExist, valExist, err := indexSchema.Indexer.FromObject(existing)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build index '%s': %v", name, err)
|
||||
}
|
||||
if okExist {
|
||||
// Handle non-unique index by computing a unique index.
|
||||
// This is done by appending the primary key which must
|
||||
// be unique anyways.
|
||||
if !indexSchema.Unique {
|
||||
valExist = append(valExist, idVal...)
|
||||
}
|
||||
|
||||
// If we are writing to the same index with the same value,
|
||||
// we can avoid the delete as the insert will overwrite the
|
||||
// value anyways.
|
||||
if !bytes.Equal(valExist, val) {
|
||||
indexTxn.Delete(valExist)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If there is no index value, either this is an error or an expected
|
||||
// case and we can skip updating
|
||||
if !ok {
|
||||
if indexSchema.AllowMissing {
|
||||
continue
|
||||
|
@ -191,12 +212,7 @@ func (txn *Txn) Insert(table string, obj interface{}) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Handle non-unique index by computing a unique index.
|
||||
// This is done by appending the primary key which must
|
||||
// be unique anyways.
|
||||
if !indexSchema.Unique {
|
||||
val = append(val, idVal...)
|
||||
}
|
||||
// Update the value of the index
|
||||
indexTxn.Insert(val, obj)
|
||||
}
|
||||
return nil
|
||||
|
@ -281,7 +297,7 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error)
|
|||
|
||||
// Do the deletes
|
||||
num := 0
|
||||
for _, obj := range(objs) {
|
||||
for _, obj := range objs {
|
||||
if err := txn.Delete(table, obj); err != nil {
|
||||
return num, err
|
||||
}
|
||||
|
@ -318,6 +334,39 @@ func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, er
|
|||
return value, nil
|
||||
}
|
||||
|
||||
// LongestPrefix is used to fetch the longest prefix match for the given
|
||||
// constraints on the index. Note that this will not work with the memdb
|
||||
// StringFieldIndex because it adds null terminators which prevent the
|
||||
// algorithm from correctly finding a match (it will get to right before the
|
||||
// null and fail to find a leaf node). This should only be used where the prefix
|
||||
// given is capable of matching indexed entries directly, which typically only
|
||||
// applies to a custom indexer. See the unit test for an example.
|
||||
func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
|
||||
// Enforce that this only works on prefix indexes.
|
||||
if !strings.HasSuffix(index, "_prefix") {
|
||||
return nil, fmt.Errorf("must use '%s_prefix' on index", index)
|
||||
}
|
||||
|
||||
// Get the index value.
|
||||
indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This algorithm only makes sense against a unique index, otherwise the
|
||||
// index keys will have the IDs appended to them.
|
||||
if !indexSchema.Unique {
|
||||
return nil, fmt.Errorf("index '%s' is not unique", index)
|
||||
}
|
||||
|
||||
// Find the longest prefix match with the given index.
|
||||
indexTxn := txn.readableIndex(table, indexSchema.Name)
|
||||
if _, value, ok := indexTxn.Root().LongestPrefix(val); ok {
|
||||
return value, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// getIndexValue is used to get the IndexSchema and the value
|
||||
// used to scan the index given the parameters. This handles prefix based
|
||||
// scans when the index has the "_prefix" suffix. The index must support
|
||||
|
|
Loading…
Reference in New Issue