consul: Support a streaming transaction

This commit is contained in:
Armon Dadgar 2014-04-01 11:42:07 -07:00
parent 4ccb2d4a73
commit 8172526801
2 changed files with 111 additions and 0 deletions

View File

@ -376,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac
return results, err
}
// StreamTxn is like GetTxn but it streams the results over a channel.
// This can be used if the expected data set is very large. The stream
// is always closed on return.
func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error {
// Always close the stream on return
defer close(stream)
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return err
}
// Stream the results
err = idx.iterate(tx, key, func(encRowId, res []byte) bool {
obj := t.Decoder(res)
stream <- obj
return false
})
return err
}
// getIndex is used to get the proper index, and also check the arity
func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) {
// Get the index

View File

@ -884,3 +884,91 @@ func TestMDBTableVirtualIndex(t *testing.T) {
t.Fatalf("expect 1 result: %#v", res)
}
}
func TestMDBTableStream(t *testing.T) {
dir, env := testMDBEnv(t)
defer os.RemoveAll(dir)
defer env.Close()
table := &MDBTable{
Env: env,
Name: "test",
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Key"},
},
"name": &MDBIndex{
Fields: []string{"First", "Last"},
},
"country": &MDBIndex{
Fields: []string{"Country"},
},
},
Encoder: MockEncoder,
Decoder: MockDecoder,
}
if err := table.Init(); err != nil {
t.Fatalf("err: %v", err)
}
objs := []*MockData{
&MockData{
Key: "1",
First: "Kevin",
Last: "Smith",
Country: "USA",
},
&MockData{
Key: "2",
First: "Kevin",
Last: "Wang",
Country: "USA",
},
&MockData{
Key: "3",
First: "Bernardo",
Last: "Torres",
Country: "Mexico",
},
}
// Insert some mock objects
for idx, obj := range objs {
if err := table.Insert(obj); err != nil {
t.Fatalf("err: %v", err)
}
if err := table.SetLastIndex(uint64(idx + 1)); err != nil {
t.Fatalf("err: %v", err)
}
}
// Start a readonly txn
tx, err := table.StartTxn(true, nil)
if err != nil {
panic(err)
}
defer tx.Abort()
// Stream the records
streamCh := make(chan interface{})
go func() {
if err := table.StreamTxn(streamCh, tx, "id"); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Verify we get them all
idx := 0
for obj := range streamCh {
p := obj.(*MockData)
if !reflect.DeepEqual(p, objs[idx]) {
t.Fatalf("bad: %#v %#v", p, objs[idx])
}
idx++
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
}