Adds new variant of blocking query wrapper with WatchSet support.

This commit is contained in:
James Phillips 2017-01-13 11:17:38 -08:00
parent 10f3bdf4ff
commit b21625a6af
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
2 changed files with 64 additions and 0 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux" "github.com/hashicorp/yamux"
@ -422,6 +423,59 @@ RUN_QUERY:
return err return err
} }
// queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes.
type queryFn func(memdb.WatchSet) error
// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
fn queryFn) error {
var timeout *time.Timer
// Fast path right to the non-blocking query.
if queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Restrict the max query time, and ensure there is always one.
if queryOpts.MaxQueryTime > maxQueryTime {
queryOpts.MaxQueryTime = maxQueryTime
} else if queryOpts.MaxQueryTime <= 0 {
queryOpts.MaxQueryTime = defaultQueryTime
}
// Apply a small amount of jitter to the request.
queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout.
timeout = time.NewTimer(queryOpts.MaxQueryTime)
defer timeout.Stop()
RUN_QUERY:
// Update the query metadata.
s.setQueryMeta(queryMeta)
// If the read must be consistent we verify that we are still the leader.
if queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
ws := memdb.NewWatchSet()
err := fn(ws)
// Block up to the timeout if we didn't see anything fresh.
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
goto RUN_QUERY
}
}
return err
}
// setQueryMeta is used to populate the QueryMeta data for an RPC call // setQueryMeta is used to populate the QueryMeta data for an RPC call
func (s *Server) setQueryMeta(m *structs.QueryMeta) { func (s *Server) setQueryMeta(m *structs.QueryMeta) {
if s.IsLeader() { if s.IsLeader() {

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
) )
func testUUID() string { func testUUID() string {
@ -122,6 +123,15 @@ func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
} }
} }
// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// 1 ms timeout since we already expect the event happened before calling
// this and just need to distinguish a fire from a timeout.
func watchFired(ws memdb.WatchSet) bool {
timedOut := ws.Watch(time.After(1 * time.Millisecond))
return !timedOut
}
func TestStateStore_Restore_Abort(t *testing.T) { func TestStateStore_Restore_Abort(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)