consul/agent/submatview/materializer.go

231 lines
6.1 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package submatview
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
// View receives events from, and returns results to, the materializer. A view
// is responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
//
// The materializer in this file invokes every View method while holding its
// own lock, so calls into a single View are serialized by the materializer.
type View interface {
	// Update is called when one or more events are received. The first call will
	// include _all_ events in the initial snapshot which may be an empty set.
	// Subsequent calls will contain one or more update events in the order they
	// are received.
	//
	// When Update returns a non-nil error the materializer returns that error
	// to its own caller without advancing its index or waking waiting queries.
	Update(events []*pbsubscribe.Event) error

	// Result returns the type-specific cache result based on the state. When no
	// events have been delivered yet the result should be an empty value type
	// suitable to return to clients in case there is an empty result on the
	// servers. The index the materialized view represents is maintained
	// separately and passed in, in case the return type needs an Index field
	// populating. This allows implementations to not worry about maintaining
	// indexes seen during Update.
	Result(index uint64) interface{}

	// Reset returns the view to the zero state, done in preparation for
	// receiving a new snapshot.
	Reset()
}
// Result is the outcome of querying a materialized view: the value produced by
// the View together with the index that value corresponds to.
type Result struct {
	// Index is the materializer index recorded for this result.
	Index uint64

	// Value is whatever View.Result returned; its concrete type is defined by
	// the View implementation.
	Value interface{}

	// Cached is true if the requested value was already available locally. If
	// it is false, the query had to wait (for an update, an error, or the end
	// of its context) before returning.
	Cached bool
}
// Deps bundles the dependencies used to build a materializer.
//
// NOTE(review): nothing in the visible part of this file consumes Deps; the
// field descriptions below are inferred from the field types and from the
// parameters of newMaterializer — confirm against the constructor that uses it.
type Deps struct {
	// View stores the events and produces query results.
	View View

	// Logger is presumably the logger handed to the materializer.
	Logger hclog.Logger

	// Waiter is presumably the retry backoff handed to the materializer;
	// newMaterializer substitutes a default when given a nil waiter.
	Waiter *retry.Waiter

	// Request returns the subscribe request for a given index.
	// NOTE(review): presumably called with the materializer's current index
	// each time a stream is (re)started — confirm.
	Request func(index uint64) *pbsubscribe.SubscribeRequest
}
// materializer consumes the event stream, handling any framing events, and
// allows for querying the materialized view.
type materializer struct {
	// retryWaiter tracks subscribe failures: its failure count is read when an
	// error is handled and it is reset after every successful view update.
	retryWaiter *retry.Waiter

	// logger is used to report failed subscribe calls.
	logger hclog.Logger

	// lock protects the mutable state - all fields below it must only be accessed
	// while holding lock.
	lock sync.Mutex

	// index is the index of the data currently held by view; zero after a
	// reset, which queries treat as "no snapshot loaded yet".
	index uint64

	// view holds the materialized data.
	view View

	// updateCh is closed, and then replaced with a fresh channel, every time
	// waiting queries need to be woken up.
	updateCh chan struct{}

	// err is the error passed to the most recent notification; it is nil after
	// a successful view update.
	err error
}
// newMaterializer builds a materializer around view, with a fresh update
// channel and an index of zero. A nil waiter is replaced by defaultWaiter().
func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer {
	if waiter == nil {
		waiter = defaultWaiter()
	}

	return &materializer{
		retryWaiter: waiter,
		logger:      logger,
		view:        view,
		updateCh:    make(chan struct{}),
	}
}
// query returns the view's result once the materializer index is greater than
// minIndex. If that is already the case the current result is returned
// immediately with Cached set to true. Otherwise query blocks until one of:
//
//   - an update raises the index above minIndex: the new result is returned;
//   - an error is reported to waiters: that error is returned along with the
//     index at that time and the Value captured when query started;
//   - ctx is done: ctx.Err() is returned along with a freshly read Value,
//     because callers may still use the value when the error is
//     context.DeadlineExceeded.
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) {
	// Capture a consistent snapshot of the index, the value and the channel
	// that will signal the next change.
	m.lock.Lock()
	current := m.index
	result := Result{
		Index: current,
		Value: m.view.Result(current),
	}
	updateCh := m.updateCh
	m.lock.Unlock()

	// Fast path: the data is already newer than what the caller has seen. An
	// index of zero means no snapshot has been loaded yet; because the indexes
	// are unsigned, zero can never be greater than minIndex, so that case
	// always falls through to waiting on the update channel.
	if result.Index > minIndex {
		result.Cached = true
		return result, nil
	}

	for {
		select {
		case <-updateCh:
			// Something changed; re-read the state under the lock.
			m.lock.Lock()
			result.Index = m.index

			if err := m.err; err != nil {
				m.lock.Unlock()
				return result, err
			}

			if result.Index <= minIndex {
				// Still not new enough. The channel we waited on has been
				// closed, so pick up its replacement before waiting again.
				updateCh = m.updateCh
				m.lock.Unlock()
				continue
			}

			result.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return result, nil

		case <-ctx.Done():
			// Hand back the latest value even though we are giving up.
			// NOTE(review): result.Index is not refreshed here, so it may lag
			// behind the index this Value was built from if the view changed
			// since Index was last read — confirm this is intended.
			m.lock.Lock()
			result.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return result, ctx.Err()
		}
	}
}
// currentIndex returns the materializer's index, read while holding the lock.
func (m *materializer) currentIndex() uint64 {
	m.lock.Lock()
	defer m.lock.Unlock()

	return m.index
}
// notifyUpdateLocked records err as the latest outcome and wakes every query
// waiting on the update channel by closing it; a new channel is then installed
// for subsequent waiters. Pass nil to signal a successful update.
//
// The caller must hold m.lock.
func (m *materializer) notifyUpdateLocked(err error) {
	m.err = err

	signalled := m.updateCh
	close(signalled)
	m.updateCh = make(chan struct{})
}
// reset returns the materializer to its initial state so that a new stream can
// be started from scratch: the view is reset and the index goes back to zero.
// Waiting queries are not notified and the stored error is left untouched.
func (m *materializer) reset() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.view.Reset()
	m.index = 0
}
// updateView applies a sequence of events to the view and, on success, stores
// the corresponding Raft index, wakes waiting queries with a nil error and
// resets the retry waiter.
//
// If the view rejects the events its error is returned as is; in that case
// the index is not changed, nobody is notified and the retry waiter is left
// alone.
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	err := m.view.Update(events)
	if err == nil {
		m.index = index
		m.notifyUpdateLocked(nil)
		m.retryWaiter.Reset()
	}
	return err
}
// handleError deals with a failed subscribe call for req. When
// isNonTemporaryOrConsecutiveFailure reports true for err and the waiter's
// current failure count, waiting queries are woken up with err. The failure is
// always logged at error level together with the topic, the failure count
// (including this one) and the subject of the subscription.
func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) {
	failures := m.retryWaiter.Failures()

	if isNonTemporaryOrConsecutiveFailure(err, failures) {
		m.lock.Lock()
		m.notifyUpdateLocked(err)
		m.lock.Unlock()
	}

	logger := m.logger.With(
		"err", err,
		"topic", req.Topic,
		"failure_count", failures+1,
	)

	// Describe the subject: a wildcard flag, the named subject's key, or
	// failing both, the request's own key field.
	var (
		subjectField = "key"
		subjectValue interface{}
	)
	if req.GetWildcardSubject() {
		subjectField, subjectValue = "wildcard_subject", true
	} else if sub := req.GetNamedSubject(); sub != nil {
		subjectValue = sub.Key
	} else {
		subjectValue = req.Key // nolint:staticcheck // SA1019 intentional use of deprecated field
	}

	logger.With(subjectField, subjectValue).Error("subscribe call failed")
}
// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
//
// An error counts as temporary when it, or any error it wraps, exposes a
// Temporary() bool method that returns true. The chain is searched with
// errors.As, so a temporary error that has been wrapped (for example with
// fmt.Errorf and %w) is still recognised; the first error in the chain that
// has a Temporary method decides. A nil error has no such method and is
// therefore reported as non-temporary.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	// temporary is an interface used by net and other std lib packages to
	// show error types represent temporary/recoverable errors.
	var temp interface {
		Temporary() bool
	}
	if !errors.As(err, &temp) {
		return true
	}
	return !temp.Temporary() || failures > 0
}
// defaultWaiter returns the retry waiter used when the caller of
// newMaterializer does not provide one.
func defaultWaiter() *retry.Waiter {
	const (
		minFailures = 1

		// The original note on this value describes waits that start with
		// small increments (200-400ms) and double on each attempt.
		factor = 200 * time.Millisecond

		// NOTE(review): the original note listed the sequence as 200-400,
		// 400-800, 800-1600, 1600-3200, 3200-6000, then 6000 thereafter, and
		// remarked that retry.Wait applies the max limit after jitter. That
		// 6000ms ceiling does not match the 60s configured here — confirm
		// which one is intended.
		maxWait = 60 * time.Second

		jitter = 100
	)

	return &retry.Waiter{
		MinFailures: minFailures,
		Factor:      factor,
		MinWait:     0,
		MaxWait:     maxWait,
		Jitter:      retry.NewJitter(jitter),
	}
}