From 8172526801bf4d9b224d9f9fbaedd0a62b6beb3b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 1 Apr 2014 11:42:07 -0700 Subject: [PATCH] consul: Support a streaming transaction --- consul/mdb_table.go | 23 +++++++++++ consul/mdb_table_test.go | 88 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 58f2781243..eeead56c34 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -376,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac return results, err } +// StreamTxn is like GetTxn but it streams the results over a channel. +// This can be used if the expected data set is very large. The stream +// is always closed on return. +func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error { + // Always close the stream on return + defer close(stream) + + // Get the associated index + idx, key, err := t.getIndex(index, parts) + if err != nil { + return err + } + + // Stream the results + err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + obj := t.Decoder(res) + stream <- obj + return false + }) + + return err +} + // getIndex is used to get the proper index, and also check the arity func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) { // Get the index diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index 061240df28..d57a9bd6b2 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -884,3 +884,91 @@ func TestMDBTableVirtualIndex(t *testing.T) { t.Fatalf("expect 1 result: %#v", res) } } + +func TestMDBTableStream(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "name": &MDBIndex{ + Fields: []string{"First", "Last"}, + }, + "country": &MDBIndex{ + Fields: []string{"Country"}, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Kevin", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "Kevin", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "Bernardo", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(idx + 1)); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Start a readonly txn + tx, err := table.StartTxn(true, nil) + if err != nil { + panic(err) + } + defer tx.Abort() + + // Stream the records + streamCh := make(chan interface{}) + go func() { + if err := table.StreamTxn(streamCh, tx, "id"); err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Verify we get them all + idx := 0 + for obj := range streamCh { + p := obj.(*MockData) + if !reflect.DeepEqual(p, objs[idx]) { + t.Fatalf("bad: %#v %#v", p, objs[idx]) + } + idx++ + } + + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } +}