From 3301f9400484e9b318f89488d922ebe3d57a6b49 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 14 Feb 2022 17:27:33 -0500 Subject: [PATCH] rpc: improve docs for blockingQuery Follow the Go convention of accepting a small interface that documents the methods used by the function. Clarify the rules for implementing a query function passed to blockingQuery. --- agent/consul/gateway_locator.go | 7 ++- agent/consul/gateway_locator_test.go | 4 +- agent/consul/rpc.go | 86 ++++++++++++++++++++++------ 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index ce6e390337..ab72fdce3d 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -8,14 +8,15 @@ import ( "sync" "time" + "github.com/hashicorp/go-hclog" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/go-hclog" - memdb "github.com/hashicorp/go-memdb" ) // GatewayLocator assists in selecting an appropriate mesh gateway when wan @@ -269,7 +270,7 @@ func getRandomItem(items []string) string { } type serverDelegate interface { - blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error + blockingQuery(queryOpts blockingQueryOptions, queryMeta blockingQueryResponseMeta, fn queryFn) error IsLeader() bool LeaderLastContact() time.Time setDatacenterSupportsFederationStates() diff --git a/agent/consul/gateway_locator_test.go b/agent/consul/gateway_locator_test.go index 01bbfb105f..8f32acb86a 100644 --- a/agent/consul/gateway_locator_test.go +++ b/agent/consul/gateway_locator_test.go @@ -493,8 +493,8 @@ func (d *testServerDelegate) datacenterSupportsFederationStates() bool { // This is just enough to exercise the logic. func (d *testServerDelegate) blockingQuery( - queryOpts structs.QueryOptionsCompat, - queryMeta structs.QueryMetaCompat, + queryOpts blockingQueryOptions, + queryMeta blockingQueryResponseMeta, fn queryFn, ) error { minQueryIndex := queryOpts.GetMinQueryIndex() diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 60c192acc9..2bec441e1c 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -14,7 +14,6 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" connlimit "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -24,6 +23,8 @@ import ( "github.com/hashicorp/yamux" "google.golang.org/grpc" + msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/wanfed" @@ -910,35 +911,82 @@ func (s *Server) raftApplyWithEncoder( return resp, nil } -// queryFn is used to perform a query operation. If a re-query is needed, the -// passed-in watch set will be used to block for changes. The passed-in state -// store should be used (vs. calling fsm.State()) since the given state store -// will be correctly watched for changes if the state store is restored from -// a snapshot. +// queryFn is used to perform a query operation. See Server.blockingQuery for +// the requirements of this function. type queryFn func(memdb.WatchSet, *state.Store) error -// blockingQuery is used to process a potentially blocking query operation. -func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error { +// blockingQueryOptions are options used by Server.blockingQuery to modify the +// behaviour of the query operation, or to populate response metadata. +type blockingQueryOptions interface { + GetToken() string + GetMinQueryIndex() uint64 + GetMaxQueryTime() time.Duration + GetRequireConsistent() bool +} + +// blockingQueryResponseMeta is an interface used to populate the response struct +// with metadata about the query and the state of the server. +type blockingQueryResponseMeta interface { + SetLastContact(time.Duration) + SetKnownLeader(bool) + GetIndex() uint64 + SetIndex(uint64) + SetResultsFilteredByACLs(bool) +} + +// blockingQuery performs a blocking query if opts.GetMinQueryIndex is +// greater than 0, otherwise performs a non-blocking query. Blocking queries will +// block until responseMeta.Index is greater than opts.GetMinQueryIndex, +// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately +// after performing the query. +// +// If opts.GetRequireConsistent is true, blockingQuery will first verify it is +// still the cluster leader before performing the query. +// +// The query function is expected to be a closure that has access to responseMeta +// so that it can set the Index. The actual result of the query is opaque to blockingQuery. +// If query function returns an error, the error is returned to the caller immediately. +// +// The query function must follow these rules: +// +// 1. to access data it must use the passed in state.Store. +// 2. it must set the responseMeta.Index to an index greater than +// opts.GetMinQueryIndex if the results return by the query have changed. +// 3. any channels added to the memdb.WatchSet must unblock when the results +// returned by the query have changed. +// +// To ensure optimal performance of the query, the query function should make a +// best-effort attempt to follow these guidelines: +// +// 1. only set responseMeta.Index to an index greater than +// opts.GetMinQueryIndex when the results returned by the query have changed. +// 2. any channels added to the memdb.WatchSet should only unblock when the +// results returned by the query have changed. +func (s *Server) blockingQuery( + opts blockingQueryOptions, + responseMeta blockingQueryResponseMeta, + query queryFn, +) error { var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh} metrics.IncrCounter([]string{"rpc", "query"}, 1) - minQueryIndex := queryOpts.GetMinQueryIndex() + minQueryIndex := opts.GetMinQueryIndex() // Perform a non-blocking query if minQueryIndex == 0 { - if queryOpts.GetRequireConsistent() { + if opts.GetRequireConsistent() { if err := s.consistentRead(); err != nil { return err } } var ws memdb.WatchSet - err := fn(ws, s.fsm.State()) - s.setQueryMeta(queryMeta, queryOpts.GetToken()) + err := query(ws, s.fsm.State()) + s.setQueryMeta(responseMeta, opts.GetToken()) return err } - timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime()) + timeout := s.rpcQueryTimeout(opts.GetMaxQueryTime()) ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -948,7 +996,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) for { - if queryOpts.GetRequireConsistent() { + if opts.GetRequireConsistent() { if err := s.consistentRead(); err != nil { return err } @@ -964,13 +1012,13 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s // whole state store is abandoned. ws.Add(state.AbandonCh()) - err := fn(ws, state) - s.setQueryMeta(queryMeta, queryOpts.GetToken()) + err := query(ws, state) + s.setQueryMeta(responseMeta, opts.GetToken()) if err != nil { return err } - if queryMeta.GetIndex() > minQueryIndex { + if responseMeta.GetIndex() > minQueryIndex { return nil } @@ -992,7 +1040,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s // setQueryMeta is used to populate the QueryMeta data for an RPC call // // Note: This method must be called *after* filtering query results with ACLs. -func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) { +func (s *Server) setQueryMeta(m blockingQueryResponseMeta, token string) { if s.IsLeader() { m.SetLastContact(0) m.SetKnownLeader(true) @@ -1085,7 +1133,7 @@ func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration { // will only check whether it is blank or not). It's a safe assumption because // ResultsFilteredByACLs is only set to try when applying the already-resolved // token's policies. -func maskResultsFilteredByACLs(token string, meta structs.QueryMetaCompat) { +func maskResultsFilteredByACLs(token string, meta blockingQueryResponseMeta) { if token == "" { meta.SetResultsFilteredByACLs(false) }