consul: Adding support for virtual indexes

This commit is contained in:
Armon Dadgar 2014-03-31 13:06:29 -07:00
parent 8d7c5fc9cd
commit 59703cbfae
2 changed files with 158 additions and 4 deletions

View File

@ -49,10 +49,13 @@ type MDBIndex struct {
Unique bool // Controls if values are unique
Fields []string // Fields are used to build the index
IdxFunc IndexFunc // Can be used to provide custom indexing
Virtual bool // Virtual index does not exist, but can be used for queries
RealIndex string // Virtual indexes use a RealIndex for iteration
table *MDBTable
name string
dbiName string
table *MDBTable
name string
dbiName string
realIndex *MDBIndex
}
// MDBTxn is used to wrap an underlying transaction
@ -88,6 +91,17 @@ func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
return prefix
}
// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan
// for index prefix values. This should only be used as part of a
// virtual index.
func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string {
if len(parts) == 0 {
return "_"
}
prefix := "_" + strings.Join(parts, "||")
return prefix
}
// Init is used to initialize the MDBTable and ensure it's ready
func (t *MDBTable) Init() error {
if t.Env == nil {
@ -111,6 +125,9 @@ func (t *MDBTable) Init() error {
if id.AllowBlank {
return fmt.Errorf("id index must not allow blanks")
}
if id.Virtual {
return fmt.Errorf("id index cannot be virtual")
}
// Create the table
if err := t.createTable(); err != nil {
@ -221,6 +238,9 @@ EXTEND:
mdbTxn.dbis[t.Name] = dbi
for _, index := range t.Indexes {
if index.Virtual {
continue
}
dbi, err := index.openDBI(tx)
if err != nil {
tx.Abort()
@ -237,6 +257,9 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
// Construct the indexes keys
indexes := make(map[string][]byte)
for name, index := range t.Indexes {
if index.Virtual {
continue
}
key, err := index.keyFromObject(obj)
if err != nil {
return nil, err
@ -301,6 +324,9 @@ AFTER_DELETE:
// Insert the new indexes
for name, index := range t.Indexes {
if index.Virtual {
continue
}
dbi := tx.dbis[index.dbiName]
if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil {
return err
@ -427,6 +453,12 @@ func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num i
if name == idx.name {
continue
}
if idx.Virtual && name == idx.RealIndex {
continue
}
if otherIdx.Virtual {
continue
}
dbi := tx.dbis[otherIdx.dbiName]
if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil {
panic(err)
@ -464,11 +496,23 @@ func (i *MDBIndex) init(table *MDBTable, name string) error {
if err := i.createIndex(); err != nil {
return err
}
// Verify real index exists
if i.Virtual {
if realIndex, ok := table.Indexes[i.RealIndex]; !ok {
return fmt.Errorf("real index '%s' missing", i.RealIndex)
} else {
i.realIndex = realIndex
}
}
return nil
}
// createIndex is used to ensure the index exists
func (i *MDBIndex) createIndex() error {
// Do not create if this is a virtual index
if i.Virtual {
return nil
}
tx, err := i.table.Env.BeginTxn(nil, 0)
if err != nil {
return err
@ -529,7 +573,14 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte {
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
cb func(encRowId, res []byte) bool) error {
table := tx.dbis[i.table.Name]
dbi := tx.dbis[i.dbiName]
// If virtual, use the correct DBI
var dbi mdb.DBI
if i.Virtual {
dbi = tx.dbis[i.realIndex.dbiName]
} else {
dbi = tx.dbis[i.dbiName]
}
cursor, err := tx.tx.CursorOpen(dbi)
if err != nil {

View File

@ -781,3 +781,106 @@ func TestMDBTableDelete_Prefix(t *testing.T) {
t.Fatalf("expect 2 result: %#v", res)
}
}
func TestMDBTableVirtualIndex(t *testing.T) {
dir, env := testMDBEnv(t)
defer os.RemoveAll(dir)
defer env.Close()
table := &MDBTable{
Env: env,
Name: "test",
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"First"},
},
"id_prefix": &MDBIndex{
Virtual: true,
RealIndex: "id",
Fields: []string{"First"},
IdxFunc: DefaultIndexPrefixFunc,
},
},
Encoder: MockEncoder,
Decoder: MockDecoder,
}
if err := table.Init(); err != nil {
t.Fatalf("err: %v", err)
}
if table.lastRowID != 0 {
t.Fatalf("bad last row id: %d", table.lastRowID)
}
objs := []*MockData{
&MockData{
Key: "1",
First: "Jack",
Last: "Smith",
Country: "USA",
},
&MockData{
Key: "2",
First: "John",
Last: "Wang",
Country: "USA",
},
&MockData{
Key: "3",
First: "James",
Last: "Torres",
Country: "Mexico",
},
}
// Insert some mock objects
for idx, obj := range objs {
if err := table.Insert(obj); err != nil {
t.Fatalf("err: %v", err)
}
if err := table.SetLastIndex(uint64(4 * idx)); err != nil {
t.Fatalf("err: %v", err)
}
}
if table.lastRowID != 3 {
t.Fatalf("bad last row id: %d", table.lastRowID)
}
if idx, _ := table.LastIndex(); idx != 8 {
t.Fatalf("bad last idx: %d", idx)
}
_, res, err := table.Get("id_prefix", "J")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 3 {
t.Fatalf("expect 3 result: %#v", res)
}
_, res, err = table.Get("id_prefix", "Ja")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 2 {
t.Fatalf("expect 2 result: %#v", res)
}
num, err := table.Delete("id_prefix", "Ja")
if err != nil {
t.Fatalf("err: %v", err)
}
if num != 2 {
t.Fatalf("expect 2 result: %#v", num)
}
_, res, err = table.Get("id_prefix", "J")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 1 {
t.Fatalf("expect 1 result: %#v", res)
}
}