submatview: reduce the getFromView implementation

Remove View.Result error return value, it was always nil, and seems like it will likely always remain nill
since it is simply reading a stored value.

Also replace some cache types with local types.
This commit is contained in:
Daniel Nephin 2021-02-23 14:27:24 -05:00
parent 1d9d7d0aa5
commit 832f1a2847
4 changed files with 41 additions and 60 deletions

View File

@ -139,7 +139,7 @@ func (s *streamingHealthState) Close() error {
}
func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
result, err := s.materializer.Fetch(s.done, opts)
result, err := s.materializer.getFromView(s.done, opts)
result.State = s
return result, err
}
@ -274,7 +274,7 @@ func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
}
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *healthView) Result(index uint64) (interface{}, error) {
func (s *healthView) Result(index uint64) interface{} {
result := structs.IndexedCheckServiceNodes{
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{
@ -286,7 +286,7 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
}
sortCheckServiceNodes(&result)
return &result, nil
return &result
}
func (s *healthView) Reset() {

View File

@ -10,7 +10,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
@ -32,8 +31,7 @@ type View interface {
// separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining
// indexes seen during Update.
// TODO: remove error return value.
Result(index uint64) (interface{}, error)
Result(index uint64) interface{}
// Reset the view to the zero state, done in preparation for receiving a new
// snapshot.
@ -217,80 +215,59 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{})
}
// Fetch the value stored in the View. Fetch blocks until the index of the View
// is greater than opts.MinIndex, or the context is cancelled.
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult
type viewResult struct {
Index uint64
Value interface{}
}
// Get current view Result and index
// getFromView blocks until the index of the View is greater than opts.MinIndex,
//or the context is cancelled.
func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (viewResult, error) {
m.lock.Lock()
index := m.index
val, err := m.view.Result(m.index)
updateCh := m.updateCh
m.lock.Unlock()
if err != nil {
return result, err
result := viewResult{
Index: m.index,
Value: m.view.Result(m.index),
}
result.Index = index
result.Value = val
updateCh := m.updateCh
m.lock.Unlock()
// If our index is > req.Index return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan. Note it's opts.MinIndex that the cache is using here the
// request min index might be different and from initial user request.
if index > 0 && index > opts.MinIndex {
if result.Index > 0 && result.Index > minIndex {
return result, nil
}
// Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout
// since that is the timeout the client requested from the cache Get while the
// options one is the internal "background refresh" timeout which is what the
// Fetch call should be using.
timeoutCh := time.After(opts.Timeout)
for {
select {
case <-updateCh:
// View updated, return the new result
m.lock.Lock()
result.Index = m.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = m.updateCh
fetchErr := m.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = m.view.Result(m.index)
if m.err != nil {
m.lock.Unlock()
return result, m.err
}
result.Value = m.view.Result(m.index)
// Grab the new updateCh in case we need to keep waiting for the next update.
updateCh = m.updateCh
m.lock.Unlock()
// If there was a non-transient error return it
if fetchErr != nil {
return result, fetchErr
}
if err != nil {
return result, err
}
// Sanity check the update is actually later than the one the user
// requested.
if result.Index <= opts.MinIndex {
if result.Index <= minIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
continue
}
// Return the updated result
return result, nil
case <-timeoutCh:
// Just return whatever we got originally, might still be empty
return result, nil
case <-done:
return result, context.Canceled
case <-ctx.Done():
return result, ctx.Err()
}
}
}

View File

@ -86,18 +86,22 @@ func (s *Store) Get(
key, e := s.getEntry(req)
defer s.releaseEntry(key)
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
// TODO: pass context instead of Done chan, also replaces Timeout param
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{
MinIndex: info.MinIndex,
Timeout: info.Timeout,
})
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
defer cancel()
result, err := e.materializer.getFromView(ctx, info.MinIndex)
// TODO: does context.DeadlineExceeded need to be translated into a nil error
// to match the old interface?
return result.Value, cache.ResultMeta{Index: result.Index}, err
}
// Notify the updateCh when there are updates to the entry identified by req.
// See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead set a deadline on the context.
func (s *Store) Notify(
ctx context.Context,
req Request,
@ -112,7 +116,7 @@ func (s *Store) Notify(
index := info.MinIndex
for {
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index})
result, err := e.materializer.getFromView(ctx, index)
switch {
case ctx.Err() != nil:
return

View File

@ -108,12 +108,12 @@ func (f *fakeView) Update(events []*pbsubscribe.Event) error {
return nil
}
func (f *fakeView) Result(index uint64) (interface{}, error) {
func (f *fakeView) Result(index uint64) interface{} {
srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs))
for _, srv := range f.srvs {
srvs = append(srvs, srv)
}
return fakeResult{srvs: srvs, index: index}, nil
return fakeResult{srvs: srvs, index: index}
}
type fakeResult struct {