agent/cache: support timeouts for cache reads and empty fetch results

This commit is contained in:
Mitchell Hashimoto 2018-04-16 12:06:08 +02:00
parent e81942df7a
commit fcb15e15ae
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
4 changed files with 120 additions and 10 deletions

32
agent/cache/cache.go vendored
View File

@ -136,6 +136,9 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
// First time through // First time through
first := true first := true
// timeoutCh for watching our tmeout
var timeoutCh <-chan time.Time
RETRY_GET: RETRY_GET:
// Get the current value // Get the current value
c.entriesLock.RLock() c.entriesLock.RLock()
@ -164,16 +167,27 @@ RETRY_GET:
// No longer our first time through // No longer our first time through
first = false first = false
// Set our timeout channel if we must
if info.Timeout > 0 && timeoutCh == nil {
timeoutCh = time.After(info.Timeout)
}
// At this point, we know we either don't have a value at all or the // At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data. // value we have is too old. We need to wait for new data.
waiter, err := c.fetch(t, key, r) waiterCh, err := c.fetch(t, key, r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Wait on our waiter and then retry the cache load select {
<-waiter case <-waiterCh:
// Our fetch returned, retry the get from the cache
goto RETRY_GET goto RETRY_GET
case <-timeoutCh:
// Timeout on the cache read, just return whatever we have.
return entry.Value, nil
}
} }
// entryKey returns the key for the entry in the cache. See the note // entryKey returns the key for the entry in the cache. See the note
@ -216,16 +230,26 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
// The actual Fetch must be performed in a goroutine. // The actual Fetch must be performed in a goroutine.
go func() { go func() {
// Start building the new entry by blocking on the fetch. // Start building the new entry by blocking on the fetch.
var newEntry cacheEntry
result, err := tEntry.Type.Fetch(FetchOptions{ result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: entry.Index, MinIndex: entry.Index,
}, r) }, r)
var newEntry cacheEntry
if result.Value == nil {
// If no value was set, then we do not change the prior entry.
// Instead, we just update the waiter to be new so that another
// Get will wait on the correct value.
newEntry = entry
newEntry.Fetching = false
} else {
// A new value was given, so we create a brand new entry.
newEntry.Value = result.Value newEntry.Value = result.Value
newEntry.Index = result.Index newEntry.Index = result.Index
newEntry.Error = err newEntry.Error = err
// This is a valid entry with a result // This is a valid entry with a result
newEntry.Valid = true newEntry.Valid = true
}
// Create a new waiter that will be used for the next fetch. // Create a new waiter that will be used for the next fetch.
newEntry.Waiter = make(chan struct{}) newEntry.Waiter = make(chan struct{})

View File

@ -194,6 +194,77 @@ func TestCacheGet_blockingIndex(t *testing.T) {
TestCacheGetChResult(t, resultCh, 42) TestCacheGetChResult(t, resultCh, 42)
} }
// Test a get with an index set will timeout if the fetch doesn't return
// anything.
func TestCacheGet_blockingIndexTimeout(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Configure the type
triggerCh := make(chan time.Time)
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once()
typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh)
// Fetch should block
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
Key: "hello", MinIndex: 5, Timeout: 200 * time.Millisecond}))
// Should block
select {
case <-resultCh:
t.Fatal("should block")
case <-time.After(50 * time.Millisecond):
}
// Should return after more of the timeout
select {
case result := <-resultCh:
require.Equal(t, 12, result)
case <-time.After(300 * time.Millisecond):
t.Fatal("should've returned")
}
}
// Test that if a Type returns an empty value on Fetch that the previous
// value is preserved.
func TestCacheGet_emptyFetchResult(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
// Configure the type
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Times(1)
typ.Static(FetchResult{Value: nil}, nil)
// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
result, err := c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Get, should not fetch since we already have a satisfying value
req = TestRequest(t, RequestInfo{
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that a type registered with a periodic refresh will perform // Test that a type registered with a periodic refresh will perform
// that refresh after the timer is up. // that refresh after the timer is up.
func TestCacheGet_periodicRefresh(t *testing.T) { func TestCacheGet_periodicRefresh(t *testing.T) {

View File

@ -1,5 +1,9 @@
package cache package cache
import (
"time"
)
// Request is a cache-able request. // Request is a cache-able request.
// //
// This interface is typically implemented by request structures in // This interface is typically implemented by request structures in
@ -36,4 +40,9 @@ type RequestInfo struct {
// to block until new data is available. If no index is available, the // to block until new data is available. If no index is available, the
// default value (zero) is acceptable. // default value (zero) is acceptable.
MinIndex uint64 MinIndex uint64
// Timeout is the timeout for waiting on a blocking query. When the
// timeout is reached, the last known value is returned (or maybe nil
// if there was no prior value).
Timeout time.Duration
} }

6
agent/cache/type.go vendored
View File

@ -15,6 +15,12 @@ type Type interface {
// //
// The return value is a FetchResult which contains information about // The return value is a FetchResult which contains information about
// the fetch. // the fetch.
//
// On timeout, FetchResult can behave one of two ways. First, it can
// return the last known value. This is the default behavior of blocking
// RPC calls in Consul so this allows cache types to be implemented with
// no extra logic. Second, FetchResult can return an unset value and index.
// In this case, the cache will reuse the last value automatically.
Fetch(FetchOptions, Request) (FetchResult, error) Fetch(FetchOptions, Request) (FetchResult, error)
} }