mirror of https://github.com/status-im/consul.git
agent/cache: string through attempt rather than storing on the entry
This commit is contained in:
parent
cfcd733609
commit
fc5508f8a3
|
@ -257,7 +257,7 @@ RETRY_GET:
|
||||||
|
|
||||||
// At this point, we know we either don't have a value at all or the
|
// At this point, we know we either don't have a value at all or the
|
||||||
// value we have is too old. We need to wait for new data.
|
// value we have is too old. We need to wait for new data.
|
||||||
waiterCh, err := c.fetch(t, key, r, true)
|
waiterCh, err := c.fetch(t, key, r, true, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -287,7 +287,7 @@ func (c *Cache) entryKey(r *RequestInfo) string {
|
||||||
// If allowNew is true then the fetch should create the cache entry
|
// If allowNew is true then the fetch should create the cache entry
|
||||||
// if it doesn't exist. If this is false, then fetch will do nothing
|
// if it doesn't exist. If this is false, then fetch will do nothing
|
||||||
// if the entry doesn't exist. This latter case is to support refreshing.
|
// if the entry doesn't exist. This latter case is to support refreshing.
|
||||||
func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{}, error) {
|
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint8) (<-chan struct{}, error) {
|
||||||
// Get the type that we're fetching
|
// Get the type that we're fetching
|
||||||
c.typesLock.RLock()
|
c.typesLock.RLock()
|
||||||
tEntry, ok := c.types[t]
|
tEntry, ok := c.types[t]
|
||||||
|
@ -355,14 +355,14 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{},
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
|
||||||
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
|
||||||
|
|
||||||
// Reset the attepts counter so we don't have any backoff
|
// Reset the attempts counter so we don't have any backoff
|
||||||
newEntry.ErrAttempts = 0
|
attempt = 0
|
||||||
} else {
|
} else {
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
|
||||||
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
|
||||||
|
|
||||||
// Always increment the attempts to control backoff
|
// Increment attempt counter
|
||||||
newEntry.ErrAttempts++
|
attempt++
|
||||||
|
|
||||||
// If the entry wasn't valid, we set an error. If it was valid,
|
// If the entry wasn't valid, we set an error. If it was valid,
|
||||||
// we don't set an error so that the prior value can continue
|
// we don't set an error so that the prior value can continue
|
||||||
|
@ -399,7 +399,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{},
|
||||||
// If refresh is enabled, run the refresh in due time. The refresh
|
// If refresh is enabled, run the refresh in due time. The refresh
|
||||||
// below might block, but saves us from spawning another goroutine.
|
// below might block, but saves us from spawning another goroutine.
|
||||||
if tEntry.Opts.Refresh {
|
if tEntry.Opts.Refresh {
|
||||||
c.refresh(tEntry.Opts, newEntry.ErrAttempts, t, key, r)
|
c.refresh(tEntry.Opts, attempt, t, key, r)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -456,7 +456,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint8, t string, key stri
|
||||||
// Trigger. The "allowNew" field is false because in the time we were
|
// Trigger. The "allowNew" field is false because in the time we were
|
||||||
// waiting to refresh we may have expired and got evicted. If that
|
// waiting to refresh we may have expired and got evicted. If that
|
||||||
// happened, we don't want to create a new entry.
|
// happened, we don't want to create a new entry.
|
||||||
c.fetch(t, key, r, false)
|
c.fetch(t, key, r, false, attempt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// runExpiryLoop is a blocking function that watches the expiration
|
// runExpiryLoop is a blocking function that watches the expiration
|
||||||
|
|
|
@ -16,10 +16,9 @@ type cacheEntry struct {
|
||||||
Index uint64
|
Index uint64
|
||||||
|
|
||||||
// Metadata that is used for internal accounting
|
// Metadata that is used for internal accounting
|
||||||
Valid bool // True if the Value is set
|
Valid bool // True if the Value is set
|
||||||
Fetching bool // True if a fetch is already active
|
Fetching bool // True if a fetch is already active
|
||||||
Waiter chan struct{} // Closed when this entry is invalidated
|
Waiter chan struct{} // Closed when this entry is invalidated
|
||||||
ErrAttempts uint8 // Number of fetch errors since last success (Error may be nil)
|
|
||||||
|
|
||||||
// Expiry contains information about the expiration of this
|
// Expiry contains information about the expiration of this
|
||||||
// entry. This is a pointer as its shared as a value in the
|
// entry. This is a pointer as its shared as a value in the
|
||||||
|
|
Loading…
Reference in New Issue