submatview: set up expiry of materializers

This commit is contained in:
Daniel Nephin 2021-02-22 17:43:52 -05:00
parent c574108354
commit 7d13465c7b

View File

@ -20,7 +20,9 @@ type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
stop func()
// TODO: add watchCount
// notifier is the count of active Notify goroutines. This entry will
// remain in the store as long as this count remains > 0.
notifier int
}
// TODO: start expiration loop
@ -52,11 +54,13 @@ func (s *Store) Run(ctx context.Context) {
he := timer.Entry
s.expiryHeap.Remove(he.Index())
// TODO: expiry here
// if e.watchCount == 0 {}
e := s.byKey[he.Key()]
e.stop()
//delete(s.entries, entry.Key())
// Only stop the materializer if there are no active calls to Notify.
if e.notifier == 0 {
e.stop()
delete(s.byKey, he.Key())
}
s.lock.Unlock()
}
@ -77,11 +81,11 @@ func (s *Store) Get(
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
) (interface{}, cache.ResultMeta, error) {
info := req.CacheInfo()
key := makeEntryKey(typ, info)
e := s.getEntry(key, req.NewMaterializer)
// TODO: requires a lock to update the heap.
//s.expiryHeap.Update(e.expiry.Index(), info.Timeout + ttl)
e := s.getEntry(getEntryOpts{
typ: typ,
info: info,
newMaterializer: req.NewMaterializer,
})
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
// TODO: pass context instead of Done chan, also replaces Timeout param
@ -101,15 +105,26 @@ func (s *Store) Notify(
correlationID string,
updateCh chan<- cache.UpdateEvent,
) error {
// TODO: set entry to not expire until ctx is cancelled.
info := req.CacheInfo()
key := makeEntryKey(typ, info)
e := s.getEntry(key, req.NewMaterializer)
var index uint64
e := s.getEntry(getEntryOpts{
typ: typ,
info: info,
newMaterializer: req.NewMaterializer,
notifier: true,
})
go func() {
index := info.MinIndex
// TODO: better way to handle this?
defer func() {
s.lock.Lock()
e.notifier--
s.byKey[e.expiry.Key()] = e
s.expiryHeap.Update(e.expiry.Index(), idleTTL)
s.lock.Unlock()
}()
for {
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index})
switch {
@ -140,28 +155,33 @@ func (s *Store) Notify(
return nil
}
func (s *Store) getEntry(key string, newMat func() *Materializer) entry {
s.lock.RLock()
e, ok := s.byKey[key]
s.lock.RUnlock()
if ok {
return e
}
func (s *Store) getEntry(opts getEntryOpts) entry {
info := opts.info
key := makeEntryKey(opts.typ, info)
s.lock.Lock()
defer s.lock.Unlock()
// Check again after acquiring the write lock, in case we raced to create the entry.
e, ok = s.byKey[key]
e, ok := s.byKey[key]
if ok {
s.expiryHeap.Update(e.expiry.Index(), info.Timeout+idleTTL)
if opts.notifier {
e.notifier++
}
return e
}
e = entry{materializer: newMat()}
ctx, cancel := context.WithCancel(context.Background())
e.stop = cancel
go e.materializer.Run(ctx)
mat := opts.newMaterializer()
go mat.Run(ctx)
e = entry{
materializer: mat,
stop: cancel,
expiry: s.expiryHeap.Add(key, info.Timeout+idleTTL),
}
if opts.notifier {
e.notifier++
}
s.byKey[key] = e
return e
}
@ -175,3 +195,10 @@ type Request interface {
cache.Request
NewMaterializer() *Materializer
}
type getEntryOpts struct {
typ string
info cache.RequestInfo
newMaterializer func() *Materializer
notifier bool
}