mirror of https://github.com/status-im/consul.git
* Fix theoretical cache collision bug if/when we use more cache types with same result type * Generalized fix for blocking query handling when state store methods return zero index * Refactor test retry to only affect CI * Undo make file merge * Add hint to error message returned to end-user requests if Connect is not enabled when they try to request cert * Explicit error for Roots endpoint if connect is disabled * Fix tests that were asserting old behaviour
This commit is contained in:
parent
5635227fa6
commit
8cbeb29e73
|
@ -2304,19 +2304,15 @@ func TestAgent_Token(t *testing.T) {
|
||||||
func TestAgentConnectCARoots_empty(t *testing.T) {
|
func TestAgentConnectCARoots_empty(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
assert := assert.New(t)
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
a := NewTestAgent(t.Name(), "connect { enabled = false }")
|
a := NewTestAgent(t.Name(), "connect { enabled = false }")
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil)
|
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil)
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
obj, err := a.srv.AgentConnectCARoots(resp, req)
|
_, err := a.srv.AgentConnectCARoots(resp, req)
|
||||||
require.NoError(err)
|
require.Error(err)
|
||||||
|
require.Contains(err.Error(), "Connect must be enabled")
|
||||||
value := obj.(structs.IndexedCARoots)
|
|
||||||
assert.Equal(value.ActiveRootID, "")
|
|
||||||
assert.Len(value.Roots, 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAgentConnectCARoots_list(t *testing.T) {
|
func TestAgentConnectCARoots_list(t *testing.T) {
|
||||||
|
|
|
@ -108,7 +108,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
||||||
return result, errors.New("invalid RootCA response type")
|
return result, errors.New("invalid RootCA response type")
|
||||||
}
|
}
|
||||||
if roots.TrustDomain == "" {
|
if roots.TrustDomain == "" {
|
||||||
return result, errors.New("cluster has no CA bootstrapped")
|
return result, errors.New("cluster has no CA bootstrapped yet")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the service ID
|
// Build the service ID
|
||||||
|
|
|
@ -190,7 +190,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the actual key for our entry
|
// Get the actual key for our entry
|
||||||
key := c.entryKey(&info)
|
key := c.entryKey(t, &info)
|
||||||
|
|
||||||
// First time through
|
// First time through
|
||||||
first := true
|
first := true
|
||||||
|
@ -278,8 +278,8 @@ RETRY_GET:
|
||||||
|
|
||||||
// entryKey returns the key for the entry in the cache. See the note
|
// entryKey returns the key for the entry in the cache. See the note
|
||||||
// about the entry key format in the structure docs for Cache.
|
// about the entry key format in the structure docs for Cache.
|
||||||
func (c *Cache) entryKey(r *RequestInfo) string {
|
func (c *Cache) entryKey(t string, r *RequestInfo) string {
|
||||||
return fmt.Sprintf("%s/%s/%s", r.Datacenter, r.Token, r.Key)
|
return fmt.Sprintf("%s/%s/%s/%s", t, r.Datacenter, r.Token, r.Key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetch triggers a new background fetch for the given Request. If a
|
// fetch triggers a new background fetch for the given Request. If a
|
||||||
|
|
|
@ -708,6 +708,54 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
typ.AssertExpectations(t)
|
typ.AssertExpectations(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test a Get with a request that returns the same cache key across
|
||||||
|
// two different "types" returns two separate results.
|
||||||
|
func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
typ := TestType(t)
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
typ2 := TestType(t)
|
||||||
|
defer typ2.AssertExpectations(t)
|
||||||
|
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ, nil)
|
||||||
|
c.RegisterType("t2", typ2, nil)
|
||||||
|
|
||||||
|
// Configure the types
|
||||||
|
typ.Static(FetchResult{Value: 100}, nil)
|
||||||
|
typ2.Static(FetchResult{Value: 200}, nil)
|
||||||
|
|
||||||
|
// Get, should fetch
|
||||||
|
req := TestRequest(t, RequestInfo{Key: "foo"})
|
||||||
|
result, meta, err := c.Get("t", req)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(100, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
|
// Get from t2 with same key, should fetch
|
||||||
|
req = TestRequest(t, RequestInfo{Key: "foo"})
|
||||||
|
result, meta, err = c.Get("t2", req)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(200, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
|
// Get from t again with same key, should cache
|
||||||
|
req = TestRequest(t, RequestInfo{Key: "foo"})
|
||||||
|
result, meta, err = c.Get("t", req)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(100, result)
|
||||||
|
require.True(meta.Hit)
|
||||||
|
|
||||||
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
|
// then verify that we still only got the one call
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
typ.AssertExpectations(t)
|
||||||
|
typ2.AssertExpectations(t)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that Get partitions the caches based on DC so two equivalent requests
|
// Test that Get partitions the caches based on DC so two equivalent requests
|
||||||
// to different datacenters are automatically cached even if their keys are
|
// to different datacenters are automatically cached even if their keys are
|
||||||
// the same.
|
// the same.
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
ca "github.com/hashicorp/consul/agent/connect/ca"
|
ca "github.com/hashicorp/consul/agent/connect/ca"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -16,18 +18,15 @@ import (
|
||||||
func TestConnectCARoots_empty(t *testing.T) {
|
func TestConnectCARoots_empty(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
assert := assert.New(t)
|
require := require.New(t)
|
||||||
a := NewTestAgent(t.Name(), "connect { enabled = false }")
|
a := NewTestAgent(t.Name(), "connect { enabled = false }")
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)
|
req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
obj, err := a.srv.ConnectCARoots(resp, req)
|
_, err := a.srv.ConnectCARoots(resp, req)
|
||||||
assert.Nil(err)
|
require.Error(err)
|
||||||
|
require.Contains(err.Error(), "Connect must be enabled")
|
||||||
value := obj.(structs.IndexedCARoots)
|
|
||||||
assert.Equal(value.ActiveRootID, "")
|
|
||||||
assert.Len(value.Roots, 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnectCARoots_list(t *testing.T) {
|
func TestConnectCARoots_list(t *testing.T) {
|
||||||
|
|
|
@ -217,6 +217,11 @@ func (s *ConnectCA) Roots(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Exit early if Connect hasn't been enabled.
|
||||||
|
if !s.srv.config.ConnectEnabled {
|
||||||
|
return ErrConnectNotEnabled
|
||||||
|
}
|
||||||
|
|
||||||
// Load the ClusterID to generate TrustDomain. We do this outside the loop
|
// Load the ClusterID to generate TrustDomain. We do this outside the loop
|
||||||
// since by definition this value should be immutable once set for lifetime of
|
// since by definition this value should be immutable once set for lifetime of
|
||||||
// the cluster so we don't need to look it up more than once. We also don't
|
// the cluster so we don't need to look it up more than once. We also don't
|
||||||
|
@ -230,6 +235,7 @@ func (s *ConnectCA) Roots(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check CA is actually bootstrapped...
|
// Check CA is actually bootstrapped...
|
||||||
if config != nil {
|
if config != nil {
|
||||||
// Build TrustDomain based on the ClusterID stored.
|
// Build TrustDomain based on the ClusterID stored.
|
||||||
|
|
|
@ -415,7 +415,18 @@ RUN_QUERY:
|
||||||
|
|
||||||
// Block up to the timeout if we didn't see anything fresh.
|
// Block up to the timeout if we didn't see anything fresh.
|
||||||
err := fn(ws, state)
|
err := fn(ws, state)
|
||||||
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
|
// Note we check queryOpts.MinQueryIndex is greater than zero to determine if
|
||||||
|
// blocking was requested by client, NOT meta.Index since the state function
|
||||||
|
// might return zero if something is not initialised and care wasn't taken to
|
||||||
|
// handle that special case (in practice this happened a lot so fixing it
|
||||||
|
// systematically here beats trying to remember to add zero checks in every
|
||||||
|
// state method). We also need to ensure that unless there is an error, we
|
||||||
|
// return an index > 0 otherwise the client will never block and burn CPU and
|
||||||
|
// requests.
|
||||||
|
if err == nil && queryMeta.Index < 1 {
|
||||||
|
queryMeta.Index = 1
|
||||||
|
}
|
||||||
|
if err == nil && queryOpts.MinQueryIndex > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
|
||||||
if expired := ws.Watch(timeout.C); !expired {
|
if expired := ws.Watch(timeout.C); !expired {
|
||||||
// If a restore may have woken us up then bail out from
|
// If a restore may have woken us up then bail out from
|
||||||
// the query immediately. This is slightly race-ey since
|
// the query immediately. This is slightly race-ey since
|
||||||
|
|
|
@ -12,6 +12,8 @@ import (
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRPC_NoLeader_Fail(t *testing.T) {
|
func TestRPC_NoLeader_Fail(t *testing.T) {
|
||||||
|
@ -101,7 +103,12 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
defer s.Shutdown()
|
defer s.Shutdown()
|
||||||
|
|
||||||
// Perform a non-blocking query.
|
require := require.New(t)
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
// Perform a non-blocking query. Note that it's significant that the meta has
|
||||||
|
// a zero index in response - the implied opts.MinQueryIndex is also zero but
|
||||||
|
// this should not block still.
|
||||||
{
|
{
|
||||||
var opts structs.QueryOptions
|
var opts structs.QueryOptions
|
||||||
var meta structs.QueryMeta
|
var meta structs.QueryMeta
|
||||||
|
@ -146,6 +153,55 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Perform a blocking query that returns a zero index from blocking func (e.g.
|
||||||
|
// no state yet). This should still return an empty response immediately, but
|
||||||
|
// with index of 1 and then block on the next attempt. In one sense zero index
|
||||||
|
// is not really a valid response from a state method that is not an error but
|
||||||
|
// in practice a lot of state store operations do return it unless they
|
||||||
|
// explicitly special checks to turn 0 into 1. Often this is not caught or
|
||||||
|
// covered by tests but eventually when hit in the wild causes blocking
|
||||||
|
// clients to busy loop and burn CPU. This test ensure that blockingQuery
|
||||||
|
// systematically does the right thing to prevent future bugs like that.
|
||||||
|
{
|
||||||
|
opts := structs.QueryOptions{
|
||||||
|
MinQueryIndex: 0,
|
||||||
|
}
|
||||||
|
var meta structs.QueryMeta
|
||||||
|
var calls int
|
||||||
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
|
if opts.MinQueryIndex > 0 {
|
||||||
|
// If client requested blocking, block forever. This is simulating
|
||||||
|
// waiting for the watched resource to be initialized/written to giving
|
||||||
|
// it a non-zero index. Note the timeout on the query options is relied
|
||||||
|
// on to stop the test taking forever.
|
||||||
|
fakeCh := make(chan struct{})
|
||||||
|
ws.Add(fakeCh)
|
||||||
|
}
|
||||||
|
meta.Index = 0
|
||||||
|
calls++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
require.NoError(s.blockingQuery(&opts, &meta, fn))
|
||||||
|
assert.Equal(1, calls)
|
||||||
|
assert.Equal(uint64(1), meta.Index,
|
||||||
|
"expect fake index of 1 to force client to block on next update")
|
||||||
|
|
||||||
|
// Simulate client making next request
|
||||||
|
opts.MinQueryIndex = 1
|
||||||
|
opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long
|
||||||
|
|
||||||
|
// This time we should block even though the func returns index 0 still
|
||||||
|
t0 := time.Now()
|
||||||
|
require.NoError(s.blockingQuery(&opts, &meta, fn))
|
||||||
|
t1 := time.Now()
|
||||||
|
assert.Equal(2, calls)
|
||||||
|
assert.Equal(uint64(1), meta.Index,
|
||||||
|
"expect fake index of 1 to force client to block on next update")
|
||||||
|
assert.True(t1.Sub(t0) > 20*time.Millisecond,
|
||||||
|
"should have actually blocked waiting for timeout")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Perform a query that blocks and gets interrupted when the state store
|
// Perform a query that blocks and gets interrupted when the state store
|
||||||
// is abandoned.
|
// is abandoned.
|
||||||
{
|
{
|
||||||
|
|
|
@ -1050,10 +1050,9 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
agent := c.Agent()
|
agent := c.Agent()
|
||||||
list, meta, err := agent.ConnectCARoots(nil)
|
_, _, err := agent.ConnectCARoots(nil)
|
||||||
require.NoError(err)
|
require.Error(err)
|
||||||
require.Equal(uint64(1), meta.LastIndex)
|
require.Contains(err.Error(), "Connect must be enabled")
|
||||||
require.Len(list.Roots, 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAPI_AgentConnectCARoots_list(t *testing.T) {
|
func TestAPI_AgentConnectCARoots_list(t *testing.T) {
|
||||||
|
|
|
@ -22,11 +22,10 @@ func TestAPI_ConnectCARoots_empty(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
connect := c.Connect()
|
connect := c.Connect()
|
||||||
list, meta, err := connect.CARoots(nil)
|
_, _, err := connect.CARoots(nil)
|
||||||
require.NoError(err)
|
|
||||||
require.Equal(uint64(1), meta.LastIndex)
|
require.Error(err)
|
||||||
require.Len(list.Roots, 0)
|
require.Contains(err.Error(), "Connect must be enabled")
|
||||||
require.Empty(list.TrustDomain)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAPI_ConnectCARoots_list(t *testing.T) {
|
func TestAPI_ConnectCARoots_list(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue