mirror of https://github.com/status-im/consul.git
submatview: rough outline of the Get and Notify methods.
This commit is contained in:
parent
3d52abb5e2
commit
ddddbdb990
|
@ -204,9 +204,8 @@ func (m *Materializer) notifyUpdateLocked(err error) {
|
|||
m.updateCh = make(chan struct{})
|
||||
}
|
||||
|
||||
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
|
||||
// call. Cache types that use streaming should just be able to proxy to this
|
||||
// once they have a subscription object and return it's results directly.
|
||||
// Fetch the value stored in the View. Fetch blocks until the index of the View
|
||||
// is greater than opts.MinIndex, or the context is cancelled.
|
||||
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
|
||||
var result cache.FetchResult
|
||||
|
||||
|
|
|
@ -12,26 +12,48 @@ import (
|
|||
|
||||
type Store struct {
|
||||
lock sync.RWMutex
|
||||
byKey map[string]*Materializer
|
||||
byKey map[string]entry
|
||||
expiryHeap *ttlcache.ExpiryHeap
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
materializer *Materializer
|
||||
expiry *ttlcache.Entry
|
||||
}
|
||||
|
||||
// TODO: start expiration loop
|
||||
func NewStore() *Store {
|
||||
return &Store{
|
||||
byKey: make(map[string]*Materializer),
|
||||
byKey: make(map[string]entry),
|
||||
expiryHeap: ttlcache.NewExpiryHeap(),
|
||||
}
|
||||
}
|
||||
|
||||
var ttl = 20 * time.Minute
|
||||
|
||||
// Get a value from the store, blocking if the store has not yet seen the
|
||||
// req.Index value.
|
||||
// See agent/cache.Cache.Get for complete documentation.
|
||||
func (s *Store) Get(
|
||||
ctx context.Context,
|
||||
typ string,
|
||||
req cache.Request,
|
||||
) (result interface{}, meta cache.ResultMeta, err error) {
|
||||
return nil, cache.ResultMeta{}, nil
|
||||
req Request,
|
||||
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
|
||||
) (interface{}, cache.ResultMeta, error) {
|
||||
info := req.CacheInfo()
|
||||
key := makeEntryKey(typ, info)
|
||||
e := s.getEntry(key, req.NewMaterializer)
|
||||
|
||||
// TODO: requires a lock to update the heap.
|
||||
s.expiryHeap.Update(e.expiry.Index(), ttl)
|
||||
|
||||
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
|
||||
// TODO: pass context instead of Done chan, also replaces Timeout param
|
||||
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{
|
||||
MinIndex: info.MinIndex,
|
||||
Timeout: info.Timeout,
|
||||
})
|
||||
return result.Value, cache.ResultMeta{Index: result.Index}, err
|
||||
}
|
||||
|
||||
// Notify the updateCh when there are updates to the entry identified by req.
|
||||
|
@ -39,60 +61,75 @@ func (s *Store) Get(
|
|||
func (s *Store) Notify(
|
||||
ctx context.Context,
|
||||
typ string,
|
||||
req cache.Request,
|
||||
req Request,
|
||||
correlationID string,
|
||||
updateCh chan<- cache.UpdateEvent,
|
||||
) error {
|
||||
// TODO: set entry to not expire until ctx is cancelled.
|
||||
|
||||
info := req.CacheInfo()
|
||||
key := makeEntryKey(typ, info)
|
||||
e := s.getEntry(key, req.NewMaterializer)
|
||||
|
||||
var index uint64
|
||||
|
||||
go func() {
|
||||
for {
|
||||
result, err := e.materializer.Fetch(ctx.Done(), cache.FetchOptions{MinIndex: index})
|
||||
switch {
|
||||
case ctx.Err() != nil:
|
||||
return
|
||||
case err != nil:
|
||||
// TODO: cache.Notify sends errors on updateCh, should this do the same?
|
||||
// It seems like only fetch errors would ever get sent along.
|
||||
// TODO: log warning
|
||||
continue
|
||||
}
|
||||
|
||||
index = result.Index
|
||||
u := cache.UpdateEvent{
|
||||
CorrelationID: correlationID,
|
||||
Result: result.Value,
|
||||
Meta: cache.ResultMeta{Index: result.Index},
|
||||
Err: err,
|
||||
}
|
||||
select {
|
||||
case updateCh <- u:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) getMaterializer(opts GetOptions) *Materializer {
|
||||
// TODO: use makeEntryKey
|
||||
var key string
|
||||
|
||||
func (s *Store) getEntry(key string, newMat func() *Materializer) entry {
|
||||
s.lock.RLock()
|
||||
mat, ok := s.byKey[key]
|
||||
e, ok := s.byKey[key]
|
||||
s.lock.RUnlock()
|
||||
|
||||
if ok {
|
||||
return mat
|
||||
return e
|
||||
}
|
||||
|
||||
s.lock.Lock()
|
||||
mat, ok = s.byKey[key]
|
||||
if !ok {
|
||||
mat = opts.NewMaterializer()
|
||||
s.byKey[opts.Key] = mat
|
||||
defer s.lock.Unlock()
|
||||
e, ok = s.byKey[key]
|
||||
if ok {
|
||||
return e
|
||||
}
|
||||
s.lock.Unlock()
|
||||
return mat
|
||||
|
||||
e = entry{materializer: newMat()}
|
||||
s.byKey[key] = e
|
||||
return e
|
||||
}
|
||||
|
||||
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
|
||||
func makeEntryKey(t, dc, token, key string) string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key)
|
||||
func makeEntryKey(typ string, r cache.RequestInfo) string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
|
||||
}
|
||||
|
||||
type GetOptions struct {
|
||||
// TODO: needs to use makeEntryKey
|
||||
Key string
|
||||
|
||||
// MinIndex is the index previously seen by the caller. If MinIndex>0 Fetch
|
||||
// will not return until the index is >MinIndex, or Timeout is hit.
|
||||
MinIndex uint64
|
||||
|
||||
// TODO: maybe remove and use a context deadline.
|
||||
Timeout time.Duration
|
||||
|
||||
// NewMaterializer returns a new Materializer to be used if the store does
|
||||
// not have one already running for the given key.
|
||||
NewMaterializer func() *Materializer
|
||||
}
|
||||
|
||||
type FetchResult struct {
|
||||
// Value is the result of the fetch.
|
||||
Value interface{}
|
||||
|
||||
// Index is the corresponding index value for this data.
|
||||
Index uint64
|
||||
type Request interface {
|
||||
cache.Request
|
||||
NewMaterializer() *Materializer
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue