mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
7e6b3e6a0c
Fixes #4969 This implements non-blocking request polling at the cache layer which is currently only used for prepared queries. Additionally this enables the proxycfg manager to poll prepared queries for use in envoy proxy upstreams.
389 lines
12 KiB
Go
389 lines
12 KiB
Go
package cache
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// Test that a type registered with a periodic refresh can be watched.
|
|
func TestCacheNotify(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
typ := TestType(t)
|
|
defer typ.AssertExpectations(t)
|
|
c := TestCache(t)
|
|
c.RegisterType("t", typ, &RegisterOptions{
|
|
Refresh: false,
|
|
})
|
|
|
|
// Setup triggers to control when "updates" should be delivered
|
|
trigger := make([]chan time.Time, 4)
|
|
for i := range trigger {
|
|
trigger[i] = make(chan time.Time)
|
|
}
|
|
|
|
// Configure the type
|
|
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
|
|
// Assert the right request type - all real Fetch implementations do this so
|
|
// it keeps us honest that Watch doesn't require type mangling which will
|
|
// break in real life (hint: it did on the first attempt)
|
|
_, ok := args.Get(1).(*MockRequest)
|
|
require.True(t, ok)
|
|
})
|
|
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0])
|
|
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1])
|
|
typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2])
|
|
// It's timing dependent whether the blocking loop manages to make another
|
|
// call before we cancel so don't require it. We need to have a higher index
|
|
// here because if the index is the same then the cache Get will not return
|
|
// until the full 10 min timeout expires. This causes the last fetch to return
|
|
// after cancellation as if it had timed out.
|
|
typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3])
|
|
|
|
require := require.New(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
ch := make(chan UpdateEvent)
|
|
|
|
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
|
|
require.NoError(err)
|
|
|
|
// Should receive the first result pretty soon
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test",
|
|
Result: 1,
|
|
Meta: ResultMeta{Hit: false, Index: 4},
|
|
Err: nil,
|
|
})
|
|
|
|
// There should be no more updates delivered yet
|
|
require.Len(ch, 0)
|
|
|
|
// Trigger blocking query to return a "change"
|
|
close(trigger[0])
|
|
|
|
// Should receive the next result pretty soon
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test",
|
|
Result: 12,
|
|
// Note these are never cache "hits" because blocking will wait until there
|
|
// is a new value at which point it's not considered a hit.
|
|
Meta: ResultMeta{Hit: false, Index: 5},
|
|
Err: nil,
|
|
})
|
|
|
|
// Register a second observer using same chan and request. Note that this is
|
|
// testing a few things implicitly:
|
|
// - that multiple watchers on the same cache entity are de-duped in their
|
|
// requests to the "backend"
|
|
// - that multiple watchers can distinguish their results using correlationID
|
|
err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test2", ch)
|
|
require.NoError(err)
|
|
|
|
// Should get test2 notify immediately, and it should be a cache hit
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test2",
|
|
Result: 12,
|
|
Meta: ResultMeta{Hit: true, Index: 5},
|
|
Err: nil,
|
|
})
|
|
|
|
// We could wait for a full timeout but we can't directly observe it so
|
|
// simulate the behaviour by triggering a response with the same value and
|
|
// index as the last one.
|
|
close(trigger[1])
|
|
|
|
// We should NOT be notified about that. Note this is timing dependent but
|
|
// it's only a sanity check, if we somehow _do_ get the change delivered later
|
|
// than 10ms the next value assertion will fail anyway.
|
|
time.Sleep(10 * time.Millisecond)
|
|
require.Len(ch, 0)
|
|
|
|
// Trigger final update
|
|
close(trigger[2])
|
|
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test",
|
|
Result: 42,
|
|
Meta: ResultMeta{Hit: false, Index: 7},
|
|
Err: nil,
|
|
}, UpdateEvent{
|
|
CorrelationID: "test2",
|
|
Result: 42,
|
|
Meta: ResultMeta{Hit: false, Index: 7},
|
|
Err: nil,
|
|
})
|
|
|
|
// Sanity check closing chan before context is cancelled doesn't panic
|
|
//close(ch)
|
|
|
|
// Close context
|
|
cancel()
|
|
|
|
// It's likely but not certain that at least one of the watchers was blocked
|
|
// on the next cache Get so trigger that to timeout so we can observe the
|
|
// watch goroutines being cleaned up. This is necessary since currently we
|
|
// have no way to interrupt a blocking query. In practice it's fine to know
|
|
// that after 10 mins max the blocking query will return and the resources
|
|
// will be cleaned.
|
|
close(trigger[3])
|
|
|
|
// I want to test that cancelling the context cleans up goroutines (which it
|
|
// does from manual verification with debugger etc). I had a check based on a
|
|
// similar approach to https://golang.org/src/net/http/main_test.go#L60 but it
|
|
// was just too flaky because it relies on the timing of the error backoff
|
|
// timer goroutines and similar so I've given up for now as I have more
|
|
// important things to get working.
|
|
}
|
|
|
|
func TestCacheNotifyPolling(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
typ := TestTypeNonBlocking(t)
|
|
defer typ.AssertExpectations(t)
|
|
c := TestCache(t)
|
|
c.RegisterType("t", typ, &RegisterOptions{
|
|
Refresh: false,
|
|
})
|
|
|
|
// Configure the type
|
|
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
|
|
// Assert the right request type - all real Fetch implementations do this so
|
|
// it keeps us honest that Watch doesn't require type mangling which will
|
|
// break in real life (hint: it did on the first attempt)
|
|
_, ok := args.Get(1).(*MockRequest)
|
|
require.True(t, ok)
|
|
})
|
|
typ.Static(FetchResult{Value: 12, Index: 1}, nil).Once()
|
|
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once()
|
|
|
|
require := require.New(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
ch := make(chan UpdateEvent)
|
|
|
|
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch)
|
|
require.NoError(err)
|
|
|
|
// Should receive the first result pretty soon
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test",
|
|
Result: 1,
|
|
Meta: ResultMeta{Hit: false, Index: 1},
|
|
Err: nil,
|
|
})
|
|
|
|
// There should be no more updates delivered yet
|
|
require.Len(ch, 0)
|
|
|
|
// make sure the updates do not come too quickly
|
|
select {
|
|
case <-time.After(50 * time.Millisecond):
|
|
case <-ch:
|
|
require.Fail("Received update too early")
|
|
}
|
|
|
|
// make sure we get the update not too far out.
|
|
select {
|
|
case <-time.After(100 * time.Millisecond):
|
|
require.Fail("Didn't receive the notification")
|
|
case result := <-ch:
|
|
require.Equal(result.Result, 12)
|
|
require.Equal(result.CorrelationID, "test")
|
|
require.Equal(result.Meta.Hit, false)
|
|
require.Equal(result.Meta.Index, uint64(1))
|
|
// pretty conservative check it should be even newer because without a second
|
|
// notifier each value returned will have been executed just then and not served
|
|
// from the cache.
|
|
require.True(result.Meta.Age < 50*time.Millisecond)
|
|
require.NoError(result.Err)
|
|
}
|
|
|
|
require.Len(ch, 0)
|
|
|
|
// Register a second observer using same chan and request. Note that this is
|
|
// testing a few things implicitly:
|
|
// - that multiple watchers on the same cache entity are de-duped in their
|
|
// requests to the "backend"
|
|
// - that multiple watchers can distinguish their results using correlationID
|
|
err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test2", ch)
|
|
require.NoError(err)
|
|
|
|
// Should get test2 notify immediately, and it should be a cache hit
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test2",
|
|
Result: 12,
|
|
Meta: ResultMeta{Hit: true, Index: 1},
|
|
Err: nil,
|
|
})
|
|
|
|
require.Len(ch, 0)
|
|
|
|
// wait for the next batch of responses
|
|
events := make([]UpdateEvent, 0)
|
|
// 110 is needed to allow for the jitter
|
|
timeout := time.After(110 * time.Millisecond)
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case <-timeout:
|
|
require.Fail("UpdateEvent not received in time")
|
|
case eve := <-ch:
|
|
events = append(events, eve)
|
|
}
|
|
}
|
|
|
|
require.Equal(events[0].Result, 42)
|
|
require.Equal(events[0].Meta.Hit, false)
|
|
require.Equal(events[0].Meta.Index, uint64(1))
|
|
require.True(events[0].Meta.Age < 50*time.Millisecond)
|
|
require.NoError(events[0].Err)
|
|
require.Equal(events[1].Result, 42)
|
|
// Sometimes this would be a hit and others not. It all depends on when the various getWithIndex calls got fired.
|
|
// If both are done concurrently then it will not be a cache hit but the request gets single flighted and both
|
|
// get notified at the same time.
|
|
// require.Equal(events[1].Meta.Hit, true)
|
|
require.Equal(events[1].Meta.Index, uint64(1))
|
|
require.True(events[1].Meta.Age < 100*time.Millisecond)
|
|
require.NoError(events[1].Err)
|
|
}
|
|
|
|
// Test that a refresh performs a backoff.
|
|
func TestCacheWatch_ErrorBackoff(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
typ := TestType(t)
|
|
defer typ.AssertExpectations(t)
|
|
c := TestCache(t)
|
|
c.RegisterType("t", typ, &RegisterOptions{
|
|
Refresh: false,
|
|
})
|
|
|
|
// Configure the type
|
|
var retries uint32
|
|
fetchErr := fmt.Errorf("test fetch error")
|
|
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
|
|
typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
|
|
atomic.AddUint32(&retries, 1)
|
|
})
|
|
|
|
require := require.New(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
ch := make(chan UpdateEvent)
|
|
|
|
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
|
|
require.NoError(err)
|
|
|
|
// Should receive the first result pretty soon
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test",
|
|
Result: 1,
|
|
Meta: ResultMeta{Hit: false, Index: 4},
|
|
Err: nil,
|
|
})
|
|
|
|
numErrors := 0
|
|
// Loop for a little while and count how many errors we see reported. If this
|
|
// was running as fast as it could go we'd expect this to be huge. We have to
|
|
// be a little careful here because the watch chan ch doesn't have a large
|
|
// buffer so we could be artificially slowing down the loop without the
|
|
// backoff actually taking effect. We can validate that by ensuring this test
|
|
// fails without the backoff code reliably.
|
|
timeoutC := time.After(500 * time.Millisecond)
|
|
OUT:
|
|
for {
|
|
select {
|
|
case <-timeoutC:
|
|
break OUT
|
|
case u := <-ch:
|
|
numErrors++
|
|
require.Error(u.Err)
|
|
}
|
|
}
|
|
// Must be fewer than 10 failures in that time
|
|
require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors))
|
|
|
|
// Check the number of RPCs as a sanity check too
|
|
actual := atomic.LoadUint32(&retries)
|
|
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
|
|
}
|
|
|
|
// Test that a refresh performs a backoff.
|
|
func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
typ := TestTypeNonBlocking(t)
|
|
defer typ.AssertExpectations(t)
|
|
c := TestCache(t)
|
|
c.RegisterType("t", typ, &RegisterOptions{
|
|
Refresh: false,
|
|
})
|
|
|
|
// Configure the type
|
|
var retries uint32
|
|
fetchErr := fmt.Errorf("test fetch error")
|
|
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
|
|
typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
|
|
atomic.AddUint32(&retries, 1)
|
|
})
|
|
|
|
require := require.New(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
ch := make(chan UpdateEvent)
|
|
|
|
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello", MaxAge: 100 * time.Millisecond}), "test", ch)
|
|
require.NoError(err)
|
|
|
|
// Should receive the first result pretty soon
|
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
|
CorrelationID: "test",
|
|
Result: 1,
|
|
Meta: ResultMeta{Hit: false, Index: 4},
|
|
Err: nil,
|
|
})
|
|
|
|
numErrors := 0
|
|
// Loop for a little while and count how many errors we see reported. If this
|
|
// was running as fast as it could go we'd expect this to be huge. We have to
|
|
// be a little careful here because the watch chan ch doesn't have a large
|
|
// buffer so we could be artificially slowing down the loop without the
|
|
// backoff actually taking effect. We can validate that by ensuring this test
|
|
// fails without the backoff code reliably.
|
|
//
|
|
// 100 + 500 milliseconds. 100 because the first retry will not happen until
|
|
// the 100 + jitter milliseconds have elapsed.
|
|
timeoutC := time.After(600 * time.Millisecond)
|
|
OUT:
|
|
for {
|
|
select {
|
|
case <-timeoutC:
|
|
break OUT
|
|
case u := <-ch:
|
|
numErrors++
|
|
require.Error(u.Err)
|
|
}
|
|
}
|
|
// Must be fewer than 10 failures in that time
|
|
require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors))
|
|
|
|
// Check the number of RPCs as a sanity check too
|
|
actual := atomic.LoadUint32(&retries)
|
|
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
|
|
}
|