mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
123 lines
3.8 KiB
Go
123 lines
3.8 KiB
Go
package cache
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// UpdateEvent is a struct summarising an update to a cache entry
|
|
type UpdateEvent struct {
|
|
// CorrelationID is used by the Notify API to allow correlation of updates
|
|
// with specific requests. We could return the full request object and
|
|
// cachetype for consumers to match against the calls they made but in
|
|
// practice it's cleaner for them to choose the minimal necessary unique
|
|
// identifier given the set of things they are watching. They might even
|
|
// choose to assign random IDs for example.
|
|
CorrelationID string
|
|
Result interface{}
|
|
Meta ResultMeta
|
|
Err error
|
|
}
|
|
|
|
// Notify registers a desire to be updated about changes to a cache result.
|
|
//
|
|
// It is a helper that abstracts code from performing their own "blocking" query
|
|
// logic against a cache key to watch for changes and to maintain the key in
|
|
// cache actively. It will continue to perform blocking Get requests until the
|
|
// context is canceled.
|
|
//
|
|
// The passed context must be cancelled or timeout in order to free resources
|
|
// and stop maintaining the value in cache. Typically request-scoped resources
|
|
// do this but if a long-lived context like context.Background is used, then the
|
|
// caller must arrange for it to be cancelled when the watch is no longer
|
|
// needed.
|
|
//
|
|
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
|
|
// fast enough it will block the notification loop. When the chan is later
|
|
// drained, watching resumes correctly. If the pause is longer than the
|
|
// cachetype's TTL, the result might be removed from the local cache. Even in
|
|
// this case though when the chan is drained again, the new Get will re-fetch
|
|
// the entry from servers and resume notification behavior transparently.
|
|
//
|
|
// The chan is passed in to allow multiple cached results to be watched by a
|
|
// single consumer without juggling extra goroutines per watch. The
|
|
// correlationID is opaque and will be returned in all UpdateEvents generated by
|
|
// result of watching the specified request so the caller can set this to any
|
|
// value that allows them to disambiguate between events in the returned chan
|
|
// when sharing a chan between multiple cache entries. If the chan is closed,
|
|
// the notify loop will terminate.
|
|
func (c *Cache) Notify(ctx context.Context, t string, r Request,
|
|
correlationID string, ch chan<- UpdateEvent) error {
|
|
|
|
// Get the type that we're fetching
|
|
c.typesLock.RLock()
|
|
tEntry, ok := c.types[t]
|
|
c.typesLock.RUnlock()
|
|
if !ok {
|
|
return fmt.Errorf("unknown type in cache: %s", t)
|
|
}
|
|
if !tEntry.Type.SupportsBlocking() {
|
|
return fmt.Errorf("watch requires the type to support blocking")
|
|
}
|
|
|
|
// Always start at 0 index to deliver the inital (possibly currently cached
|
|
// value).
|
|
index := uint64(0)
|
|
|
|
go func() {
|
|
var failures uint
|
|
|
|
for {
|
|
// Check context hasn't been cancelled
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
// Blocking request
|
|
res, meta, err := c.getWithIndex(t, r, index)
|
|
|
|
// Check context hasn't been cancelled
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
// Check the index of the value returned in the cache entry to be sure it
|
|
// changed
|
|
if index < meta.Index {
|
|
u := UpdateEvent{correlationID, res, meta, err}
|
|
select {
|
|
case ch <- u:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
// Update index for next request
|
|
index = meta.Index
|
|
}
|
|
|
|
// Handle errors with backoff. Badly behaved blocking calls that returned
|
|
// a zero index are considered as failures since we need to not get stuck
|
|
// in a busy loop.
|
|
if err == nil && meta.Index > 0 {
|
|
failures = 0
|
|
} else {
|
|
failures++
|
|
}
|
|
if wait := backOffWait(failures); wait > 0 {
|
|
select {
|
|
case <-time.After(wait):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
// Sanity check we always request blocking on second pass
|
|
if index < 1 {
|
|
index = 1
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|