consul/agent/submatview/materializer.go
Daniel Upton 653b8c4f9d proxycfg: server-local config entry data sources
This is the OSS portion of enterprise PR 2056.

This commit provides server-local implementations of the proxycfg.ConfigEntry
and proxycfg.ConfigEntryList interfaces, that source data from streaming events.

It makes use of the LocalMaterializer type introduced for peering replication,
adding the necessary support for authorization.

It also adds support for "wildcard" subscriptions (within a topic) to the event
publisher, as this is needed to fetch service-resolvers for all services when
configuring mesh gateways.

Currently, events will be emitted for just the ingress-gateway, service-resolver,
and mesh config entry types, as these are the only entries required by proxycfg
— the events will be emitted on topics named IngressGateway, ServiceResolver,
and MeshConfig respectively.

Though these events will only be consumed "locally" for now, they can also be
consumed via the gRPC endpoint (confirmed using grpcurl) so using them from
client agents should be a case of swapping the LocalMaterializer for an
RPCMaterializer.
2022-07-04 10:48:36 +01:00

228 lines
6.1 KiB
Go

package submatview
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/pbsubscribe"
)
// View receives events from, and returns results to, the materializer. A view
// is responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
	// Update is called when one or more events are received. The first call will
	// include _all_ events in the initial snapshot, which may be an empty set.
	// Subsequent calls will contain one or more update events in the order they
	// are received. A non-nil error means the events could not be applied.
	Update(events []*pbsubscribe.Event) error
	// Result returns the type-specific cache result based on the state. When no
	// events have been delivered yet the result should be an empty value type
	// suitable to return to clients in case there is an empty result on the
	// servers. The index the materialized view represents is maintained
	// separately and passed in, in case the return type needs its Index field
	// populated. This allows implementations to not worry about maintaining
	// indexes seen during Update.
	Result(index uint64) interface{}
	// Reset returns the view to the zero state. It is done in preparation for
	// receiving a new snapshot.
	Reset()
}
// Result is the value returned from querying a View, together with the index
// that the value corresponds to.
type Result struct {
	// Index is the index of the materialized view at the time Value was read.
	Index uint64
	// Value is whatever View.Result returned; its concrete type depends on the
	// View implementation.
	Value interface{}
	// Cached is true if the requested value was already available locally. If
	// it is false, the query had to wait for an update (or gave up waiting)
	// before returning.
	Cached bool
}
// Deps bundles the dependencies needed to build a materializer.
// NOTE(review): the code that consumes Deps is not in this file; the field
// descriptions below are inferred from the field types and from
// newMaterializer — confirm against the constructors that accept Deps.
type Deps struct {
	// View stores the events and produces query results.
	View View
	// Logger receives diagnostics such as failed subscribe calls.
	Logger hclog.Logger
	// Waiter controls retry backoff. Presumably optional: newMaterializer
	// substitutes a default waiter when given nil.
	Waiter *retry.Waiter
	// Request builds the subscribe request for a given index (presumably the
	// index to resume the stream from — verify against callers).
	Request func(index uint64) *pbsubscribe.SubscribeRequest
}
// materializer consumes the event stream, handling any framing events, and
// allows for querying the materialized view.
type materializer struct {
	// retryWaiter tracks consecutive failures. handleError reads its failure
	// count and updateView resets it after every successful update.
	retryWaiter *retry.Waiter
	logger hclog.Logger
	// lock protects the mutable state - all fields below it must only be accessed
	// while holding lock.
	lock sync.Mutex
	// index is the index stored by the most recent updateView call. Zero means
	// no snapshot has been loaded yet, or the state was cleared by reset.
	index uint64
	view View
	// updateCh is closed and replaced with a fresh channel on every
	// notification (view update or reported error), waking any blocked query.
	updateCh chan struct{}
	// err is the error passed to the most recent notification; it is nil after
	// a successful view update.
	err error
}
// newMaterializer builds a materializer around the given view and logger.
// When waiter is nil the waiter returned by defaultWaiter is used instead.
// The update channel is created up front so that query always has a channel
// to block on.
func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer {
	if waiter == nil {
		waiter = defaultWaiter()
	}
	return &materializer{
		retryWaiter: waiter,
		logger:      logger,
		view:        view,
		updateCh:    make(chan struct{}),
	}
}
// query blocks until the index of the view is greater than minIndex, the
// stream handler reports an error, or ctx is done.
//
// Return values:
//   - If the view is already past minIndex (and a snapshot has been loaded),
//     the current result is returned immediately with Cached set to true.
//   - If an update moves the view past minIndex, the new result is returned
//     with Cached set to false.
//   - If an error was reported via notifyUpdateLocked, that error is returned
//     along with the index at that time.
//   - If ctx is done first, ctx.Err() is returned together with the latest
//     index and value, because callers may still use the result when the
//     error is context.DeadlineExceeded.
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) {
	m.lock.Lock()
	result := Result{
		Index: m.index,
		Value: m.view.Result(m.index),
	}
	// Capture the channel while holding the lock: it is replaced on every
	// notification, and we must wait on the one that matches the state we read.
	updateCh := m.updateCh
	m.lock.Unlock()

	// If our index is > minIndex return right away. If index is zero then we
	// haven't loaded a snapshot at all yet which means we should wait for one on
	// the update chan.
	if result.Index > 0 && result.Index > minIndex {
		result.Cached = true
		return result, nil
	}

	for {
		select {
		case <-updateCh:
			// View updated (or an error was reported); inspect the new state.
			m.lock.Lock()
			result.Index = m.index
			switch {
			case m.err != nil:
				err := m.err
				m.lock.Unlock()
				return result, err
			case result.Index <= minIndex:
				// Not far enough yet. Get a reference to the new updateCh,
				// the previous one was closed.
				updateCh = m.updateCh
				m.lock.Unlock()
				continue
			}
			result.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return result, nil

		case <-ctx.Done():
			// Update the result to the latest state because callers may still
			// use it when the error is context.DeadlineExceeded. The index is
			// refreshed together with the value so the pair stays consistent
			// even if the view changed (or was reset) since we last looked.
			m.lock.Lock()
			result.Index = m.index
			result.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return result, ctx.Err()
		}
	}
}
// currentIndex returns the index currently stored in the materializer,
// reading it while holding the lock.
func (m *materializer) currentIndex() uint64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.index
}
// notifyUpdateLocked records err as the latest outcome and wakes every query
// blocked on the current update channel by closing it. A fresh channel is
// installed for subsequent waiters. The caller must hold m.lock.
func (m *materializer) notifyUpdateLocked(err error) {
	waiters := m.updateCh
	m.updateCh = make(chan struct{})
	m.err = err
	close(waiters)
}
// reset returns the materializer to its initial state so that a new stream
// can be started from scratch: the view is reset and the index goes back to
// zero. It does not signal the update channel, so blocked queries are not
// woken by a reset alone.
func (m *materializer) reset() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.view.Reset()
	m.index = 0
}
// updateView applies a batch of events to the view and, on success, records
// index as the view's index, wakes any blocked queries, and clears the retry
// waiter's failure state. If the view rejects the events the error is
// returned and neither the index nor the waiters are touched.
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	err := m.view.Update(events)
	if err != nil {
		return err
	}

	m.index = index
	m.notifyUpdateLocked(nil)
	m.retryWaiter.Reset()
	return nil
}
// handleError deals with a failed subscribe call. If the error is not
// temporary, or it follows earlier failures, it is handed to blocked queries
// via notifyUpdateLocked. The failure is always logged, tagged with the topic
// and whichever subject form the request used.
func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) {
	priorFailures := m.retryWaiter.Failures()

	if isNonTemporaryOrConsecutiveFailure(err, priorFailures) {
		m.lock.Lock()
		m.notifyUpdateLocked(err)
		m.lock.Unlock()
	}

	entry := m.logger.With(
		"err", err,
		"topic", req.Topic,
		"failure_count", priorFailures+1,
	)

	// Identify the subject: wildcard first, then the named subject, and only
	// then the legacy Key field.
	switch {
	case req.GetWildcardSubject():
		entry = entry.With("wildcard_subject", true)
	case req.GetNamedSubject() != nil:
		entry = entry.With("key", req.GetNamedSubject().Key)
	default:
		entry = entry.With("key", req.Key) // nolint:staticcheck // SA1019 intentional use of deprecated field
	}

	entry.Error("subscribe call failed")
}
// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
//
// An error counts as temporary when it, or an error it wraps, implements
// Temporary() bool and that method returns true. Temporary is an interface
// used by net and other std lib packages to show that an error is
// temporary/recoverable. The chain is searched with errors.As so that a
// temporary error is still recognised after being wrapped (for example with
// fmt.Errorf and %w). A nil err is treated as non-temporary.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	if failures > 0 {
		return true
	}
	var temp interface {
		Temporary() bool
	}
	return !errors.As(err, &temp) || !temp.Temporary()
}
// defaultWaiter returns the retry.Waiter used when the caller does not supply
// one. It starts backing off after the first failure, in small increments
// based on a 200ms factor with jitter applied, and is capped by MaxWait.
//
// NOTE(review): the backoff schedule previously documented here topped out at
// 6000ms, which does not match the MaxWait of 60s configured below — confirm
// the intended cap. The exact growth per attempt is determined by the retry
// package, which is not visible from this file.
func defaultWaiter() *retry.Waiter {
	const (
		backoffFactor = 200 * time.Millisecond
		backoffCap    = 60 * time.Second
	)

	w := &retry.Waiter{
		MinFailures: 1,
		MinWait:     0,
		MaxWait:     backoffCap,
		Factor:      backoffFactor,
		Jitter:      retry.NewJitter(100),
	}
	return w
}