mirror of https://github.com/status-im/consul.git
NET-9084 - add tests to peering endpoint and blockingquery package to assert blocking works properly. (#21078)
This commit is contained in:
parent
8d4525ae50
commit
17df32e5cb
|
@ -28,6 +28,8 @@ type QueryFn func(memdb.WatchSet, *state.Store) error
|
|||
|
||||
// RequestOptions are options used by Server.blockingQuery to modify the
|
||||
// behaviour of the query operation, or to populate response metadata.
|
||||
//
|
||||
//go:generate mockery --name RequestOptions --inpackage
|
||||
type RequestOptions interface {
|
||||
GetToken() string
|
||||
GetMinQueryIndex() uint64
|
||||
|
@ -37,6 +39,8 @@ type RequestOptions interface {
|
|||
|
||||
// ResponseMeta is an interface used to populate the response struct
|
||||
// with metadata about the query and the state of the server.
|
||||
//
|
||||
//go:generate mockery --name ResponseMeta --inpackage
|
||||
type ResponseMeta interface {
|
||||
SetLastContact(time.Duration)
|
||||
SetKnownLeader(bool)
|
||||
|
@ -47,6 +51,8 @@ type ResponseMeta interface {
|
|||
|
||||
// FSMServer is interface into the stateful components of a Consul server, such
|
||||
// as memdb or raft leadership.
|
||||
//
|
||||
//go:generate mockery --name FSMServer --inpackage
|
||||
type FSMServer interface {
|
||||
ConsistentRead() error
|
||||
DecrementBlockingQueries() uint64
|
||||
|
|
|
@ -3,5 +3,338 @@
|
|||
|
||||
package blockingquery
|
||||
|
||||
// TODO: move tests from the consul package, rpc_test.go, TestServer_blockingQuery
|
||||
// here using mock for FSMServer w/ structs.QueryOptions and structs.QueryOptions
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestServer_blockingQuery(t *testing.T) {
|
||||
t.Parallel()
|
||||
getFSM := func(t *testing.T, additionalCfgFunc func(mockFSM *MockFSMServer)) *MockFSMServer {
|
||||
fsm := NewMockFSMServer(t)
|
||||
testCh := make(chan struct{})
|
||||
tombstoneGC, err := state.NewTombstoneGC(time.Second, time.Second)
|
||||
require.NoError(t, err)
|
||||
store := state.NewStateStore(tombstoneGC)
|
||||
fsm.On("GetShutdownChannel").Return(testCh)
|
||||
fsm.On("GetState").Return(store)
|
||||
fsm.On("SetQueryMeta", mock.Anything, mock.Anything).Return(nil)
|
||||
if additionalCfgFunc != nil {
|
||||
additionalCfgFunc(fsm)
|
||||
}
|
||||
return fsm
|
||||
}
|
||||
|
||||
getOpts := func(t *testing.T, additionalCfgFunc func(options *MockRequestOptions)) *MockRequestOptions {
|
||||
requestOpts := NewMockRequestOptions(t)
|
||||
requestOpts.On("GetRequireConsistent").Return(false)
|
||||
requestOpts.On("GetToken").Return("fake-token")
|
||||
if additionalCfgFunc != nil {
|
||||
additionalCfgFunc(requestOpts)
|
||||
}
|
||||
return requestOpts
|
||||
}
|
||||
|
||||
getMeta := func(t *testing.T, additionalCfgFunc func(mockMeta *MockResponseMeta)) *MockResponseMeta {
|
||||
meta := NewMockResponseMeta(t)
|
||||
if additionalCfgFunc != nil {
|
||||
additionalCfgFunc(meta)
|
||||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
// Perform a non-blocking query. Note that it's significant that the meta has
|
||||
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
||||
// this should not block still.
|
||||
t.Run("non-blocking query", func(t *testing.T) {
|
||||
var calls int
|
||||
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := Query(getFSM(t, nil), getOpts(t, func(mockOpts *MockRequestOptions) {
|
||||
mockOpts.On("GetMinQueryIndex").Return(uint64(0))
|
||||
}), getMeta(t, nil), fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
// Perform a blocking query that gets woken up and loops around once.
|
||||
t.Run("blocking query - single loop", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(1))
|
||||
options.On("GetMaxQueryTime").Return(1*time.Second, nil)
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(1))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
|
||||
var calls int
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
if calls == 0 {
|
||||
meta.On("GetIndex").Return(uint64(3))
|
||||
|
||||
fakeCh := make(chan struct{})
|
||||
close(fakeCh)
|
||||
ws.Add(fakeCh)
|
||||
} else {
|
||||
meta.On("GetIndex").Return(uint64(4))
|
||||
}
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := Query(fsm, opts, meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
// Perform a blocking query that returns a zero index from blocking func (e.g.
|
||||
// no state yet). This should still return an empty response immediately, but
|
||||
// with index of 1 and then block on the next attempt. In one sense zero index
|
||||
// is not really a valid response from a state method that is not an error but
|
||||
// in practice a lot of state store operations do return it unless they
|
||||
// explicitly special checks to turn 0 into 1. Often this is not caught or
|
||||
// covered by tests but eventually when hit in the wild causes blocking
|
||||
// clients to busy loop and burn CPU. This test ensure that blockingQuery
|
||||
// systematically does the right thing to prevent future bugs like that.
|
||||
t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(0))
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(1))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
var calls int
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
if opts.GetMinQueryIndex() > 0 {
|
||||
// If client requested blocking, block forever. This is simulating
|
||||
// waiting for the watched resource to be initialized/written to giving
|
||||
// it a non-zero index. Note the timeout on the query options is relied
|
||||
// on to stop the test taking forever.
|
||||
fakeCh := make(chan struct{})
|
||||
ws.Add(fakeCh)
|
||||
}
|
||||
meta.On("GetIndex").Return(uint64(0))
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := Query(fsm, opts, meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
require.Equal(t, uint64(1), meta.GetIndex(),
|
||||
"expect fake index of 1 to force client to block on next update")
|
||||
|
||||
// Simulate client making next request
|
||||
opts = getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(1))
|
||||
options.On("GetMaxQueryTime").Return(20*time.Millisecond, nil)
|
||||
})
|
||||
|
||||
// This time we should block even though the func returns index 0 still
|
||||
t0 := time.Now()
|
||||
require.NoError(t, Query(fsm, opts, meta, fn))
|
||||
t1 := time.Now()
|
||||
require.Equal(t, 2, calls)
|
||||
require.Equal(t, uint64(1), meta.GetIndex(),
|
||||
"expect fake index of 1 to force client to block on next update")
|
||||
require.True(t, t1.Sub(t0) > 20*time.Millisecond,
|
||||
"should have actually blocked waiting for timeout")
|
||||
|
||||
})
|
||||
|
||||
// Perform a query that blocks and gets interrupted when the state store
|
||||
// is abandoned.
|
||||
t.Run("blocking query interrupted by abandonCh", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(3))
|
||||
options.On("GetMaxQueryTime").Return(20*time.Millisecond, nil)
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(1))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
|
||||
var calls int
|
||||
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||
if calls == 0 {
|
||||
meta.On("GetIndex").Return(uint64(1))
|
||||
|
||||
fsm.GetState().Abandon()
|
||||
}
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := Query(fsm, opts, meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
t.Run("non-blocking query for item that does not exist", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(3))
|
||||
options.On("GetMaxQueryTime").Return(20*time.Millisecond, nil)
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(1))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
calls := 0
|
||||
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
err := Query(fsm, opts, meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
t.Run("blocking query for item that does not exist", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(3))
|
||||
options.On("GetMaxQueryTime").Return(100*time.Millisecond, nil)
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(1))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
calls := 0
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
if calls == 1 {
|
||||
meta.On("GetIndex").Return(uint64(3))
|
||||
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
ws.Add(ch)
|
||||
return ErrNotFound
|
||||
}
|
||||
meta.On("GetIndex").Return(uint64(5))
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
err := Query(fsm, opts, meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
t.Run("blocking query for item that existed and is removed", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(3))
|
||||
// this query taks 1.002 sceonds locally so setting the timeout to 2 seconds
|
||||
options.On("GetMaxQueryTime").Return(2*time.Second, nil)
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(3))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
calls := 0
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
if calls == 1 {
|
||||
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
ws.Add(ch)
|
||||
return nil
|
||||
}
|
||||
meta = getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
meta.On("GetIndex").Return(uint64(5))
|
||||
})
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
require.NoError(t, Query(fsm, opts, meta, fn))
|
||||
queryDuration := time.Since(start)
|
||||
maxQueryDuration, err := opts.GetMaxQueryTime()
|
||||
require.NoError(t, err)
|
||||
require.True(t, queryDuration < maxQueryDuration, fmt.Sprintf("query timed out - queryDuration: %v, maxQueryDuration: %v", queryDuration, maxQueryDuration))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
t.Run("blocking query for non-existent item that is created", func(t *testing.T) {
|
||||
opts := getOpts(t, func(options *MockRequestOptions) {
|
||||
options.On("GetMinQueryIndex").Return(uint64(3))
|
||||
// this query taks 1.002 sceonds locally so setting the timeout to 2 seconds
|
||||
options.On("GetMaxQueryTime").Return(2*time.Second, nil)
|
||||
})
|
||||
|
||||
meta := getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
mockMeta.On("GetIndex").Return(uint64(3))
|
||||
})
|
||||
|
||||
fsm := getFSM(t, func(mockFSM *MockFSMServer) {
|
||||
mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second)
|
||||
mockFSM.On("IncrementBlockingQueries").Return(uint64(1))
|
||||
mockFSM.On("DecrementBlockingQueries").Return(uint64(1))
|
||||
})
|
||||
calls := 0
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
if calls == 1 {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
ws.Add(ch)
|
||||
return ErrNotFound
|
||||
}
|
||||
meta = getMeta(t, func(mockMeta *MockResponseMeta) {
|
||||
meta.On("GetIndex").Return(uint64(5))
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
require.NoError(t, Query(fsm, opts, meta, fn))
|
||||
queryDuration := time.Since(start)
|
||||
maxQueryDuration, err := opts.GetMaxQueryTime()
|
||||
require.NoError(t, err)
|
||||
require.True(t, queryDuration < maxQueryDuration, fmt.Sprintf("query timed out - queryDuration: %v, maxQueryDuration: %v", queryDuration, maxQueryDuration))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package blockingquery
|
||||
|
||||
import (
|
||||
time "time"
|
||||
|
||||
state "github.com/hashicorp/consul/agent/consul/state"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockFSMServer is an autogenerated mock type for the FSMServer type
|
||||
type MockFSMServer struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// ConsistentRead provides a mock function with given fields:
|
||||
func (_m *MockFSMServer) ConsistentRead() error {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func() error); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// DecrementBlockingQueries provides a mock function with given fields:
|
||||
func (_m *MockFSMServer) DecrementBlockingQueries() uint64 {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 uint64
|
||||
if rf, ok := ret.Get(0).(func() uint64); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(uint64)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetShutdownChannel provides a mock function with given fields:
|
||||
func (_m *MockFSMServer) GetShutdownChannel() chan struct{} {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 chan struct{}
|
||||
if rf, ok := ret.Get(0).(func() chan struct{}); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(chan struct{})
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetState provides a mock function with given fields:
|
||||
func (_m *MockFSMServer) GetState() *state.Store {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 *state.Store
|
||||
if rf, ok := ret.Get(0).(func() *state.Store); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*state.Store)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// IncrementBlockingQueries provides a mock function with given fields:
|
||||
func (_m *MockFSMServer) IncrementBlockingQueries() uint64 {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 uint64
|
||||
if rf, ok := ret.Get(0).(func() uint64); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(uint64)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// RPCQueryTimeout provides a mock function with given fields: _a0
|
||||
func (_m *MockFSMServer) RPCQueryTimeout(_a0 time.Duration) time.Duration {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 time.Duration
|
||||
if rf, ok := ret.Get(0).(func(time.Duration) time.Duration); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Get(0).(time.Duration)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetQueryMeta provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockFSMServer) SetQueryMeta(_a0 ResponseMeta, _a1 string) {
|
||||
_m.Called(_a0, _a1)
|
||||
}
|
||||
|
||||
// NewMockFSMServer creates a new instance of MockFSMServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockFSMServer(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockFSMServer {
|
||||
mock := &MockFSMServer{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package blockingquery
|
||||
|
||||
import (
|
||||
time "time"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockRequestOptions is an autogenerated mock type for the RequestOptions type
|
||||
type MockRequestOptions struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// GetMaxQueryTime provides a mock function with given fields:
|
||||
func (_m *MockRequestOptions) GetMaxQueryTime() (time.Duration, error) {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 time.Duration
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func() (time.Duration, error)); ok {
|
||||
return rf()
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func() time.Duration); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(time.Duration)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func() error); ok {
|
||||
r1 = rf()
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetMinQueryIndex provides a mock function with given fields:
|
||||
func (_m *MockRequestOptions) GetMinQueryIndex() uint64 {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 uint64
|
||||
if rf, ok := ret.Get(0).(func() uint64); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(uint64)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetRequireConsistent provides a mock function with given fields:
|
||||
func (_m *MockRequestOptions) GetRequireConsistent() bool {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func() bool); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetToken provides a mock function with given fields:
|
||||
func (_m *MockRequestOptions) GetToken() string {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 string
|
||||
if rf, ok := ret.Get(0).(func() string); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(string)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// NewMockRequestOptions creates a new instance of MockRequestOptions. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockRequestOptions(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockRequestOptions {
|
||||
mock := &MockRequestOptions{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package blockingquery
|
||||
|
||||
import (
|
||||
time "time"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockResponseMeta is an autogenerated mock type for the ResponseMeta type
|
||||
type MockResponseMeta struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// GetIndex provides a mock function with given fields:
|
||||
func (_m *MockResponseMeta) GetIndex() uint64 {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 uint64
|
||||
if rf, ok := ret.Get(0).(func() uint64); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(uint64)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetIndex provides a mock function with given fields: _a0
|
||||
func (_m *MockResponseMeta) SetIndex(_a0 uint64) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// SetKnownLeader provides a mock function with given fields: _a0
|
||||
func (_m *MockResponseMeta) SetKnownLeader(_a0 bool) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// SetLastContact provides a mock function with given fields: _a0
|
||||
func (_m *MockResponseMeta) SetLastContact(_a0 time.Duration) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// SetResultsFilteredByACLs provides a mock function with given fields: _a0
|
||||
func (_m *MockResponseMeta) SetResultsFilteredByACLs(_a0 bool) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// NewMockResponseMeta creates a new instance of MockResponseMeta. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockResponseMeta(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockResponseMeta {
|
||||
mock := &MockResponseMeta{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -24,7 +24,6 @@ import (
|
|||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
|
@ -232,136 +231,12 @@ func (m *MockSink) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// TestServer_blockingQuery tests authenticated and unauthenticated calls. The
|
||||
// other blocking query tests reside in blockingquery_test.go in the blockingquery package.
|
||||
func TestServer_blockingQuery(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, s := testServerWithConfig(t)
|
||||
|
||||
// Perform a non-blocking query. Note that it's significant that the meta has
|
||||
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
||||
// this should not block still.
|
||||
t.Run("non-blocking query", func(t *testing.T) {
|
||||
var opts structs.QueryOptions
|
||||
var meta structs.QueryMeta
|
||||
var calls int
|
||||
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
// Perform a blocking query that gets woken up and loops around once.
|
||||
t.Run("blocking query - single loop", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
MinQueryIndex: 3,
|
||||
}
|
||||
var meta structs.QueryMeta
|
||||
var calls int
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
if calls == 0 {
|
||||
meta.Index = 3
|
||||
|
||||
fakeCh := make(chan struct{})
|
||||
close(fakeCh)
|
||||
ws.Add(fakeCh)
|
||||
} else {
|
||||
meta.Index = 4
|
||||
}
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
// Perform a blocking query that returns a zero index from blocking func (e.g.
|
||||
// no state yet). This should still return an empty response immediately, but
|
||||
// with index of 1 and then block on the next attempt. In one sense zero index
|
||||
// is not really a valid response from a state method that is not an error but
|
||||
// in practice a lot of state store operations do return it unless they
|
||||
// explicitly special checks to turn 0 into 1. Often this is not caught or
|
||||
// covered by tests but eventually when hit in the wild causes blocking
|
||||
// clients to busy loop and burn CPU. This test ensure that blockingQuery
|
||||
// systematically does the right thing to prevent future bugs like that.
|
||||
t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
MinQueryIndex: 0,
|
||||
}
|
||||
var meta structs.QueryMeta
|
||||
var calls int
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
if opts.MinQueryIndex > 0 {
|
||||
// If client requested blocking, block forever. This is simulating
|
||||
// waiting for the watched resource to be initialized/written to giving
|
||||
// it a non-zero index. Note the timeout on the query options is relied
|
||||
// on to stop the test taking forever.
|
||||
fakeCh := make(chan struct{})
|
||||
ws.Add(fakeCh)
|
||||
}
|
||||
meta.Index = 0
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
require.NoError(t, s.blockingQuery(&opts, &meta, fn))
|
||||
assert.Equal(t, 1, calls)
|
||||
assert.Equal(t, uint64(1), meta.Index,
|
||||
"expect fake index of 1 to force client to block on next update")
|
||||
|
||||
// Simulate client making next request
|
||||
opts.MinQueryIndex = 1
|
||||
opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long
|
||||
|
||||
// This time we should block even though the func returns index 0 still
|
||||
t0 := time.Now()
|
||||
require.NoError(t, s.blockingQuery(&opts, &meta, fn))
|
||||
t1 := time.Now()
|
||||
assert.Equal(t, 2, calls)
|
||||
assert.Equal(t, uint64(1), meta.Index,
|
||||
"expect fake index of 1 to force client to block on next update")
|
||||
assert.True(t, t1.Sub(t0) > 20*time.Millisecond,
|
||||
"should have actually blocked waiting for timeout")
|
||||
|
||||
})
|
||||
|
||||
// Perform a query that blocks and gets interrupted when the state store
|
||||
// is abandoned.
|
||||
t.Run("blocking query interrupted by abandonCh", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
MinQueryIndex: 3,
|
||||
}
|
||||
var meta structs.QueryMeta
|
||||
var calls int
|
||||
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||
if calls == 0 {
|
||||
meta.Index = 3
|
||||
|
||||
snap, err := s.fsm.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sink := &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := s.fsm.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
calls++
|
||||
return nil
|
||||
}
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{
|
||||
Token: "",
|
||||
|
@ -394,93 +269,6 @@ func TestServer_blockingQuery(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls")
|
||||
})
|
||||
|
||||
t.Run("non-blocking query for item that does not exist", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{}
|
||||
meta := structs.QueryMeta{}
|
||||
calls := 0
|
||||
fn := func(_ memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, calls)
|
||||
})
|
||||
|
||||
t.Run("blocking query for item that does not exist", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
|
||||
meta := structs.QueryMeta{}
|
||||
calls := 0
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
if calls == 1 {
|
||||
meta.Index = 3
|
||||
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
ws.Add(ch)
|
||||
return errNotFound
|
||||
}
|
||||
meta.Index = 5
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
t.Run("blocking query for item that existed and is removed", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
|
||||
meta := structs.QueryMeta{}
|
||||
calls := 0
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
if calls == 1 {
|
||||
meta.Index = 3
|
||||
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
ws.Add(ch)
|
||||
return nil
|
||||
}
|
||||
meta.Index = 5
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
|
||||
t.Run("blocking query for non-existent item that is created", func(t *testing.T) {
|
||||
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
|
||||
meta := structs.QueryMeta{}
|
||||
calls := 0
|
||||
fn := func(ws memdb.WatchSet, _ *state.Store) error {
|
||||
calls++
|
||||
if calls == 1 {
|
||||
meta.Index = 3
|
||||
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
ws.Add(ch)
|
||||
return errNotFound
|
||||
}
|
||||
meta.Index = 5
|
||||
return nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
err := s.blockingQuery(&opts, &meta, fn)
|
||||
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, calls)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRPC_ReadyForConsistentReads(t *testing.T) {
|
||||
|
|
|
@ -556,6 +556,8 @@ func TestHTTP_Peering_Read(t *testing.T) {
|
|||
_, err = a.rpcClientPeering.PeeringWrite(ctx, bar)
|
||||
require.NoError(t, err)
|
||||
|
||||
var lastIndex uint64
|
||||
|
||||
t.Run("return foo", func(t *testing.T) {
|
||||
req, err := http.NewRequest("GET", "/v1/peering/foo", nil)
|
||||
require.NoError(t, err)
|
||||
|
@ -578,6 +580,8 @@ func TestHTTP_Peering_Read(t *testing.T) {
|
|||
|
||||
require.Equal(t, 0, len(apiResp.StreamStatus.ImportedServices))
|
||||
require.Equal(t, 0, len(apiResp.StreamStatus.ExportedServices))
|
||||
|
||||
lastIndex = getIndex(t, resp)
|
||||
})
|
||||
|
||||
t.Run("not found", func(t *testing.T) {
|
||||
|
@ -588,6 +592,43 @@ func TestHTTP_Peering_Read(t *testing.T) {
|
|||
require.Equal(t, http.StatusNotFound, resp.Code)
|
||||
require.Equal(t, "Peering not found for \"baz\"", resp.Body.String())
|
||||
})
|
||||
|
||||
const timeout = 5 * time.Second
|
||||
t.Run("read blocking query result", func(t *testing.T) {
|
||||
var (
|
||||
// out and resp are not safe to read until reading from errCh
|
||||
out api.Peering
|
||||
resp = httptest.NewRecorder()
|
||||
errCh = make(chan error, 1)
|
||||
)
|
||||
go func() {
|
||||
url := fmt.Sprintf("/v1/peering/foo?index=%d&wait=%s", lastIndex, timeout)
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
a.srv.h.ServeHTTP(resp, req)
|
||||
require.Equal(t, http.StatusOK, resp.Code)
|
||||
err = json.NewDecoder(resp.Body).Decode(&out)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// update peering
|
||||
foo.Peering.Meta["spooky-key"] = "boo!"
|
||||
_, err = a.rpcClientPeering.PeeringWrite(ctx, foo)
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := <-errCh; err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, "boo!", out.Meta["spooky-key"])
|
||||
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_Peering_Delete(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue