mirror of
https://github.com/status-im/consul.git
synced 2025-01-23 20:19:29 +00:00
13c04a13af
This is the OSS portion of enterprise PR 2339. It improves our handling of "irrecoverable" errors in proxycfg data sources. The canonical example of this is what happens when the ACL token presented by Envoy is deleted/revoked. Previously, the stream would get "stuck" until the xDS server re-checked the token (after 5 minutes) and terminated the stream. Materializers would also sit burning resources retrying something that could never succeed. Now, it is possible for data sources to mark errors as "terminal" which causes the xDS stream to be closed immediately. Similarly, the submatview.Store will evict materializers when it observes they have encountered such an error.
148 lines
3.6 KiB
Go
148 lines
3.6 KiB
Go
package submatview
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/acl/resolver"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
"github.com/hashicorp/consul/lib/retry"
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
)
|
|
|
|
// LocalMaterializer is a materializer for a stream of events
|
|
// and manages the local subscription to the event publisher
|
|
// until the cache result is discarded when its TTL expires.
|
|
type LocalMaterializer struct {
|
|
deps LocalMaterializerDeps
|
|
retryWaiter *retry.Waiter
|
|
handler eventHandler
|
|
|
|
mat *materializer
|
|
}
|
|
|
|
type LocalMaterializerDeps struct {
|
|
Deps
|
|
|
|
Backend LocalBackend
|
|
ACLResolver ACLResolver
|
|
}
|
|
|
|
var _ Materializer = (*LocalMaterializer)(nil)
|
|
|
|
type LocalBackend interface {
|
|
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
|
}
|
|
|
|
//go:generate mockery --name ACLResolver --inpackage
|
|
type ACLResolver interface {
|
|
ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzContext *acl.AuthorizerContext) (resolver.Result, error)
|
|
}
|
|
|
|
func NewLocalMaterializer(deps LocalMaterializerDeps) *LocalMaterializer {
|
|
m := LocalMaterializer{
|
|
deps: deps,
|
|
mat: newMaterializer(deps.Logger, deps.View, deps.Waiter),
|
|
}
|
|
return &m
|
|
}
|
|
|
|
// Query implements Materializer
|
|
func (m *LocalMaterializer) Query(ctx context.Context, minIndex uint64) (Result, error) {
|
|
return m.mat.query(ctx, minIndex)
|
|
}
|
|
|
|
// Run receives events from a local subscription backend and sends them to the View.
|
|
// It runs until ctx is cancelled, so it is expected to be run in a goroutine.
|
|
// Mirrors implementation of RPCMaterializer.
|
|
//
|
|
// Run implements Materializer
|
|
func (m *LocalMaterializer) Run(ctx context.Context) {
|
|
for {
|
|
req := m.deps.Request(m.mat.currentIndex())
|
|
err := m.subscribeOnce(ctx, req)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
if m.isTerminalError(err) {
|
|
return
|
|
}
|
|
|
|
m.mat.handleError(req, err)
|
|
|
|
if err := m.mat.retryWaiter.Wait(ctx); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// isTerminalError determines whether the given error cannot be recovered from
|
|
// and should cause the materializer to halt and be evicted from the view store.
|
|
//
|
|
// This roughly matches the logic in agent/proxycfg-glue.newUpdateEvent.
|
|
func (m *LocalMaterializer) isTerminalError(err error) bool {
|
|
return acl.IsErrNotFound(err)
|
|
}
|
|
|
|
// subscribeOnce opens a new subscription to a local backend and runs
|
|
// for its lifetime or until the view is closed.
|
|
func (m *LocalMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
m.handler = initialHandler(req.Index)
|
|
|
|
entMeta := req.EnterpriseMeta()
|
|
authz, err := m.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
subReq, err := state.PBToStreamSubscribeRequest(req, entMeta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sub, err := m.deps.Backend.Subscribe(subReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer sub.Unsubscribe()
|
|
|
|
for {
|
|
event, err := sub.Next(ctx)
|
|
switch {
|
|
case errors.Is(err, stream.ErrSubForceClosed):
|
|
m.deps.Logger.Trace("subscription reset by server")
|
|
return err
|
|
|
|
case err != nil:
|
|
return err
|
|
}
|
|
|
|
if !event.Payload.HasReadPermission(authz) {
|
|
continue
|
|
}
|
|
|
|
e := event.Payload.ToSubscriptionEvent(event.Index)
|
|
|
|
m.handler, err = m.handler(m, e)
|
|
if err != nil {
|
|
m.mat.reset()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateView implements viewState
|
|
func (m *LocalMaterializer) updateView(events []*pbsubscribe.Event, index uint64) error {
|
|
return m.mat.updateView(events, index)
|
|
}
|
|
|
|
// reset implements viewState
|
|
func (m *LocalMaterializer) reset() {
|
|
m.mat.reset()
|
|
}
|