mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
231 lines
6.1 KiB
Go
231 lines
6.1 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package submatview
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/consul/lib/retry"
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
)
|
|
|
|
// View receives events from, and return results to, Materializer. A view is
|
|
// responsible for converting the pbsubscribe.Event.Payload into the local
|
|
// type, and storing it so that it can be returned by Result().
|
|
type View interface {
|
|
// Update is called when one or more events are received. The first call will
|
|
// include _all_ events in the initial snapshot which may be an empty set.
|
|
// Subsequent calls will contain one or more update events in the order they
|
|
// are received.
|
|
Update(events []*pbsubscribe.Event) error
|
|
|
|
// Result returns the type-specific cache result based on the state. When no
|
|
// events have been delivered yet the result should be an empty value type
|
|
// suitable to return to clients in case there is an empty result on the
|
|
// servers. The index the materialized view represents is maintained
|
|
// separately and passed in in case the return type needs an Index field
|
|
// populating. This allows implementations to not worry about maintaining
|
|
// indexes seen during Update.
|
|
Result(index uint64) interface{}
|
|
|
|
// Reset the view to the zero state, done in preparation for receiving a new
|
|
// snapshot.
|
|
Reset()
|
|
}
|
|
|
|
// Result returned from the View.
|
|
type Result struct {
|
|
Index uint64
|
|
Value interface{}
|
|
// Cached is true if the requested value was already available locally. If
|
|
// the value is false, it indicates that GetFromView had to wait for an update,
|
|
Cached bool
|
|
}
|
|
|
|
type Deps struct {
|
|
View View
|
|
Logger hclog.Logger
|
|
Waiter *retry.Waiter
|
|
Request func(index uint64) *pbsubscribe.SubscribeRequest
|
|
}
|
|
|
|
// materializer consumes the event stream, handling any framing events, and
|
|
// allows for querying the materialized view.
|
|
type materializer struct {
|
|
retryWaiter *retry.Waiter
|
|
logger hclog.Logger
|
|
|
|
// lock protects the mutable state - all fields below it must only be accessed
|
|
// while holding lock.
|
|
lock sync.Mutex
|
|
index uint64
|
|
view View
|
|
updateCh chan struct{}
|
|
err error
|
|
}
|
|
|
|
func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer {
|
|
m := materializer{
|
|
view: view,
|
|
retryWaiter: waiter,
|
|
logger: logger,
|
|
updateCh: make(chan struct{}),
|
|
}
|
|
if m.retryWaiter == nil {
|
|
m.retryWaiter = defaultWaiter()
|
|
}
|
|
return &m
|
|
}
|
|
|
|
// Query blocks until the index of the View is greater than opts.MinIndex,
|
|
// or the context is cancelled.
|
|
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) {
|
|
m.lock.Lock()
|
|
|
|
result := Result{
|
|
Index: m.index,
|
|
Value: m.view.Result(m.index),
|
|
}
|
|
|
|
updateCh := m.updateCh
|
|
m.lock.Unlock()
|
|
|
|
// If our index is > req.Index return right away. If index is zero then we
|
|
// haven't loaded a snapshot at all yet which means we should wait for one on
|
|
// the update chan.
|
|
if result.Index > 0 && result.Index > minIndex {
|
|
result.Cached = true
|
|
return result, nil
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-updateCh:
|
|
// View updated, return the new result
|
|
m.lock.Lock()
|
|
result.Index = m.index
|
|
|
|
switch {
|
|
case m.err != nil:
|
|
err := m.err
|
|
m.lock.Unlock()
|
|
return result, err
|
|
case result.Index <= minIndex:
|
|
// get a reference to the new updateCh, the previous one was closed
|
|
updateCh = m.updateCh
|
|
m.lock.Unlock()
|
|
continue
|
|
}
|
|
|
|
result.Value = m.view.Result(m.index)
|
|
m.lock.Unlock()
|
|
return result, nil
|
|
|
|
case <-ctx.Done():
|
|
// Update the result value to the latest because callers may still
|
|
// use the value when the error is context.DeadlineExceeded
|
|
m.lock.Lock()
|
|
result.Value = m.view.Result(m.index)
|
|
m.lock.Unlock()
|
|
return result, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *materializer) currentIndex() uint64 {
|
|
var resp uint64
|
|
|
|
m.lock.Lock()
|
|
resp = m.index
|
|
m.lock.Unlock()
|
|
|
|
return resp
|
|
}
|
|
|
|
// notifyUpdateLocked closes the current update channel and recreates a new
|
|
// one. It must be called while holding the m.lock lock.
|
|
func (m *materializer) notifyUpdateLocked(err error) {
|
|
m.err = err
|
|
close(m.updateCh)
|
|
m.updateCh = make(chan struct{})
|
|
}
|
|
|
|
// reset clears the state ready to start a new stream from scratch.
|
|
func (m *materializer) reset() {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
m.view.Reset()
|
|
m.index = 0
|
|
}
|
|
|
|
// updateView updates the view from a sequence of events and stores
|
|
// the corresponding Raft index.
|
|
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
if err := m.view.Update(events); err != nil {
|
|
return err
|
|
}
|
|
|
|
m.index = index
|
|
m.notifyUpdateLocked(nil)
|
|
m.retryWaiter.Reset()
|
|
return nil
|
|
}
|
|
|
|
func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) {
|
|
failures := m.retryWaiter.Failures()
|
|
if isNonTemporaryOrConsecutiveFailure(err, failures) {
|
|
m.lock.Lock()
|
|
m.notifyUpdateLocked(err)
|
|
m.lock.Unlock()
|
|
}
|
|
|
|
logger := m.logger.With(
|
|
"err", err,
|
|
"topic", req.Topic,
|
|
"failure_count", failures+1,
|
|
)
|
|
|
|
if req.GetWildcardSubject() {
|
|
logger = logger.With("wildcard_subject", true)
|
|
} else if sub := req.GetNamedSubject(); sub != nil {
|
|
logger = logger.With("key", sub.Key)
|
|
} else {
|
|
logger = logger.With("key", req.Key) // nolint:staticcheck // SA1019 intentional use of deprecated field
|
|
}
|
|
|
|
logger.Error("subscribe call failed")
|
|
}
|
|
|
|
// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
|
|
// temporary error or if failures > 0.
|
|
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
|
|
// temporary is an interface used by net and other std lib packages to
|
|
// show error types represent temporary/recoverable errors.
|
|
temp, ok := err.(interface {
|
|
Temporary() bool
|
|
})
|
|
return !ok || !temp.Temporary() || failures > 0
|
|
}
|
|
|
|
func defaultWaiter() *retry.Waiter {
|
|
return &retry.Waiter{
|
|
MinFailures: 1,
|
|
// Start backing off with small increments (200-400ms) which will double
|
|
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
|
|
// after that). (retry.Wait applies Max limit after jitter right now).
|
|
Factor: 200 * time.Millisecond,
|
|
MinWait: 0,
|
|
MaxWait: 60 * time.Second,
|
|
Jitter: retry.NewJitter(100),
|
|
}
|
|
}
|