mirror of
https://github.com/status-im/consul.git
synced 2025-02-23 02:48:19 +00:00
Adds fine-grained watches to coordinate endpoints.
This commit is contained in:
parent
ec90404df0
commit
3675e5ceba
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
@ -174,11 +175,10 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
|
||||
}
|
||||
|
||||
state := c.srv.fsm.State()
|
||||
return c.srv.blockingRPC(&args.QueryOptions,
|
||||
return c.srv.blockingQuery(&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("Coordinates"),
|
||||
func() error {
|
||||
index, coords, err := state.Coordinates()
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, coords, err := state.Coordinates(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -544,7 +544,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||
}()
|
||||
|
||||
// Verify coordinates are restored
|
||||
_, coords, err := fsm2.state.Coordinates()
|
||||
_, coords, err := fsm2.state.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -832,7 +832,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
// Read back the two coordinates to make sure they got updated.
|
||||
_, coords, err := fsm.state.Coordinates()
|
||||
_, coords, err := fsm.state.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -58,20 +58,22 @@ func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, erro
|
||||
}
|
||||
|
||||
// Coordinates queries for all nodes with coordinates.
|
||||
func (s *StateStore) Coordinates() (uint64, structs.Coordinates, error) {
|
||||
func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("Coordinates")...)
|
||||
idx := maxIndexTxn(tx, "coordinates")
|
||||
|
||||
// Pull all the coordinates.
|
||||
coords, err := tx.Get("coordinates", "id")
|
||||
iter, err := tx.Get("coordinates", "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
|
||||
var results structs.Coordinates
|
||||
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
||||
for coord := iter.Next(); coord != nil; coord = iter.Next() {
|
||||
results = append(results, coord.(*structs.Coordinate))
|
||||
}
|
||||
return idx, results, nil
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
@ -29,7 +30,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||
|
||||
// Make sure the coordinates list starts out empty, and that a query for
|
||||
// a raw coordinate for a nonexistent node doesn't do anything bad.
|
||||
idx, coords, err := s.Coordinates()
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, coords, err := s.Coordinates(ws)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -62,10 +64,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Should still be empty, though applying an empty batch does bump
|
||||
// the table index.
|
||||
idx, coords, err = s.Coordinates()
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, coords, err = s.Coordinates(ws)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -82,9 +88,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Should go through now.
|
||||
idx, coords, err = s.Coordinates()
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, coords, err = s.Coordinates(ws)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -111,9 +121,12 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||
if err := s.CoordinateBatchUpdate(4, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Verify it got applied.
|
||||
idx, coords, err = s.Coordinates()
|
||||
idx, coords, err = s.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -175,7 +188,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
||||
}
|
||||
|
||||
// Make sure the index got updated.
|
||||
idx, coords, err := s.Coordinates()
|
||||
idx, coords, err := s.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
@ -252,7 +265,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||
restore.Commit()
|
||||
|
||||
// Read the restored coordinates back out and verify that they match.
|
||||
idx, res, err := s.Coordinates()
|
||||
idx, res, err := s.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user