mirror of https://github.com/status-im/consul.git
Adds fine-grained watches to prepared query endpoints.
This commit is contained in:
parent
dfcffe097c
commit
8b7977ccb3
|
@ -553,7 +553,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
}
|
||||
|
||||
// Verify queries are restored.
|
||||
_, queries, err := fsm2.state.PreparedQueryList()
|
||||
_, queries, err := fsm2.state.PreparedQueryList(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1131,7 +1131,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {
|
|||
|
||||
// Verify it's in the state store.
|
||||
{
|
||||
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
|
||||
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1158,7 +1158,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {
|
|||
|
||||
// Verify the update.
|
||||
{
|
||||
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
|
||||
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -1184,7 +1184,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {
|
|||
|
||||
// Make sure it's gone.
|
||||
{
|
||||
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
|
||||
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
|
@ -45,7 +46,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
|
|||
if args.Query.ID, err = uuid.GenerateUUID(); err != nil {
|
||||
return fmt.Errorf("UUID generation for prepared query failed: %v", err)
|
||||
}
|
||||
_, query, err := state.PreparedQueryGet(args.Query.ID)
|
||||
_, query, err := state.PreparedQueryGet(nil, args.Query.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Prepared query lookup failed: %v", err)
|
||||
}
|
||||
|
@ -77,7 +78,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
|
|||
// access to whatever they are changing, if prefix ACLs apply to it.
|
||||
if args.Op != structs.PreparedQueryCreate {
|
||||
state := p.srv.fsm.State()
|
||||
_, query, err := state.PreparedQueryGet(args.Query.ID)
|
||||
_, query, err := state.PreparedQueryGet(nil, args.Query.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Prepared Query lookup failed: %v", err)
|
||||
}
|
||||
|
@ -218,12 +219,11 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
|
|||
|
||||
// Get the requested query.
|
||||
state := p.srv.fsm.State()
|
||||
return p.srv.blockingRPC(
|
||||
return p.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("PreparedQueryGet"),
|
||||
func() error {
|
||||
index, query, err := state.PreparedQueryGet(args.QueryID)
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, query, err := state.PreparedQueryGet(ws, args.QueryID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -265,12 +265,11 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind
|
|||
|
||||
// Get the list of queries.
|
||||
state := p.srv.fsm.State()
|
||||
return p.srv.blockingRPC(
|
||||
return p.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("PreparedQueryList"),
|
||||
func() error {
|
||||
index, queries, err := state.PreparedQueryList()
|
||||
func(ws memdb.WatchSet) error {
|
||||
index, queries, err := state.PreparedQueryList(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -238,18 +238,19 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *
|
|||
}
|
||||
|
||||
// PreparedQueryGet returns the given prepared query by ID.
|
||||
func (s *StateStore) PreparedQueryGet(queryID string) (uint64, *structs.PreparedQuery, error) {
|
||||
func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryGet")...)
|
||||
idx := maxIndexTxn(tx, "prepared-queries")
|
||||
|
||||
// Look up the query by its ID.
|
||||
wrapped, err := tx.First("prepared-queries", "id", queryID)
|
||||
watchCh, wrapped, err := tx.FirstWatch("prepared-queries", "id", queryID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
return idx, toPreparedQuery(wrapped), nil
|
||||
}
|
||||
|
||||
|
@ -331,7 +332,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
|
|||
}
|
||||
|
||||
// PreparedQueryList returns all the prepared queries.
|
||||
func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error) {
|
||||
func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -343,6 +344,7 @@ func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error
|
|||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
|
||||
}
|
||||
ws.Add(queries.WatchCh())
|
||||
|
||||
// Go over all of the queries and build the response.
|
||||
var result structs.PreparedQueries
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
func TestStateStore_PreparedQuery_isUUID(t *testing.T) {
|
||||
|
@ -37,7 +38,8 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Querying with no results returns nil.
|
||||
idx, res, err := s.PreparedQueryGet(testUUID())
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, res, err := s.PreparedQueryGet(ws, testUUID())
|
||||
if idx != 0 || res != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||
}
|
||||
|
@ -51,6 +53,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 0 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Build a legit-looking query with the most basic options.
|
||||
query := &structs.PreparedQuery{
|
||||
|
@ -71,6 +76,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 0 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Now register the service and remove the bogus session.
|
||||
testRegisterNode(t, s, 1, "foo")
|
||||
|
@ -86,6 +94,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back out and verify it.
|
||||
expected := &structs.PreparedQuery{
|
||||
|
@ -98,7 +109,8 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
ModifyIndex: 3,
|
||||
},
|
||||
}
|
||||
idx, actual, err := s.PreparedQueryGet(query.ID)
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err := s.PreparedQueryGet(ws, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -119,11 +131,15 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated as well as the index.
|
||||
expected.Name = "test-query"
|
||||
expected.ModifyIndex = 4
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err = s.PreparedQueryGet(ws, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -145,6 +161,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Now make a session and try again.
|
||||
session := &structs.Session{
|
||||
|
@ -162,11 +181,15 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated as well as the index.
|
||||
expected.Session = query.Session
|
||||
expected.ModifyIndex = 6
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err = s.PreparedQueryGet(ws, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -192,7 +215,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err := s.PreparedQueryGet(evil.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(nil, evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -220,7 +243,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err := s.PreparedQueryGet(evil.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(nil, evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -250,7 +273,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err := s.PreparedQueryGet(evil.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(nil, evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -266,6 +289,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Turn the query into a template with an empty name.
|
||||
query.Name = ""
|
||||
|
@ -280,6 +306,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 9 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated as well as the index.
|
||||
expected.Name = ""
|
||||
|
@ -287,7 +316,8 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
Type: structs.QueryTemplateTypeNamePrefixMatch,
|
||||
}
|
||||
expected.ModifyIndex = 9
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err = s.PreparedQueryGet(ws, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -316,7 +346,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err := s.PreparedQueryGet(evil.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(nil, evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -338,11 +368,15 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 11 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back and verify the data was updated as well as the index.
|
||||
expected.Name = "prefix"
|
||||
expected.ModifyIndex = 11
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, actual, err = s.PreparedQueryGet(ws, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -371,7 +405,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err := s.PreparedQueryGet(evil.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(nil, evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -401,7 +435,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err := s.PreparedQueryGet(evil.ID)
|
||||
idx, actual, err := s.PreparedQueryGet(nil, evil.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -412,6 +446,10 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
|
|||
t.Fatalf("bad: %v", actual)
|
||||
}
|
||||
}
|
||||
|
||||
if watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_PreparedQueryDelete(t *testing.T) {
|
||||
|
@ -460,7 +498,8 @@ func TestStateStore_PreparedQueryDelete(t *testing.T) {
|
|||
ModifyIndex: 3,
|
||||
},
|
||||
}
|
||||
idx, actual, err := s.PreparedQueryGet(query.ID)
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, actual, err := s.PreparedQueryGet(ws, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -480,9 +519,12 @@ func TestStateStore_PreparedQueryDelete(t *testing.T) {
|
|||
if idx := s.maxIndex("prepared-queries"); idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Sanity check to make sure it's not there.
|
||||
idx, actual, err = s.PreparedQueryGet(query.ID)
|
||||
idx, actual, err = s.PreparedQueryGet(nil, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -716,7 +758,8 @@ func TestStateStore_PreparedQueryList(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Make sure nothing is returned for an empty query
|
||||
idx, actual, err := s.PreparedQueryList()
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, actual, err := s.PreparedQueryList(ws)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -761,6 +804,9 @@ func TestStateStore_PreparedQueryList(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Read it back and verify.
|
||||
expected := structs.PreparedQueries{
|
||||
|
@ -787,7 +833,7 @@ func TestStateStore_PreparedQueryList(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
idx, actual, err = s.PreparedQueryList()
|
||||
idx, actual, err = s.PreparedQueryList(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -901,7 +947,7 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
|
|||
|
||||
// Read the restored queries back out and verify that they
|
||||
// match.
|
||||
idx, actual, err := s.PreparedQueryList()
|
||||
idx, actual, err := s.PreparedQueryList(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -898,7 +898,7 @@ func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make sure the query is gone and the index is updated.
|
||||
idx, q2, err := s.PreparedQueryGet(query.ID)
|
||||
idx, q2, err := s.PreparedQueryGet(nil, query.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue