consul/agent/leafcert/watch.go
hashicorp-copywrite[bot] 5fb9df1640
[COMPLIANCE] License changes (#18443)
* Adding explicit MPL license for sub-package

This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository.

* Adding explicit MPL license for sub-package

This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository.

* Updating the license from MPL to Business Source License

Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl.

* add missing license headers

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

---------

Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 09:12:13 -04:00

164 lines
4.6 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package leafcert
import (
"context"
"fmt"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib"
)
// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that abstracts code from performing their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the
// context is canceled.
//
// The passed context must be canceled or timeout in order to free resources
// and stop maintaining the value in cache. Typically request-scoped resources
// do this but if a long-lived context like context.Background is used, then the
// caller must arrange for it to be canceled when the watch is no longer
// needed.
//
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
// fast enough it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behavior transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (m *Manager) Notify(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
ch chan<- cache.UpdateEvent,
) error {
return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
select {
case ch <- event:
case <-ctx.Done():
}
})
}
// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (m *Manager) NotifyCallback(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
cb cache.Callback,
) error {
if req.Key() == "" {
return fmt.Errorf("a key is required")
}
// Lightweight copy this object so that manipulating req doesn't race.
dup := *req
req = &dup
if req.MaxQueryTime <= 0 {
req.MaxQueryTime = DefaultQueryTimeout
}
go m.notifyBlockingQuery(ctx, req, correlationID, cb)
return nil
}
func (m *Manager) notifyBlockingQuery(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
cb cache.Callback,
) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
failures := uint(0)
for {
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Blocking request
req.MinQueryIndex = index
newValue, meta, err := m.internalGet(ctx, req)
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Check the index of the value returned in the cache entry to be sure it
// changed
if index == 0 || index < meta.Index {
cb(ctx, cache.UpdateEvent{
CorrelationID: correlationID,
Result: newValue,
Meta: meta,
Err: err,
})
// Update index for next request
index = meta.Index
}
var wait time.Duration
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
wait = backOffWait(m.config, failures)
m.logger.
With("error", err).
With("index", index).
Warn("handling error in Manager.Notify")
}
if wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if err == nil && index < 1 {
index = 1
}
}
}
func backOffWait(cfg Config, failures uint) time.Duration {
if failures > cfg.LeafCertRefreshBackoffMin {
shift := failures - cfg.LeafCertRefreshBackoffMin
waitTime := cfg.LeafCertRefreshMaxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > cfg.LeafCertRefreshMaxWait {
waitTime = cfg.LeafCertRefreshMaxWait
}
return waitTime + lib.RandomStagger(waitTime)
}
return 0
}