consul/agent/submatview/materializer.go
Daniel Nephin 9423c887c4 submatview: do not reset retry waiter when materializer is reset
The materializer is often reset when an error is received. By resetting
the retryWaiter we effectively never wait. The retryWaiter should only
be reset when we get an event without error. This is done in
Materializer.updateView().
2021-02-09 13:56:50 -05:00

285 lines
8.0 KiB
Go

package submatview
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// View receives events from, and return results to, Materializer. A view is
// responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
// Update is called when one or more events are received. The first call will
// include _all_ events in the initial snapshot which may be an empty set.
// Subsequent calls will contain one or more update events in the order they
// are received.
Update(events []*pbsubscribe.Event) error
// Result returns the type-specific cache result based on the state. When no
// events have been delivered yet the result should be an empty value type
// suitable to return to clients in case there is an empty result on the
// servers. The index the materialized view represents is maintained
// separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining
// indexes seen during Update.
Result(index uint64) (interface{}, error)
// Reset the view to the zero state, done in preparation for receiving a new
// snapshot.
Reset()
}
// Materializer consumes the event stream, handling any framing events, and
// sends the events to View as they are received.
//
// Materializer is used as the cache.Result.State for a streaming
// cache type and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when TTL expires.
type Materializer struct {
deps Deps
retryWaiter *retry.Waiter
handler eventHandler
// lock protects the mutable state - all fields below it must only be accessed
// while holding lock.
lock sync.Mutex
index uint64
view View
updateCh chan struct{}
err error
}
type Deps struct {
View View
Client StreamClient
Logger hclog.Logger
Waiter *retry.Waiter
Request func(index uint64) pbsubscribe.SubscribeRequest
}
// StreamClient provides a subscription to state change events.
type StreamClient interface {
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}
// NewMaterializer returns a new Materializer. Run must be called to start it.
func NewMaterializer(deps Deps) *Materializer {
v := &Materializer{
deps: deps,
view: deps.View,
retryWaiter: deps.Waiter,
updateCh: make(chan struct{}),
}
return v
}
// Run receives events from the StreamClient and sends them to the View. It runs
// until ctx is cancelled, so it is expected to be run in a goroutine.
func (m *Materializer) Run(ctx context.Context) {
for {
req := m.deps.Request(m.index)
err := m.runSubscription(ctx, req)
if ctx.Err() != nil {
return
}
failures := m.retryWaiter.Failures()
if isNonTemporaryOrConsecutiveFailure(err, failures) {
m.lock.Lock()
m.notifyUpdateLocked(err)
m.lock.Unlock()
}
m.deps.Logger.Error("subscribe call failed",
"err", err,
"topic", req.Topic,
"key", req.Key,
"failure_count", failures+1)
if err := m.retryWaiter.Wait(ctx); err != nil {
return
}
}
}
// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
// temporary is an interface used by net and other std lib packages to
// show error types represent temporary/recoverable errors.
temp, ok := err.(interface {
Temporary() bool
})
return !ok || !temp.Temporary() || failures > 0
}
// runSubscription opens a new subscribe streaming call to the servers and runs
// for it's lifetime or until the view is closed.
func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.SubscribeRequest) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m.handler = initialHandler(req.Index)
s, err := m.deps.Client.Subscribe(ctx, &req)
if err != nil {
return err
}
for {
event, err := s.Recv()
switch {
case isGrpcStatus(err, codes.Aborted):
m.reset()
return resetErr("stream reset requested")
case err != nil:
return err
}
m.handler, err = m.handler(m, event)
if err != nil {
m.reset()
return err
}
}
}
func isGrpcStatus(err error, code codes.Code) bool {
s, ok := status.FromError(err)
return ok && s.Code() == code
}
// resetErr represents a server request to reset the subscription, it's typed so
// we can mark it as temporary and so attempt to retry first time without
// notifying clients.
type resetErr string
// Temporary Implements the internal Temporary interface
func (e resetErr) Temporary() bool {
return true
}
// Error implements error
func (e resetErr) Error() string {
return string(e)
}
// reset clears the state ready to start a new stream from scratch.
func (m *Materializer) reset() {
m.lock.Lock()
defer m.lock.Unlock()
m.view.Reset()
m.index = 0
}
func (m *Materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
m.lock.Lock()
defer m.lock.Unlock()
if err := m.view.Update(events); err != nil {
return err
}
m.index = index
m.notifyUpdateLocked(nil)
m.retryWaiter.Reset()
return nil
}
// notifyUpdateLocked closes the current update channel and recreates a new
// one. It must be called while holding the s.lock lock.
func (m *Materializer) notifyUpdateLocked(err error) {
m.err = err
close(m.updateCh)
m.updateCh = make(chan struct{})
}
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
// call. Cache types that use streaming should just be able to proxy to this
// once they have a subscription object and return it's results directly.
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult
// Get current view Result and index
m.lock.Lock()
index := m.index
val, err := m.view.Result(m.index)
updateCh := m.updateCh
m.lock.Unlock()
if err != nil {
return result, err
}
result.Index = index
result.Value = val
// If our index is > req.Index return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan. Note it's opts.MinIndex that the cache is using here the
// request min index might be different and from initial user request.
if index > 0 && index > opts.MinIndex {
return result, nil
}
// Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout
// since that is the timeout the client requested from the cache Get while the
// options one is the internal "background refresh" timeout which is what the
// Fetch call should be using.
timeoutCh := time.After(opts.Timeout)
for {
select {
case <-updateCh:
// View updated, return the new result
m.lock.Lock()
result.Index = m.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = m.updateCh
fetchErr := m.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = m.view.Result(m.index)
}
m.lock.Unlock()
// If there was a non-transient error return it
if fetchErr != nil {
return result, fetchErr
}
if err != nil {
return result, err
}
// Sanity check the update is actually later than the one the user
// requested.
if result.Index <= opts.MinIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
continue
}
// Return the updated result
return result, nil
case <-timeoutCh:
// Just return whatever we got originally, might still be empty
return result, nil
case <-done:
return result, context.Canceled
}
}
}