submatview: track requests instead of notifiers

And only start expiration time when the last request ends. This makes tracking expiry simpler, and
ensures that no entry can be expired while there are active requests.
This commit is contained in:
Daniel Nephin 2021-02-23 12:18:44 -05:00
parent 7d13465c7b
commit 1d9d7d0aa5
3 changed files with 60 additions and 59 deletions

View File

@ -32,6 +32,7 @@ type View interface {
// separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining
// indexes seen during Update.
// TODO: remove error return value.
Result(index uint64) (interface{}, error)
// Reset the view to the zero state, done in preparation for receiving a new

View File

@ -20,9 +20,9 @@ type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
stop func()
// notifier is the count of active Notify goroutines. This entry will
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
notifier int
requests int
}
// TODO: start expiration loop
@ -56,8 +56,8 @@ func (s *Store) Run(ctx context.Context) {
e := s.byKey[he.Key()]
// Only stop the materializer if there are no active calls to Notify.
if e.notifier == 0 {
// Only stop the materializer if there are no active requests.
if e.requests == 0 {
e.stop()
delete(s.byKey, he.Key())
}
@ -68,24 +68,23 @@ func (s *Store) Run(ctx context.Context) {
}
// TODO: godoc
var idleTTL = 20 * time.Minute
type Request interface {
cache.Request
NewMaterializer() *Materializer
Type() string
}
// Get a value from the store, blocking if the store has not yet seen the
// req.Index value.
// See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get(
ctx context.Context,
// TODO: remove typ param, make it part of the Request interface.
typ string,
req Request,
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
) (interface{}, cache.ResultMeta, error) {
info := req.CacheInfo()
e := s.getEntry(getEntryOpts{
typ: typ,
info: info,
newMaterializer: req.NewMaterializer,
})
key, e := s.getEntry(req)
defer s.releaseEntry(key)
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
// TODO: pass context instead of Done chan, also replaces Timeout param
@ -93,6 +92,7 @@ func (s *Store) Get(
MinIndex: info.MinIndex,
Timeout: info.Timeout,
})
return result.Value, cache.ResultMeta{Index: result.Index}, err
}
@ -100,31 +100,17 @@ func (s *Store) Get(
// See agent/cache.Cache.Notify for complete documentation.
func (s *Store) Notify(
ctx context.Context,
typ string,
req Request,
correlationID string,
updateCh chan<- cache.UpdateEvent,
) error {
info := req.CacheInfo()
e := s.getEntry(getEntryOpts{
typ: typ,
info: info,
newMaterializer: req.NewMaterializer,
notifier: true,
})
key, e := s.getEntry(req)
go func() {
defer s.releaseEntry(key)
index := info.MinIndex
// TODO: better way to handle this?
defer func() {
s.lock.Lock()
e.notifier--
s.byKey[e.expiry.Key()] = e
s.expiryHeap.Update(e.expiry.Index(), idleTTL)
s.lock.Unlock()
}()
for {
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index})
switch {
@ -149,56 +135,66 @@ func (s *Store) Notify(
case <-ctx.Done():
return
}
}
}()
return nil
}
func (s *Store) getEntry(opts getEntryOpts) entry {
info := opts.info
key := makeEntryKey(opts.typ, info)
// getEntry from the store, and increment the requests counter. releaseEntry
// must be called when the request is finished to decrement the counter.
func (s *Store) getEntry(req Request) (string, entry) {
info := req.CacheInfo()
key := makeEntryKey(req.Type(), info)
s.lock.Lock()
defer s.lock.Unlock()
e, ok := s.byKey[key]
if ok {
s.expiryHeap.Update(e.expiry.Index(), info.Timeout+idleTTL)
if opts.notifier {
e.notifier++
}
return e
e.requests++
s.byKey[key] = e
return key, e
}
ctx, cancel := context.WithCancel(context.Background())
mat := opts.newMaterializer()
mat := req.NewMaterializer()
go mat.Run(ctx)
e = entry{
materializer: mat,
stop: cancel,
expiry: s.expiryHeap.Add(key, info.Timeout+idleTTL),
}
if opts.notifier {
e.notifier++
requests: 1,
}
s.byKey[key] = e
return e
return key, e
}
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated.
var idleTTL = 20 * time.Minute
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to getEntry.
func (s *Store) releaseEntry(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.requests--
s.byKey[key] = e
if e.requests > 0 {
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, idleTTL)
s.byKey[key] = e
return
}
s.expiryHeap.Update(e.expiry.Index(), idleTTL)
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
}
type Request interface {
cache.Request
NewMaterializer() *Materializer
}
type getEntryOpts struct {
typ string
info cache.RequestInfo
newMaterializer func() *Materializer
notifier bool
}

View File

@ -30,7 +30,7 @@ func TestStore_Get_Fresh(t *testing.T) {
newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1"))
result, md, err := store.Get(ctx, "test", req)
result, md, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(22), md.Index)
@ -39,11 +39,11 @@ func TestStore_Get_Fresh(t *testing.T) {
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
store.lock.Lock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey("test", req.CacheInfo())]
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
store.lock.Lock()
defer store.lock.Unlock()
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
}
@ -80,6 +80,10 @@ func (r *fakeRequest) NewMaterializer() *Materializer {
})
}
func (r *fakeRequest) Type() string {
return fmt.Sprintf("%T", r)
}
type fakeView struct {
srvs map[string]*pbservice.CheckServiceNode
}