mirror of https://github.com/status-im/consul.git
161 lines
4.5 KiB
Go
161 lines
4.5 KiB
Go
|
package leafcert
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/hashicorp/consul/agent/cache"
|
||
|
"github.com/hashicorp/consul/lib"
|
||
|
)
|
||
|
|
||
|
// Notify registers a desire to be updated about changes to a cache result.
|
||
|
//
|
||
|
// It is a helper that abstracts code from performing their own "blocking" query
|
||
|
// logic against a cache key to watch for changes and to maintain the key in
|
||
|
// cache actively. It will continue to perform blocking Get requests until the
|
||
|
// context is canceled.
|
||
|
//
|
||
|
// The passed context must be canceled or timeout in order to free resources
|
||
|
// and stop maintaining the value in cache. Typically request-scoped resources
|
||
|
// do this but if a long-lived context like context.Background is used, then the
|
||
|
// caller must arrange for it to be canceled when the watch is no longer
|
||
|
// needed.
|
||
|
//
|
||
|
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
|
||
|
// fast enough it will block the notification loop. When the chan is later
|
||
|
// drained, watching resumes correctly. If the pause is longer than the
|
||
|
// cachetype's TTL, the result might be removed from the local cache. Even in
|
||
|
// this case though when the chan is drained again, the new Get will re-fetch
|
||
|
// the entry from servers and resume notification behavior transparently.
|
||
|
//
|
||
|
// The chan is passed in to allow multiple cached results to be watched by a
|
||
|
// single consumer without juggling extra goroutines per watch. The
|
||
|
// correlationID is opaque and will be returned in all UpdateEvents generated by
|
||
|
// result of watching the specified request so the caller can set this to any
|
||
|
// value that allows them to disambiguate between events in the returned chan
|
||
|
// when sharing a chan between multiple cache entries. If the chan is closed,
|
||
|
// the notify loop will terminate.
|
||
|
func (m *Manager) Notify(
|
||
|
ctx context.Context,
|
||
|
req *ConnectCALeafRequest,
|
||
|
correlationID string,
|
||
|
ch chan<- cache.UpdateEvent,
|
||
|
) error {
|
||
|
return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
|
||
|
select {
|
||
|
case ch <- event:
|
||
|
case <-ctx.Done():
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// NotifyCallback allows you to receive notifications about changes to a cache
|
||
|
// result in the same way as Notify, but accepts a callback function instead of
|
||
|
// a channel.
|
||
|
func (m *Manager) NotifyCallback(
|
||
|
ctx context.Context,
|
||
|
req *ConnectCALeafRequest,
|
||
|
correlationID string,
|
||
|
cb cache.Callback,
|
||
|
) error {
|
||
|
if req.Key() == "" {
|
||
|
return fmt.Errorf("a key is required")
|
||
|
}
|
||
|
// Lightweight copy this object so that manipulating req doesn't race.
|
||
|
dup := *req
|
||
|
req = &dup
|
||
|
|
||
|
if req.MaxQueryTime <= 0 {
|
||
|
req.MaxQueryTime = DefaultQueryTimeout
|
||
|
}
|
||
|
|
||
|
go m.notifyBlockingQuery(ctx, req, correlationID, cb)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (m *Manager) notifyBlockingQuery(
|
||
|
ctx context.Context,
|
||
|
req *ConnectCALeafRequest,
|
||
|
correlationID string,
|
||
|
cb cache.Callback,
|
||
|
) {
|
||
|
// Always start at 0 index to deliver the initial (possibly currently cached
|
||
|
// value).
|
||
|
index := uint64(0)
|
||
|
failures := uint(0)
|
||
|
|
||
|
for {
|
||
|
// Check context hasn't been canceled
|
||
|
if ctx.Err() != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Blocking request
|
||
|
req.MinQueryIndex = index
|
||
|
newValue, meta, err := m.internalGet(ctx, req)
|
||
|
|
||
|
// Check context hasn't been canceled
|
||
|
if ctx.Err() != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Check the index of the value returned in the cache entry to be sure it
|
||
|
// changed
|
||
|
if index == 0 || index < meta.Index {
|
||
|
cb(ctx, cache.UpdateEvent{
|
||
|
CorrelationID: correlationID,
|
||
|
Result: newValue,
|
||
|
Meta: meta,
|
||
|
Err: err,
|
||
|
})
|
||
|
|
||
|
// Update index for next request
|
||
|
index = meta.Index
|
||
|
}
|
||
|
|
||
|
var wait time.Duration
|
||
|
// Handle errors with backoff. Badly behaved blocking calls that returned
|
||
|
// a zero index are considered as failures since we need to not get stuck
|
||
|
// in a busy loop.
|
||
|
if err == nil && meta.Index > 0 {
|
||
|
failures = 0
|
||
|
} else {
|
||
|
failures++
|
||
|
wait = backOffWait(m.config, failures)
|
||
|
|
||
|
m.logger.
|
||
|
With("error", err).
|
||
|
With("index", index).
|
||
|
Warn("handling error in Manager.Notify")
|
||
|
}
|
||
|
|
||
|
if wait > 0 {
|
||
|
select {
|
||
|
case <-time.After(wait):
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
// Sanity check we always request blocking on second pass
|
||
|
if err == nil && index < 1 {
|
||
|
index = 1
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func backOffWait(cfg Config, failures uint) time.Duration {
|
||
|
if failures > cfg.LeafCertRefreshBackoffMin {
|
||
|
shift := failures - cfg.LeafCertRefreshBackoffMin
|
||
|
waitTime := cfg.LeafCertRefreshMaxWait
|
||
|
if shift < 31 {
|
||
|
waitTime = (1 << shift) * time.Second
|
||
|
}
|
||
|
if waitTime > cfg.LeafCertRefreshMaxWait {
|
||
|
waitTime = cfg.LeafCertRefreshMaxWait
|
||
|
}
|
||
|
return waitTime + lib.RandomStagger(waitTime)
|
||
|
}
|
||
|
return 0
|
||
|
}
|