diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index b5a5157541..51568d7d9e 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -204,9 +204,8 @@ func (m *Materializer) notifyUpdateLocked(err error) { m.updateCh = make(chan struct{}) } -// Fetch implements the logic a StreamingCacheType will need during it's Fetch -// call. Cache types that use streaming should just be able to proxy to this -// once they have a subscription object and return it's results directly. +// Fetch the value stored in the View. Fetch blocks until the index of the View +// is greater than opts.MinIndex, or the context is cancelled. func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) { var result cache.FetchResult diff --git a/agent/submatview/store.go b/agent/submatview/store.go index e5603102fb..9df57214bf 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -12,26 +12,48 @@ import ( type Store struct { lock sync.RWMutex - byKey map[string]*Materializer + byKey map[string]entry expiryHeap *ttlcache.ExpiryHeap } +type entry struct { + materializer *Materializer + expiry *ttlcache.Entry +} + +// TODO: start expiration loop func NewStore() *Store { return &Store{ - byKey: make(map[string]*Materializer), + byKey: make(map[string]entry), expiryHeap: ttlcache.NewExpiryHeap(), } } +var ttl = 20 * time.Minute + // Get a value from the store, blocking if the store has not yet seen the // req.Index value. // See agent/cache.Cache.Get for complete documentation. func (s *Store) Get( ctx context.Context, typ string, - req cache.Request, -) (result interface{}, meta cache.ResultMeta, err error) { - return nil, cache.ResultMeta{}, nil + req Request, + // TODO: only the Index field of ResultMeta is relevant, return a result struct instead. +) (interface{}, cache.ResultMeta, error) { + info := req.CacheInfo() + key := makeEntryKey(typ, info) + e := s.getEntry(key, req.NewMaterializer) + + // TODO: requires a lock to update the heap. + s.expiryHeap.Update(e.expiry.Index(), ttl) + + // TODO: no longer any need to return cache.FetchResult from Materializer.Fetch + // TODO: pass context instead of Done chan, also replaces Timeout param + result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{ + MinIndex: info.MinIndex, + Timeout: info.Timeout, + }) + return result.Value, cache.ResultMeta{Index: result.Index}, err } // Notify the updateCh when there are updates to the entry identified by req. @@ -39,60 +61,75 @@ func (s *Store) Get( func (s *Store) Notify( ctx context.Context, typ string, - req cache.Request, + req Request, correlationID string, updateCh chan<- cache.UpdateEvent, ) error { + // TODO: set entry to not expire until ctx is cancelled. + + info := req.CacheInfo() + key := makeEntryKey(typ, info) + e := s.getEntry(key, req.NewMaterializer) + + var index uint64 + + go func() { + for { + result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index}) + switch { + case ctx.Err() != nil: + return + case err != nil: + // TODO: cache.Notify sends errors on updateCh, should this do the same? + // It seems like only fetch errors would ever get sent along. + // TODO: log warning + continue + } + + index = result.Index + u := cache.UpdateEvent{ + CorrelationID: correlationID, + Result: result.Value, + Meta: cache.ResultMeta{Index: result.Index}, + Err: err, + } + select { + case updateCh <- u: + case <-ctx.Done(): + return + } + + } + }() return nil } -func (s *Store) getMaterializer(opts GetOptions) *Materializer { - // TODO: use makeEntryKey - var key string - +func (s *Store) getEntry(key string, newMat func() *Materializer) entry { s.lock.RLock() - mat, ok := s.byKey[key] + e, ok := s.byKey[key] s.lock.RUnlock() - if ok { - return mat + return e } s.lock.Lock() - mat, ok = s.byKey[key] - if !ok { - mat = opts.NewMaterializer() - s.byKey[opts.Key] = mat + defer s.lock.Unlock() + e, ok = s.byKey[key] + if ok { + return e } - s.lock.Unlock() - return mat + + e = entry{materializer: newMat()} + s.byKey[key] = e + return e } // makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. -func makeEntryKey(t, dc, token, key string) string { - return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key) +func makeEntryKey(typ string, r cache.RequestInfo) string { + return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) } -type GetOptions struct { - // TODO: needs to use makeEntryKey - Key string - - // MinIndex is the index previously seen by the caller. If MinIndex>0 Fetch - // will not return until the index is >MinIndex, or Timeout is hit. - MinIndex uint64 - - // TODO: maybe remove and use a context deadline. - Timeout time.Duration - - // NewMaterializer returns a new Materializer to be used if the store does - // not have one already running for the given key. - NewMaterializer func() *Materializer -} - -type FetchResult struct { - // Value is the result of the fetch. - Value interface{} - - // Index is the corresponding index value for this data. - Index uint64 +type Request interface { + cache.Request + NewMaterializer() *Materializer }