mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
11398892b3
Previous getFromView would call view.Result when the result may not have been returned (because the index is updated past the minIndex. This would allocate a slice and sort it for no reason, because the values would never be returned. Fix this by re-ordering the operations in getFromView. The test changes in this commit were an attempt to cover the case where an update is received but the index does not exceed the minIndex.
238 lines
6.1 KiB
Go
238 lines
6.1 KiB
Go
package submatview
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
"github.com/hashicorp/consul/lib/ttlcache"
|
|
)
|
|
|
|
// Store of Materializers. Store implements an interface similar to
|
|
// agent/cache.Cache, and allows a single Materliazer to fulfill multiple requests
|
|
// as long as the requests are identical.
|
|
// Store is used in place of agent/cache.Cache because with the streaming
|
|
// backend there is no longer any need to run a background goroutine to refresh
|
|
// stored values.
|
|
type Store struct {
|
|
logger hclog.Logger
|
|
lock sync.RWMutex
|
|
byKey map[string]entry
|
|
expiryHeap *ttlcache.ExpiryHeap
|
|
|
|
// idleTTL is the duration of time an entry should remain in the Store after the
|
|
// last request for that entry has been terminated. It is a field on the struct
|
|
// so that it can be patched in tests without need a lock.
|
|
idleTTL time.Duration
|
|
}
|
|
|
|
type entry struct {
|
|
materializer *Materializer
|
|
expiry *ttlcache.Entry
|
|
stop func()
|
|
// requests is the count of active requests using this entry. This entry will
|
|
// remain in the store as long as this count remains > 0.
|
|
requests int
|
|
}
|
|
|
|
// NewStore creates and returns a Store that is ready for use. The caller must
|
|
// call Store.Run (likely in a separate goroutine) to start the expiration loop.
|
|
func NewStore(logger hclog.Logger) *Store {
|
|
return &Store{
|
|
logger: logger,
|
|
byKey: make(map[string]entry),
|
|
expiryHeap: ttlcache.NewExpiryHeap(),
|
|
idleTTL: 20 * time.Minute,
|
|
}
|
|
}
|
|
|
|
// Run the expiration loop until the context is cancelled.
|
|
func (s *Store) Run(ctx context.Context) {
|
|
for {
|
|
s.lock.RLock()
|
|
timer := s.expiryHeap.Next()
|
|
s.lock.RUnlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return
|
|
|
|
// the first item in the heap has changed, restart the timer with the
|
|
// new TTL.
|
|
case <-s.expiryHeap.NotifyCh:
|
|
timer.Stop()
|
|
continue
|
|
|
|
// the TTL for the first item has been reached, attempt an expiration.
|
|
case <-timer.Wait():
|
|
s.lock.Lock()
|
|
|
|
he := timer.Entry
|
|
s.expiryHeap.Remove(he.Index())
|
|
|
|
e := s.byKey[he.Key()]
|
|
|
|
// Only stop the materializer if there are no active requests.
|
|
if e.requests == 0 {
|
|
e.stop()
|
|
delete(s.byKey, he.Key())
|
|
}
|
|
|
|
s.lock.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Request is used to request data from the Store.
|
|
// Note that cache.Request is required, but some of the fields cache.RequestInfo
|
|
// fields are ignored (ex: MaxAge, and MustRevalidate).
|
|
type Request interface {
|
|
cache.Request
|
|
// NewMaterializer will be called if there is no active materializer to fulfil
|
|
// the request. It should return a Materializer appropriate for streaming
|
|
// data to fulfil this request.
|
|
NewMaterializer() (*Materializer, error)
|
|
// Type should return a string which uniquely identifies this type of request.
|
|
// The returned value is used as the prefix of the key used to index
|
|
// entries in the Store.
|
|
Type() string
|
|
}
|
|
|
|
// Get a value from the store, blocking if the store has not yet seen the
|
|
// req.Index value.
|
|
// See agent/cache.Cache.Get for complete documentation.
|
|
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
|
|
info := req.CacheInfo()
|
|
key, materializer, err := s.readEntry(req)
|
|
if err != nil {
|
|
return Result{}, err
|
|
}
|
|
defer s.releaseEntry(key)
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
|
|
defer cancel()
|
|
|
|
result, err := materializer.getFromView(ctx, info.MinIndex)
|
|
// context.DeadlineExceeded is translated to nil to match the behaviour of
|
|
// agent/cache.Cache.Get.
|
|
if err == nil || errors.Is(err, context.DeadlineExceeded) {
|
|
return result, nil
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
// Notify the updateCh when there are updates to the entry identified by req.
|
|
// See agent/cache.Cache.Notify for complete documentation.
|
|
//
|
|
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
|
|
// this case. Instead set a deadline on the context.
|
|
func (s *Store) Notify(
|
|
ctx context.Context,
|
|
req Request,
|
|
correlationID string,
|
|
updateCh chan<- cache.UpdateEvent,
|
|
) error {
|
|
info := req.CacheInfo()
|
|
key, materializer, err := s.readEntry(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
defer s.releaseEntry(key)
|
|
|
|
index := info.MinIndex
|
|
for {
|
|
result, err := materializer.getFromView(ctx, index)
|
|
switch {
|
|
case ctx.Err() != nil:
|
|
return
|
|
case err != nil:
|
|
s.logger.Warn("handling error in Store.Notify",
|
|
"error", err,
|
|
"request-type", req.Type(),
|
|
"index", index)
|
|
continue
|
|
}
|
|
|
|
index = result.Index
|
|
u := cache.UpdateEvent{
|
|
CorrelationID: correlationID,
|
|
Result: result.Value,
|
|
Meta: cache.ResultMeta{Index: result.Index},
|
|
}
|
|
select {
|
|
case updateCh <- u:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// readEntry from the store, and increment the requests counter. releaseEntry
|
|
// must be called when the request is finished to decrement the counter.
|
|
func (s *Store) readEntry(req Request) (string, *Materializer, error) {
|
|
info := req.CacheInfo()
|
|
key := makeEntryKey(req.Type(), info)
|
|
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
e, ok := s.byKey[key]
|
|
if ok {
|
|
e.requests++
|
|
s.byKey[key] = e
|
|
return key, e.materializer, nil
|
|
}
|
|
|
|
mat, err := req.NewMaterializer()
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
go mat.Run(ctx)
|
|
|
|
e = entry{
|
|
materializer: mat,
|
|
stop: cancel,
|
|
requests: 1,
|
|
}
|
|
s.byKey[key] = e
|
|
return key, e.materializer, nil
|
|
}
|
|
|
|
// releaseEntry decrements the request count and starts an expiry timer if the
|
|
// count has reached 0. Must be called once for every call to readEntry.
|
|
func (s *Store) releaseEntry(key string) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
e := s.byKey[key]
|
|
e.requests--
|
|
s.byKey[key] = e
|
|
|
|
if e.requests > 0 {
|
|
return
|
|
}
|
|
|
|
if e.expiry.Index() == ttlcache.NotIndexed {
|
|
e.expiry = s.expiryHeap.Add(key, s.idleTTL)
|
|
s.byKey[key] = e
|
|
return
|
|
}
|
|
|
|
s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
|
|
}
|
|
|
|
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
|
|
func makeEntryKey(typ string, r cache.RequestInfo) string {
|
|
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
|
|
}
|