diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 15770f6843..03d65632fd 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -553,7 +553,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Verify queries are restored. - _, queries, err := fsm2.state.PreparedQueryList() + _, queries, err := fsm2.state.PreparedQueryList(nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1131,7 +1131,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) { // Verify it's in the state store. { - _, actual, err := fsm.state.PreparedQueryGet(query.Query.ID) + _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -1158,7 +1158,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) { // Verify the update. { - _, actual, err := fsm.state.PreparedQueryGet(query.Query.ID) + _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -1184,7 +1184,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) { // Make sure it's gone. { - _, actual, err := fsm.state.PreparedQueryGet(query.Query.ID) + _, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID) if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index 6a1c5a760b..baa14776cb 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -9,6 +9,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" ) @@ -45,7 +46,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) if args.Query.ID, err = uuid.GenerateUUID(); err != nil { return fmt.Errorf("UUID generation for prepared query failed: %v", err) } - _, query, err := state.PreparedQueryGet(args.Query.ID) + _, query, err := state.PreparedQueryGet(nil, args.Query.ID) if err != nil { return fmt.Errorf("Prepared query lookup failed: %v", err) } @@ -77,7 +78,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) // access to whatever they are changing, if prefix ACLs apply to it. if args.Op != structs.PreparedQueryCreate { state := p.srv.fsm.State() - _, query, err := state.PreparedQueryGet(args.Query.ID) + _, query, err := state.PreparedQueryGet(nil, args.Query.ID) if err != nil { return fmt.Errorf("Prepared Query lookup failed: %v", err) } @@ -218,12 +219,11 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, // Get the requested query. state := p.srv.fsm.State() - return p.srv.blockingRPC( + return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("PreparedQueryGet"), - func() error { - index, query, err := state.PreparedQueryGet(args.QueryID) + func(ws memdb.WatchSet) error { + index, query, err := state.PreparedQueryGet(ws, args.QueryID) if err != nil { return err } @@ -265,12 +265,11 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind // Get the list of queries. state := p.srv.fsm.State() - return p.srv.blockingRPC( + return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - state.GetQueryWatch("PreparedQueryList"), - func() error { - index, queries, err := state.PreparedQueryList() + func(ws memdb.WatchSet) error { + index, queries, err := state.PreparedQueryList(ws) if err != nil { return err } diff --git a/consul/state/prepared_query.go b/consul/state/prepared_query.go index c84496fbdd..3c97435400 100644 --- a/consul/state/prepared_query.go +++ b/consul/state/prepared_query.go @@ -238,18 +238,19 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches * } // PreparedQueryGet returns the given prepared query by ID. -func (s *StateStore) PreparedQueryGet(queryID string) (uint64, *structs.PreparedQuery, error) { +func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryGet")...) + idx := maxIndexTxn(tx, "prepared-queries") // Look up the query by its ID. - wrapped, err := tx.First("prepared-queries", "id", queryID) + watchCh, wrapped, err := tx.FirstWatch("prepared-queries", "id", queryID) if err != nil { return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err) } + ws.Add(watchCh) return idx, toPreparedQuery(wrapped), nil } @@ -331,7 +332,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct } // PreparedQueryList returns all the prepared queries. -func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error) { +func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -343,6 +344,7 @@ func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error if err != nil { return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err) } + ws.Add(queries.WatchCh()) // Go over all of the queries and build the response. var result structs.PreparedQueries diff --git a/consul/state/prepared_query_test.go b/consul/state/prepared_query_test.go index c0581986be..4277de2e2d 100644 --- a/consul/state/prepared_query_test.go +++ b/consul/state/prepared_query_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/go-memdb" ) func TestStateStore_PreparedQuery_isUUID(t *testing.T) { @@ -37,7 +38,8 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { s := testStateStore(t) // Querying with no results returns nil. - idx, res, err := s.PreparedQueryGet(testUUID()) + ws := memdb.NewWatchSet() + idx, res, err := s.PreparedQueryGet(ws, testUUID()) if idx != 0 || res != nil || err != nil { t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) } @@ -51,6 +53,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 0 { t.Fatalf("bad index: %d", idx) } + if watchFired(ws) { + t.Fatalf("bad") + } // Build a legit-looking query with the most basic options. query := &structs.PreparedQuery{ @@ -71,6 +76,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 0 { t.Fatalf("bad index: %d", idx) } + if watchFired(ws) { + t.Fatalf("bad") + } // Now register the service and remove the bogus session. testRegisterNode(t, s, 1, "foo") @@ -86,6 +94,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 3 { t.Fatalf("bad index: %d", idx) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Read it back out and verify it. expected := &structs.PreparedQuery{ @@ -98,7 +109,8 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { ModifyIndex: 3, }, } - idx, actual, err := s.PreparedQueryGet(query.ID) + ws = memdb.NewWatchSet() + idx, actual, err := s.PreparedQueryGet(ws, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -119,11 +131,15 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 4 { t.Fatalf("bad index: %d", idx) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Read it back and verify the data was updated as well as the index. expected.Name = "test-query" expected.ModifyIndex = 4 - idx, actual, err = s.PreparedQueryGet(query.ID) + ws = memdb.NewWatchSet() + idx, actual, err = s.PreparedQueryGet(ws, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -145,6 +161,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 4 { t.Fatalf("bad index: %d", idx) } + if watchFired(ws) { + t.Fatalf("bad") + } // Now make a session and try again. session := &structs.Session{ @@ -162,11 +181,15 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 6 { t.Fatalf("bad index: %d", idx) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Read it back and verify the data was updated as well as the index. expected.Session = query.Session expected.ModifyIndex = 6 - idx, actual, err = s.PreparedQueryGet(query.ID) + ws = memdb.NewWatchSet() + idx, actual, err = s.PreparedQueryGet(ws, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -192,7 +215,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { } // Sanity check to make sure it's not there. - idx, actual, err := s.PreparedQueryGet(evil.ID) + idx, actual, err := s.PreparedQueryGet(nil, evil.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -220,7 +243,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { } // Sanity check to make sure it's not there. - idx, actual, err := s.PreparedQueryGet(evil.ID) + idx, actual, err := s.PreparedQueryGet(nil, evil.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -250,7 +273,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { } // Sanity check to make sure it's not there. - idx, actual, err := s.PreparedQueryGet(evil.ID) + idx, actual, err := s.PreparedQueryGet(nil, evil.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -266,6 +289,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 6 { t.Fatalf("bad index: %d", idx) } + if watchFired(ws) { + t.Fatalf("bad") + } // Turn the query into a template with an empty name. query.Name = "" @@ -280,6 +306,9 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 9 { t.Fatalf("bad index: %d", idx) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Read it back and verify the data was updated as well as the index. expected.Name = "" @@ -287,7 +316,8 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { Type: structs.QueryTemplateTypeNamePrefixMatch, } expected.ModifyIndex = 9 - idx, actual, err = s.PreparedQueryGet(query.ID) + ws = memdb.NewWatchSet() + idx, actual, err = s.PreparedQueryGet(ws, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -316,7 +346,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { } // Sanity check to make sure it's not there. - idx, actual, err := s.PreparedQueryGet(evil.ID) + idx, actual, err := s.PreparedQueryGet(nil, evil.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -338,11 +368,15 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 11 { t.Fatalf("bad index: %d", idx) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Read it back and verify the data was updated as well as the index. expected.Name = "prefix" expected.ModifyIndex = 11 - idx, actual, err = s.PreparedQueryGet(query.ID) + ws = memdb.NewWatchSet() + idx, actual, err = s.PreparedQueryGet(ws, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -371,7 +405,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { } // Sanity check to make sure it's not there. - idx, actual, err := s.PreparedQueryGet(evil.ID) + idx, actual, err := s.PreparedQueryGet(nil, evil.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -401,7 +435,7 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { } // Sanity check to make sure it's not there. - idx, actual, err := s.PreparedQueryGet(evil.ID) + idx, actual, err := s.PreparedQueryGet(nil, evil.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -412,6 +446,10 @@ func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) { t.Fatalf("bad: %v", actual) } } + + if watchFired(ws) { + t.Fatalf("bad") + } } func TestStateStore_PreparedQueryDelete(t *testing.T) { @@ -460,7 +498,8 @@ func TestStateStore_PreparedQueryDelete(t *testing.T) { ModifyIndex: 3, }, } - idx, actual, err := s.PreparedQueryGet(query.ID) + ws := memdb.NewWatchSet() + idx, actual, err := s.PreparedQueryGet(ws, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -480,9 +519,12 @@ func TestStateStore_PreparedQueryDelete(t *testing.T) { if idx := s.maxIndex("prepared-queries"); idx != 4 { t.Fatalf("bad index: %d", idx) } + if !watchFired(ws) { + t.Fatalf("bad") + } // Sanity check to make sure it's not there. - idx, actual, err = s.PreparedQueryGet(query.ID) + idx, actual, err = s.PreparedQueryGet(nil, query.ID) if err != nil { t.Fatalf("err: %s", err) } @@ -716,7 +758,8 @@ func TestStateStore_PreparedQueryList(t *testing.T) { s := testStateStore(t) // Make sure nothing is returned for an empty query - idx, actual, err := s.PreparedQueryList() + ws := memdb.NewWatchSet() + idx, actual, err := s.PreparedQueryList(ws) if err != nil { t.Fatalf("err: %s", err) } @@ -761,6 +804,9 @@ func TestStateStore_PreparedQueryList(t *testing.T) { t.Fatalf("err: %s", err) } } + if !watchFired(ws) { + t.Fatalf("bad") + } // Read it back and verify. expected := structs.PreparedQueries{ @@ -787,7 +833,7 @@ func TestStateStore_PreparedQueryList(t *testing.T) { }, }, } - idx, actual, err = s.PreparedQueryList() + idx, actual, err = s.PreparedQueryList(nil) if err != nil { t.Fatalf("err: %s", err) } @@ -901,7 +947,7 @@ func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) { // Read the restored queries back out and verify that they // match. - idx, actual, err := s.PreparedQueryList() + idx, actual, err := s.PreparedQueryList(nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/state/session_test.go b/consul/state/session_test.go index 3e435e7e16..68b588c30f 100644 --- a/consul/state/session_test.go +++ b/consul/state/session_test.go @@ -898,7 +898,7 @@ func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) { } // Make sure the query is gone and the index is updated. - idx, q2, err := s.PreparedQueryGet(query.ID) + idx, q2, err := s.PreparedQueryGet(nil, query.ID) if err != nil { t.Fatalf("err: %s", err) }