mirror of https://github.com/status-im/consul.git
Cache Type and client plumbing for Health streaming
This commit is contained in:
parent
1d0f3c4853
commit
d97412ce4c
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/ae"
|
"github.com/hashicorp/consul/agent/ae"
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
|
@ -301,6 +302,9 @@ type Agent struct {
|
||||||
// Envoy.
|
// Envoy.
|
||||||
grpcServer *grpc.Server
|
grpcServer *grpc.Server
|
||||||
|
|
||||||
|
// streamClient is the client to use for streaming gRPC endpoints.
|
||||||
|
streamClient agentpb.ConsulClient
|
||||||
|
|
||||||
// tlsConfigurator is the central instance to provide a *tls.Config
|
// tlsConfigurator is the central instance to provide a *tls.Config
|
||||||
// based on the current consul configuration.
|
// based on the current consul configuration.
|
||||||
tlsConfigurator *tlsutil.Configurator
|
tlsConfigurator *tlsutil.Configurator
|
||||||
|
@ -464,6 +468,16 @@ func (a *Agent) Start() error {
|
||||||
a.State.Delegate = a.delegate
|
a.State.Delegate = a.delegate
|
||||||
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
||||||
|
|
||||||
|
if a.config.EnableBackendStreaming {
|
||||||
|
// Set up the gRPC client for the cache.
|
||||||
|
conn, err := a.delegate.GRPCConn()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.streamClient = agentpb.NewConsulClient(conn)
|
||||||
|
}
|
||||||
|
|
||||||
// Register the cache. We do this much later so the delegate is
|
// Register the cache. We do this much later so the delegate is
|
||||||
// populated from above.
|
// populated from above.
|
||||||
a.registerCache()
|
a.registerCache()
|
||||||
|
@ -4336,6 +4350,26 @@ func (a *Agent) registerCache() {
|
||||||
RefreshTimer: 0 * time.Second,
|
RefreshTimer: 0 * time.Second,
|
||||||
RefreshTimeout: 10 * time.Minute,
|
RefreshTimeout: 10 * time.Minute,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if a.config.EnableBackendStreaming {
|
||||||
|
a.cache.RegisterType(cachetype.StreamingHealthServicesName,
|
||||||
|
cachetype.NewStreamingHealthServices(a.streamClient, a.logger),
|
||||||
|
&cache.RegisterOptions{
|
||||||
|
// Note that although streaming is naturally "always connected" to
|
||||||
|
// backend, we still need background refresh mechanism to ensure the
|
||||||
|
// cache keeps watching the streaming subscription for changes and
|
||||||
|
// updating the cached value in a timely way (not just on request).
|
||||||
|
// Blocking queries should work without it since the presence of an
|
||||||
|
// actual blocking client would cause the cache to fetch/watch the
|
||||||
|
// subscription again, however non-blocking, ?cached queries might see
|
||||||
|
// needlessly stale cached results if there is no blocking client
|
||||||
|
// causing the cache to be repopulated when the subscription sees a
|
||||||
|
// change.
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0 * time.Second,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// LocalState returns the agent's local state
|
// LocalState returns the agent's local state
|
||||||
|
|
|
@ -16,7 +16,7 @@ func TestEventEnforceACL(t *testing.T) {
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
Name: "service health reg, blanket allow",
|
Name: "service health reg, blanket allow",
|
||||||
Event: TestEventServiceHealthRegister(t, 1, "web"),
|
Event: TestEventServiceHealthRegister(t, 100, 1, "web"),
|
||||||
ACLRules: `service_prefix "" {
|
ACLRules: `service_prefix "" {
|
||||||
policy = "read"
|
policy = "read"
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ func TestEventEnforceACL(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "service health reg, deny node",
|
Name: "service health reg, deny node",
|
||||||
Event: TestEventServiceHealthRegister(t, 1, "web"),
|
Event: TestEventServiceHealthRegister(t, 100, 1, "web"),
|
||||||
ACLRules: `service_prefix "" {
|
ACLRules: `service_prefix "" {
|
||||||
policy = "read"
|
policy = "read"
|
||||||
}`,
|
}`,
|
||||||
|
@ -35,7 +35,7 @@ func TestEventEnforceACL(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "service health reg, deny service",
|
Name: "service health reg, deny service",
|
||||||
Event: TestEventServiceHealthRegister(t, 1, "web"),
|
Event: TestEventServiceHealthRegister(t, 100, 1, "web"),
|
||||||
ACLRules: `node_prefix "" {
|
ACLRules: `node_prefix "" {
|
||||||
policy = "read"
|
policy = "read"
|
||||||
}`,
|
}`,
|
||||||
|
|
|
@ -46,8 +46,8 @@ func TestEventResumeStream(t testing.T, topic Topic, index uint64) Event {
|
||||||
// TestEventBatch returns a valid EventBatch event it assumes service health
|
// TestEventBatch returns a valid EventBatch event it assumes service health
|
||||||
// topic, an index of 100 and contains two health registrations.
|
// topic, an index of 100 and contains two health registrations.
|
||||||
func TestEventBatch(t testing.T) Event {
|
func TestEventBatch(t testing.T) Event {
|
||||||
e1 := TestEventServiceHealthRegister(t, 1, "web")
|
e1 := TestEventServiceHealthRegister(t, 100, 1, "web")
|
||||||
e2 := TestEventServiceHealthRegister(t, 1, "api")
|
e2 := TestEventServiceHealthRegister(t, 100, 1, "api")
|
||||||
return Event{
|
return Event{
|
||||||
Topic: Topic_ServiceHealth,
|
Topic: Topic_ServiceHealth,
|
||||||
Index: 100,
|
Index: 100,
|
||||||
|
@ -59,6 +59,24 @@ func TestEventBatch(t testing.T) Event {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEventBatchWithEvents returns a valid EventBatch event containing the
|
||||||
|
// event arguments. The index and topic are taken from the first event.
|
||||||
|
func TestEventBatchWithEvents(t testing.T, evs ...Event) Event {
|
||||||
|
t.Helper()
|
||||||
|
if len(evs) < 1 {
|
||||||
|
t.Fatal("TestEventBatchWithEvents needs at least one event argument")
|
||||||
|
}
|
||||||
|
return Event{
|
||||||
|
Topic: evs[0].Topic,
|
||||||
|
Index: evs[0].Index,
|
||||||
|
Payload: &Event_EventBatch{
|
||||||
|
EventBatch: &EventBatch{
|
||||||
|
Events: EventBatchEventsFromEventSlice(evs),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestEventACLTokenUpdate returns a valid ACLToken event.
|
// TestEventACLTokenUpdate returns a valid ACLToken event.
|
||||||
func TestEventACLTokenUpdate(t testing.T) Event {
|
func TestEventACLTokenUpdate(t testing.T) Event {
|
||||||
return Event{
|
return Event{
|
||||||
|
@ -111,7 +129,7 @@ func TestEventACLRoleUpdate(t testing.T) Event {
|
||||||
// need that. nodeNum should be less than 64k to make the IP address look
|
// need that. nodeNum should be less than 64k to make the IP address look
|
||||||
// realistic. Any other changes can be made on the returned event to avoid
|
// realistic. Any other changes can be made on the returned event to avoid
|
||||||
// adding too many options to callers.
|
// adding too many options to callers.
|
||||||
func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event {
|
func TestEventServiceHealthRegister(t testing.T, index uint64, nodeNum int, svc string) Event {
|
||||||
|
|
||||||
node := fmt.Sprintf("node%d", nodeNum)
|
node := fmt.Sprintf("node%d", nodeNum)
|
||||||
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
|
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
|
||||||
|
@ -120,7 +138,7 @@ func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event
|
||||||
return Event{
|
return Event{
|
||||||
Topic: Topic_ServiceHealth,
|
Topic: Topic_ServiceHealth,
|
||||||
Key: svc,
|
Key: svc,
|
||||||
Index: 100,
|
Index: index,
|
||||||
Payload: &Event_ServiceHealth{
|
Payload: &Event_ServiceHealth{
|
||||||
ServiceHealth: &ServiceHealthUpdate{
|
ServiceHealth: &ServiceHealthUpdate{
|
||||||
Op: CatalogOp_Register,
|
Op: CatalogOp_Register,
|
||||||
|
@ -131,8 +149,8 @@ func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
RaftIndex: RaftIndex{
|
RaftIndex: RaftIndex{
|
||||||
CreateIndex: 100,
|
CreateIndex: index,
|
||||||
ModifyIndex: 100,
|
ModifyIndex: index,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Service: &NodeService{
|
Service: &NodeService{
|
||||||
|
@ -150,8 +168,8 @@ func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event
|
||||||
},
|
},
|
||||||
EnterpriseMeta: &EnterpriseMeta{},
|
EnterpriseMeta: &EnterpriseMeta{},
|
||||||
RaftIndex: RaftIndex{
|
RaftIndex: RaftIndex{
|
||||||
CreateIndex: 100,
|
CreateIndex: index,
|
||||||
ModifyIndex: 100,
|
ModifyIndex: index,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Checks: []*HealthCheck{
|
Checks: []*HealthCheck{
|
||||||
|
@ -162,8 +180,8 @@ func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event
|
||||||
Status: "passing",
|
Status: "passing",
|
||||||
EnterpriseMeta: &EnterpriseMeta{},
|
EnterpriseMeta: &EnterpriseMeta{},
|
||||||
RaftIndex: RaftIndex{
|
RaftIndex: RaftIndex{
|
||||||
CreateIndex: 100,
|
CreateIndex: index,
|
||||||
ModifyIndex: 100,
|
ModifyIndex: index,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
&HealthCheck{
|
&HealthCheck{
|
||||||
|
@ -176,8 +194,8 @@ func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event
|
||||||
Status: "passing",
|
Status: "passing",
|
||||||
EnterpriseMeta: &EnterpriseMeta{},
|
EnterpriseMeta: &EnterpriseMeta{},
|
||||||
RaftIndex: RaftIndex{
|
RaftIndex: RaftIndex{
|
||||||
CreateIndex: 100,
|
CreateIndex: index,
|
||||||
ModifyIndex: 100,
|
ModifyIndex: index,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -194,14 +212,14 @@ func TestEventServiceHealthRegister(t testing.T, nodeNum int, svc string) Event
|
||||||
// need that. nodeNum should be less than 64k to make the IP address look
|
// need that. nodeNum should be less than 64k to make the IP address look
|
||||||
// realistic. Any other changes can be made on the returned event to avoid
|
// realistic. Any other changes can be made on the returned event to avoid
|
||||||
// adding too many options to callers.
|
// adding too many options to callers.
|
||||||
func TestEventServiceHealthDeregister(t testing.T, nodeNum int, svc string) Event {
|
func TestEventServiceHealthDeregister(t testing.T, index uint64, nodeNum int, svc string) Event {
|
||||||
|
|
||||||
node := fmt.Sprintf("node%d", nodeNum)
|
node := fmt.Sprintf("node%d", nodeNum)
|
||||||
|
|
||||||
return Event{
|
return Event{
|
||||||
Topic: Topic_ServiceHealth,
|
Topic: Topic_ServiceHealth,
|
||||||
Key: svc,
|
Key: svc,
|
||||||
Index: 100,
|
Index: index,
|
||||||
Payload: &Event_ServiceHealth{
|
Payload: &Event_ServiceHealth{
|
||||||
ServiceHealth: &ServiceHealthUpdate{
|
ServiceHealth: &ServiceHealthUpdate{
|
||||||
Op: CatalogOp_Deregister,
|
Op: CatalogOp_Deregister,
|
||||||
|
@ -224,7 +242,11 @@ func TestEventServiceHealthDeregister(t testing.T, nodeNum int, svc string) Even
|
||||||
},
|
},
|
||||||
EnterpriseMeta: &EnterpriseMeta{},
|
EnterpriseMeta: &EnterpriseMeta{},
|
||||||
RaftIndex: RaftIndex{
|
RaftIndex: RaftIndex{
|
||||||
// The original insertion index since a delete doesn't update this.
|
// The original insertion index since a delete doesn't update
|
||||||
|
// this. This magic value came from state store tests where we
|
||||||
|
// setup at index 10 and then mutate at index 100. It can be
|
||||||
|
// modified by the caller later and makes it easier than having
|
||||||
|
// yet another argument in the common case.
|
||||||
CreateIndex: 10,
|
CreateIndex: 10,
|
||||||
ModifyIndex: 10,
|
ModifyIndex: 10,
|
||||||
},
|
},
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/go-bexpr"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StreamingHealthServicesName is the recommended name to use when registering
// the StreamingHealthServices cache type.
const StreamingHealthServicesName = "streaming-health-services"
|
||||||
|
|
||||||
|
// StreamingHealthServices supports fetching discovering service instances via the
|
||||||
|
// catalog using the streaming gRPC endpoint.
|
||||||
|
type StreamingHealthServices struct {
|
||||||
|
client StreamingClient
|
||||||
|
logger hclog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStreamingHealthServices creates a cache-type for watching for service
|
||||||
|
// health results via streaming updates.
|
||||||
|
func NewStreamingHealthServices(client StreamingClient, logger hclog.Logger) *StreamingHealthServices {
|
||||||
|
return &StreamingHealthServices{
|
||||||
|
client: client,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch implements cache.Type
|
||||||
|
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
|
// The request should be a ServiceSpecificRequest.
|
||||||
|
reqReal, ok := req.(*structs.ServiceSpecificRequest)
|
||||||
|
if !ok {
|
||||||
|
return cache.FetchResult{}, fmt.Errorf(
|
||||||
|
"Internal cache failure: request wrong type: %T", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := agentpb.SubscribeRequest{
|
||||||
|
Topic: agentpb.Topic_ServiceHealth,
|
||||||
|
Key: reqReal.ServiceName,
|
||||||
|
Token: reqReal.Token,
|
||||||
|
Index: reqReal.MinQueryIndex,
|
||||||
|
Filter: reqReal.Filter,
|
||||||
|
Datacenter: reqReal.Datacenter,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect requests need a different topic
|
||||||
|
if reqReal.Connect {
|
||||||
|
r.Topic = agentpb.Topic_ServiceHealthConnect
|
||||||
|
}
|
||||||
|
|
||||||
|
view := MaterializedViewFromFetch(c, opts, r)
|
||||||
|
return view.Fetch(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SupportsBlocking implements cache.Type
|
||||||
|
func (c *StreamingHealthServices) SupportsBlocking() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMaterializedView implements StreamingCacheType
|
||||||
|
func (c *StreamingHealthServices) NewMaterializedViewState() MaterializedViewState {
|
||||||
|
return &healthViewState{
|
||||||
|
state: make(map[string]structs.CheckServiceNode),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamingClient implements StreamingCacheType
|
||||||
|
func (c *StreamingHealthServices) StreamingClient() StreamingClient {
|
||||||
|
return c.client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Logger implements StreamingCacheType
|
||||||
|
func (c *StreamingHealthServices) Logger() hclog.Logger {
|
||||||
|
return c.logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// healthViewState implements MaterializedViewState for storing the view state
|
||||||
|
// of a service health result. We store it as a map to make updates and
|
||||||
|
// deletions a little easier but we could just store a result type
|
||||||
|
// (IndexedCheckServiceNodes) and update it in place for each event - that
|
||||||
|
// involves re-sorting each time etc. though.
|
||||||
|
type healthViewState struct {
|
||||||
|
state map[string]structs.CheckServiceNode
|
||||||
|
filter *bexpr.Filter
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitFilter implements MaterializedViewState
|
||||||
|
func (s *healthViewState) InitFilter(expression string) error {
|
||||||
|
// We apply filtering to the raw CheckServiceNodes before we are done mutating
|
||||||
|
// state in Update to save from storing stuff in memory we'll only filter
|
||||||
|
// later. Because the state is just a map of those types, we can simply run
|
||||||
|
// that map through filter and it will remove any entries that don't match.
|
||||||
|
filter, err := bexpr.CreateFilter(expression, nil, s.state)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.filter = filter
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update implements MaterializedViewState
|
||||||
|
func (s *healthViewState) Update(events []*agentpb.Event) error {
|
||||||
|
for _, event := range events {
|
||||||
|
serviceHealth := event.GetServiceHealth()
|
||||||
|
if serviceHealth == nil {
|
||||||
|
return fmt.Errorf("unexpected event type for service health view: %T",
|
||||||
|
event.GetPayload())
|
||||||
|
}
|
||||||
|
node := serviceHealth.CheckServiceNode
|
||||||
|
id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID)
|
||||||
|
|
||||||
|
switch serviceHealth.Op {
|
||||||
|
case agentpb.CatalogOp_Register:
|
||||||
|
checkServiceNode, err := serviceHealth.CheckServiceNode.ToStructs()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.state[id] = *checkServiceNode
|
||||||
|
case agentpb.CatalogOp_Deregister:
|
||||||
|
delete(s.state, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if s.filter != nil {
|
||||||
|
filtered, err := s.filter.Execute(s.state)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.state = filtered.(map[string]structs.CheckServiceNode)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result implements MaterializedViewState
|
||||||
|
func (s *healthViewState) Result(index uint64) (interface{}, error) {
|
||||||
|
var result structs.IndexedCheckServiceNodes
|
||||||
|
// Avoid a nil slice if there are no results in the view
|
||||||
|
result.Nodes = structs.CheckServiceNodes{}
|
||||||
|
for _, node := range s.state {
|
||||||
|
result.Nodes = append(result.Nodes, node)
|
||||||
|
}
|
||||||
|
result.Index = index
|
||||||
|
return &result, nil
|
||||||
|
}
|
|
@ -0,0 +1,482 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
||||||
|
client := NewTestStreamingClient()
|
||||||
|
typ := StreamingHealthServices{
|
||||||
|
client: client,
|
||||||
|
logger: hclog.Default(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initially there are no services registered. Server should send an
|
||||||
|
// EndOfSnapshot message immediately with index of 1.
|
||||||
|
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 1)
|
||||||
|
client.QueueEvents(&eosEv)
|
||||||
|
|
||||||
|
// This contains the view state so important we share it between calls.
|
||||||
|
opts := cache.FetchOptions{
|
||||||
|
MinIndex: 0,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}
|
||||||
|
req := &structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
}
|
||||||
|
empty := &structs.IndexedCheckServiceNodes{
|
||||||
|
Nodes: structs.CheckServiceNodes{},
|
||||||
|
QueryMeta: structs.QueryMeta{
|
||||||
|
Index: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, t.Run("empty snapshot returned", func(t *testing.T) {
|
||||||
|
// Fetch should return an empty
|
||||||
|
// result of the right type with a non-zero index, and in the background begin
|
||||||
|
// streaming updates.
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(1), result.Index)
|
||||||
|
require.Equal(t, empty, result.Value)
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("blocks for timeout", func(t *testing.T) {
|
||||||
|
// Subsequent fetch should block for the timeout
|
||||||
|
start := time.Now()
|
||||||
|
opts.Timeout = 200 * time.Millisecond
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until timeout")
|
||||||
|
|
||||||
|
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
|
||||||
|
require.Equal(t, empty, result.Value, "result value should not have changed")
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("blocks until update", func(t *testing.T) {
|
||||||
|
// Make another blocking query with a longer timeout and trigger an update
|
||||||
|
// event part way through.
|
||||||
|
start := time.Now()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Then a service registers
|
||||||
|
healthEv := agentpb.TestEventServiceHealthRegister(t, 4, 1, "web")
|
||||||
|
client.QueueEvents(&healthEv)
|
||||||
|
}()
|
||||||
|
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until the event was delivered")
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
|
||||||
|
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
|
||||||
|
"result value should contain the new registration")
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("reconnects and resumes after transient stream error", func(t *testing.T) {
|
||||||
|
client.QueueErr(resetErr("broken pipe"))
|
||||||
|
|
||||||
|
// After the error the view should re-subscribe with same index so will get
|
||||||
|
// a "resume stream".
|
||||||
|
resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex)
|
||||||
|
client.QueueEvents(&resumeEv)
|
||||||
|
|
||||||
|
// Next fetch will continue to block until timeout and receive the same
|
||||||
|
// result.
|
||||||
|
start := time.Now()
|
||||||
|
opts.Timeout = 200 * time.Millisecond
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until timeout")
|
||||||
|
|
||||||
|
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
|
||||||
|
require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
|
||||||
|
// But an update should still be noticed due to reconnection
|
||||||
|
healthEv := agentpb.TestEventServiceHealthRegister(t, 10, 2, "web")
|
||||||
|
client.QueueEvents(&healthEv)
|
||||||
|
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err = typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
|
||||||
|
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
|
||||||
|
"result value should contain the new registration")
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("returns non-temporary error to watchers", func(t *testing.T) {
|
||||||
|
// Wait and send the error while fetcher is waiting
|
||||||
|
go func() {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
client.QueueErr(errors.New("invalid request"))
|
||||||
|
|
||||||
|
// After the error the view should re-subscribe with same index so will get
|
||||||
|
// a "resume stream".
|
||||||
|
resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex)
|
||||||
|
client.QueueEvents(&resumeEv)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Next fetch should return the error
|
||||||
|
start := time.Now()
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until error was sent")
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
|
||||||
|
// We don't require instances to be returned in same order so we use
|
||||||
|
// elementsMatch which is recursive.
|
||||||
|
requireResultsSame(t,
|
||||||
|
opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
|
||||||
|
result.Value.(*structs.IndexedCheckServiceNodes),
|
||||||
|
)
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
|
||||||
|
// But an update should still be noticed due to reconnection
|
||||||
|
healthEv := agentpb.TestEventServiceHealthRegister(t, opts.MinIndex+5, 3, "web")
|
||||||
|
client.QueueEvents(&healthEv)
|
||||||
|
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err = typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed")
|
||||||
|
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
|
||||||
|
"result value should contain the new registration")
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// requireResultsSame compares two IndexedCheckServiceNodes without requiring
|
||||||
|
// the same order of results (which vary due to map usage internally).
|
||||||
|
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
|
||||||
|
require.Equal(t, want.Index, got.Index)
|
||||||
|
|
||||||
|
svcIDs := func(csns structs.CheckServiceNodes) []string {
|
||||||
|
res := make([]string, 0, len(csns))
|
||||||
|
for _, csn := range csns {
|
||||||
|
res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
gotIDs := svcIDs(got.Nodes)
|
||||||
|
wantIDs := svcIDs(want.Nodes)
|
||||||
|
|
||||||
|
require.ElementsMatch(t, wantIDs, gotIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
|
client := NewTestStreamingClient()
|
||||||
|
typ := StreamingHealthServices{
|
||||||
|
client: client,
|
||||||
|
logger: hclog.Default(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an initial snapshot of 3 instances on different nodes
|
||||||
|
makeReg := func(index uint64, nodeNum int) *agentpb.Event {
|
||||||
|
e := agentpb.TestEventServiceHealthRegister(t, index, nodeNum, "web")
|
||||||
|
return &e
|
||||||
|
}
|
||||||
|
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
|
||||||
|
client.QueueEvents(
|
||||||
|
makeReg(5, 1),
|
||||||
|
makeReg(5, 2),
|
||||||
|
makeReg(5, 3),
|
||||||
|
&eosEv,
|
||||||
|
)
|
||||||
|
|
||||||
|
// This contains the view state so important we share it between calls.
|
||||||
|
opts := cache.FetchOptions{
|
||||||
|
MinIndex: 0,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}
|
||||||
|
req := &structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
}
|
||||||
|
|
||||||
|
gatherNodes := func(res interface{}) []string {
|
||||||
|
nodes := make([]string, 0, 3)
|
||||||
|
r := res.(*structs.IndexedCheckServiceNodes)
|
||||||
|
for _, csn := range r.Nodes {
|
||||||
|
nodes = append(nodes, csn.Node.Node)
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, t.Run("full snapshot returned", func(t *testing.T) {
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(5), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("blocks until deregistration", func(t *testing.T) {
|
||||||
|
// Make another blocking query with a longer timeout and trigger an update
|
||||||
|
// event part way through.
|
||||||
|
start := time.Now()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Deregister instance on node1
|
||||||
|
healthEv := agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web")
|
||||||
|
client.QueueEvents(&healthEv)
|
||||||
|
}()
|
||||||
|
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until the event was delivered")
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, uint64(20), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node2", "node3"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("server reload is respected", func(t *testing.T) {
|
||||||
|
// Simulates the server noticing the request's ACL token privs changing. To
|
||||||
|
// detect this we'll queue up the new snapshot as a different set of nodes
|
||||||
|
// to the first.
|
||||||
|
resetEv := agentpb.TestEventResetStream(t, agentpb.Topic_ServiceHealth, 45)
|
||||||
|
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 50)
|
||||||
|
client.QueueEvents(
|
||||||
|
&resetEv,
|
||||||
|
makeReg(50, 3), // overlap existing node
|
||||||
|
makeReg(50, 4),
|
||||||
|
makeReg(50, 5),
|
||||||
|
&eosEv,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Make another blocking query with THE SAME index. It should immediately
|
||||||
|
// return the new snapshot.
|
||||||
|
start := time.Now()
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, uint64(50), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node3", "node4", "node5"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
||||||
|
client := NewTestStreamingClient()
|
||||||
|
typ := StreamingHealthServices{
|
||||||
|
client: client,
|
||||||
|
logger: hclog.Default(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an initial snapshot of 3 instances but in a single event batch
|
||||||
|
batchEv := agentpb.TestEventBatchWithEvents(t,
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"),
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"),
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"),
|
||||||
|
)
|
||||||
|
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
|
||||||
|
|
||||||
|
client.QueueEvents(
|
||||||
|
&batchEv,
|
||||||
|
&eosEv,
|
||||||
|
)
|
||||||
|
|
||||||
|
// This contains the view state so important we share it between calls.
|
||||||
|
opts := cache.FetchOptions{
|
||||||
|
MinIndex: 0,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}
|
||||||
|
req := &structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
}
|
||||||
|
|
||||||
|
gatherNodes := func(res interface{}) []string {
|
||||||
|
nodes := make([]string, 0, 3)
|
||||||
|
r := res.(*structs.IndexedCheckServiceNodes)
|
||||||
|
for _, csn := range r.Nodes {
|
||||||
|
nodes = append(nodes, csn.Node.Node)
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, t.Run("full snapshot returned", func(t *testing.T) {
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(5), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("batched updates work too", func(t *testing.T) {
|
||||||
|
// Simulate multiple registrations happening in one Txn (so all have same
|
||||||
|
// index)
|
||||||
|
batchEv := agentpb.TestEventBatchWithEvents(t,
|
||||||
|
// Deregister an existing node
|
||||||
|
agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"),
|
||||||
|
// Register another
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"),
|
||||||
|
)
|
||||||
|
client.QueueEvents(&batchEv)
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(20), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node2", "node3", "node4"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamingHealthServices_Filtering(t *testing.T) {
|
||||||
|
client := NewTestStreamingClient()
|
||||||
|
typ := StreamingHealthServices{
|
||||||
|
client: client,
|
||||||
|
logger: hclog.Default(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an initial snapshot of 3 instances but in a single event batch
|
||||||
|
batchEv := agentpb.TestEventBatchWithEvents(t,
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"),
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"),
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"),
|
||||||
|
)
|
||||||
|
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
|
||||||
|
client.QueueEvents(
|
||||||
|
&batchEv,
|
||||||
|
&eosEv,
|
||||||
|
)
|
||||||
|
|
||||||
|
// This contains the view state so important we share it between calls.
|
||||||
|
opts := cache.FetchOptions{
|
||||||
|
MinIndex: 0,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}
|
||||||
|
req := &structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Filter: `Node.Node == "node2"`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
gatherNodes := func(res interface{}) []string {
|
||||||
|
nodes := make([]string, 0, 3)
|
||||||
|
r := res.(*structs.IndexedCheckServiceNodes)
|
||||||
|
for _, csn := range r.Nodes {
|
||||||
|
nodes = append(nodes, csn.Node.Node)
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, t.Run("filtered snapshot returned", func(t *testing.T) {
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(5), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node2"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("filtered updates work too", func(t *testing.T) {
|
||||||
|
// Simulate multiple registrations happening in one Txn (so all have same
|
||||||
|
// index)
|
||||||
|
batchEv := agentpb.TestEventBatchWithEvents(t,
|
||||||
|
// Deregister an existing node
|
||||||
|
agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"),
|
||||||
|
// Register another
|
||||||
|
agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"),
|
||||||
|
)
|
||||||
|
client.QueueEvents(&batchEv)
|
||||||
|
opts.Timeout = time.Second
|
||||||
|
result, err := typ.Fetch(opts, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(20), result.Index)
|
||||||
|
require.ElementsMatch(t, []string{"node2"},
|
||||||
|
gatherNodes(result.Value))
|
||||||
|
|
||||||
|
opts.MinIndex = result.Index
|
||||||
|
opts.LastResult = &result
|
||||||
|
}))
|
||||||
|
}
|
|
@ -0,0 +1,443 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SubscribeBackoffMax controls the range of the exponential backoff applied
// when subscriptions return errors.
const SubscribeBackoffMax = 60 * time.Second
|
||||||
|
|
||||||
|
// StreamingClient is the interface we need from the gRPC client stub. Separate
|
||||||
|
// interface simplifies testing.
|
||||||
|
type StreamingClient interface {
|
||||||
|
Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaterializedViewState is the interface used to manage they type-specific
|
||||||
|
// materialized view logic.
|
||||||
|
type MaterializedViewState interface {
|
||||||
|
// InitFilter is called once when the view is constructed if the subscription
|
||||||
|
// has a non-empty Filter argument. The implementor is expected to create a
|
||||||
|
// *bexpr.Filter and store it locally so it can be used to filter events
|
||||||
|
// and/or results. Ideally filtering should occur inside `Update` calls such
|
||||||
|
// that we don't store objects in the view state that are just filtered when
|
||||||
|
// the result is returned, however in some cases it might not be possible and
|
||||||
|
// the type may choose to store the whole view and only apply filtering in the
|
||||||
|
// Result method just before returning a result.
|
||||||
|
InitFilter(expression string) error
|
||||||
|
|
||||||
|
// Update is called when one or more events are received. The first call will
|
||||||
|
// include _all_ events in the initial snapshot which may be an empty set.
|
||||||
|
// Subsequent calls will contain one or more update events in the order they
|
||||||
|
// are received.
|
||||||
|
Update(events []*agentpb.Event) error
|
||||||
|
|
||||||
|
// Result returns the type-specific cache result based on the state. When no
|
||||||
|
// events have been delivered yet the result should be an empty value type
|
||||||
|
// suitable to return to clients in case there is an empty result on the
|
||||||
|
// servers. The index the materialized view represents is maintained
|
||||||
|
// separately and passed in in case the return type needs an Index field
|
||||||
|
// populating. This allows implementations to not worry about maintaining
|
||||||
|
// indexes seen during Update.
|
||||||
|
Result(index uint64) (interface{}, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamingCacheType is the interface a cache-type needs to implement to make
|
||||||
|
// use of streaming as the transport for updates from the server.
|
||||||
|
type StreamingCacheType interface {
|
||||||
|
NewMaterializedViewState() MaterializedViewState
|
||||||
|
StreamingClient() StreamingClient
|
||||||
|
Logger() hclog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// temporary is a private copy of the interface used by net and other std lib
// packages to mark error types that represent temporary/recoverable errors.
type temporary interface {
	Temporary() bool
}
|
||||||
|
|
||||||
|
// resetErr is the error produced when the server asks for the subscription to
// be reset. It has its own type so that it can be marked as temporary, which
// lets the first retry happen without notifying clients.
type resetErr string

// Temporary implements the internal temporary interface; a reset is always
// considered recoverable.
func (r resetErr) Temporary() bool {
	return true
}

// Error implements error by returning the underlying string.
func (r resetErr) Error() string {
	return string(r)
}
|
||||||
|
|
||||||
|
// MaterializedView is a partial view of the state on servers, maintained via
|
||||||
|
// streaming subscriptions. It is specialized for different cache types by
|
||||||
|
// providing a MaterializedViewState that encapsulates the logic to update the
|
||||||
|
// state and format it as the correct result type.
|
||||||
|
//
|
||||||
|
// The MaterializedView object becomes the cache.Result.State for a streaming
|
||||||
|
// cache type and manages the actual streaming RPC call to the servers behind
|
||||||
|
// the scenes until the cache result is discarded when TTL expires.
|
||||||
|
type MaterializedView struct {
|
||||||
|
// Properties above the lock are immutable after the view is constructed in
|
||||||
|
// MaterializedViewFromFetch and must not be modified.
|
||||||
|
typ StreamingCacheType
|
||||||
|
client StreamingClient
|
||||||
|
logger hclog.Logger
|
||||||
|
req agentpb.SubscribeRequest
|
||||||
|
ctx context.Context
|
||||||
|
cancelFunc func()
|
||||||
|
|
||||||
|
// l protects the mutable state - all fields below it must only be accessed
|
||||||
|
// while holding l.
|
||||||
|
l sync.Mutex
|
||||||
|
index uint64
|
||||||
|
state MaterializedViewState
|
||||||
|
snapshotDone bool
|
||||||
|
updateCh chan struct{}
|
||||||
|
retryWaiter *lib.RetryWaiter
|
||||||
|
err error
|
||||||
|
fatalErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaterializedViewFromFetch retrieves an existing view from the cache result
|
||||||
|
// state if one exists, otherwise creates a new one. Note that the returned view
|
||||||
|
// MUST have Close called eventually to avoid leaking resources. Typically this
|
||||||
|
// is done automatically if the view is returned in a cache.Result.State when
|
||||||
|
// the cache evicts the result. If the view is not returned in a result state
|
||||||
|
// though Close must be called some other way to avoid leaking the goroutine and
|
||||||
|
// memory.
|
||||||
|
func MaterializedViewFromFetch(t StreamingCacheType, opts cache.FetchOptions, subReq agentpb.SubscribeRequest) *MaterializedView {
|
||||||
|
if opts.LastResult == nil || opts.LastResult.State == nil {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
v := &MaterializedView{
|
||||||
|
typ: t,
|
||||||
|
client: t.StreamingClient(),
|
||||||
|
logger: t.Logger(),
|
||||||
|
req: subReq,
|
||||||
|
ctx: ctx,
|
||||||
|
cancelFunc: cancel,
|
||||||
|
retryWaiter: lib.NewRetryWaiter(0, 0, SubscribeBackoffMax,
|
||||||
|
lib.NewJitterRandomStagger(100)),
|
||||||
|
}
|
||||||
|
// Run init now otherwise there is a race between run() and a call to Fetch
|
||||||
|
// which expects a view state to exist.
|
||||||
|
v.reset()
|
||||||
|
go v.run()
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return opts.LastResult.State.(*MaterializedView)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements io.Close and discards view state and stops background view
|
||||||
|
// maintenance.
|
||||||
|
func (v *MaterializedView) Close() error {
|
||||||
|
v.l.Lock()
|
||||||
|
defer v.l.Unlock()
|
||||||
|
if v.cancelFunc != nil {
|
||||||
|
v.cancelFunc()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *MaterializedView) run() {
|
||||||
|
if v.ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop in case stream resets and we need to start over
|
||||||
|
for {
|
||||||
|
// Run a subscribe call until it fails
|
||||||
|
err := v.runSubscription()
|
||||||
|
if err != nil {
|
||||||
|
// Check if the view closed
|
||||||
|
if v.ctx.Err() != nil {
|
||||||
|
// Err doesn't matter and is likely just context cancelled
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
v.l.Lock()
|
||||||
|
// If this is a temporary error and it's the first consecutive failure,
|
||||||
|
// retry to see if we can get a result without erroring back to clients.
|
||||||
|
// If it's non-temporary or a repeated failure return to clients while we
|
||||||
|
// retry to get back in a good state.
|
||||||
|
if _, ok := err.(temporary); !ok || v.retryWaiter.Failures() > 0 {
|
||||||
|
// Report error to blocked fetchers
|
||||||
|
v.err = err
|
||||||
|
v.notifyUpdateLocked()
|
||||||
|
}
|
||||||
|
waitCh := v.retryWaiter.Failed()
|
||||||
|
failures := v.retryWaiter.Failures()
|
||||||
|
v.l.Unlock()
|
||||||
|
|
||||||
|
// Exponential backoff to avoid hammering servers if they are closing
|
||||||
|
// conns because of overload or resetting based on errors.
|
||||||
|
v.logger.Error("subscribe call failed", "err", err, "topic", v.req.Topic,
|
||||||
|
"key", v.req.Key, "failure_count", failures)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-v.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-waitCh:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Loop and keep trying to resume subscription after error
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// runSubscription opens a new subscribe streaming call to the servers and runs
|
||||||
|
// for it's lifetime or until the view is closed.
|
||||||
|
func (v *MaterializedView) runSubscription() error {
|
||||||
|
ctx, cancel := context.WithCancel(v.ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Copy the request template
|
||||||
|
req := v.req
|
||||||
|
|
||||||
|
v.l.Lock()
|
||||||
|
|
||||||
|
// Update request index to be the current view index in case we are resuming a
|
||||||
|
// broken stream.
|
||||||
|
req.Index = v.index
|
||||||
|
|
||||||
|
// Make local copy so we don't have to read with a lock for every event. We
|
||||||
|
// are the only goroutine that can update so we know it won't change without
|
||||||
|
// us knowing but we do need lock to protect external readers when we update.
|
||||||
|
snapshotDone := v.snapshotDone
|
||||||
|
|
||||||
|
v.l.Unlock()
|
||||||
|
|
||||||
|
s, err := v.client.Subscribe(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotEvents := make([]*agentpb.Event, 0)
|
||||||
|
|
||||||
|
for {
|
||||||
|
event, err := s.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.GetResetStream() {
|
||||||
|
// Server has requested we reset the view and start with a fresh snapshot
|
||||||
|
// - perhaps because our ACL policy changed. We reset the view state and
|
||||||
|
// then return an error to allow the `run` method to retry after a backoff
|
||||||
|
// if necessary.
|
||||||
|
v.reset()
|
||||||
|
return resetErr("stream reset requested")
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.GetEndOfSnapshot() {
|
||||||
|
// Hold lock while mutating view state so implementer doesn't need to
|
||||||
|
// worry about synchronization.
|
||||||
|
v.l.Lock()
|
||||||
|
|
||||||
|
// Deliver snapshot events to the View state
|
||||||
|
if err := v.state.Update(snapshotEvents); err != nil {
|
||||||
|
v.l.Unlock()
|
||||||
|
// This error is kinda fatal to the view - we didn't apply some events
|
||||||
|
// the server sent us which means our view is now not in sync. The only
|
||||||
|
// thing we can do is start over and hope for a better outcome.
|
||||||
|
v.reset()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Done collecting these now
|
||||||
|
snapshotEvents = nil
|
||||||
|
v.snapshotDone = true
|
||||||
|
// update our local copy so we can read it without lock.
|
||||||
|
snapshotDone = true
|
||||||
|
v.index = event.Index
|
||||||
|
// We have a good result, reset the error flag
|
||||||
|
v.err = nil
|
||||||
|
v.retryWaiter.Reset()
|
||||||
|
// Notify watchers of the update to the view
|
||||||
|
v.notifyUpdateLocked()
|
||||||
|
v.l.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.GetResumeStream() {
|
||||||
|
// We've opened a new subscribe with a non-zero index to resume a
|
||||||
|
// connection and the server confirms it's not sending a new snapshot.
|
||||||
|
if !snapshotDone {
|
||||||
|
// We've somehow got into a bad state here - the server thinks we have
|
||||||
|
// an up-to-date snapshot but we don't think we do. Reset and start
|
||||||
|
// over.
|
||||||
|
v.reset()
|
||||||
|
return errors.New("stream resume sent but no local snapshot")
|
||||||
|
}
|
||||||
|
// Just continue on as we were!
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have an event for the topic
|
||||||
|
events := []*agentpb.Event{event}
|
||||||
|
|
||||||
|
// If the event is a batch, unwrap and deliver the raw events
|
||||||
|
if batch := event.GetEventBatch(); batch != nil {
|
||||||
|
events = batch.Events
|
||||||
|
}
|
||||||
|
|
||||||
|
if snapshotDone {
|
||||||
|
// We've already got a snapshot, this is an update, deliver it right away.
|
||||||
|
v.l.Lock()
|
||||||
|
if err := v.state.Update(events); err != nil {
|
||||||
|
v.l.Unlock()
|
||||||
|
// This error is kinda fatal to the view - we didn't apply some events
|
||||||
|
// the server sent us which means our view is now not in sync. The only
|
||||||
|
// thing we can do is start over and hope for a better outcome.
|
||||||
|
v.reset()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Notify watchers of the update to the view
|
||||||
|
v.index = event.Index
|
||||||
|
// We have a good result, reset the error flag
|
||||||
|
v.err = nil
|
||||||
|
v.retryWaiter.Reset()
|
||||||
|
v.notifyUpdateLocked()
|
||||||
|
v.l.Unlock()
|
||||||
|
} else {
|
||||||
|
snapshotEvents = append(snapshotEvents, events...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset clears the state ready to start a new stream from scratch.
|
||||||
|
func (v *MaterializedView) reset() {
|
||||||
|
v.l.Lock()
|
||||||
|
defer v.l.Unlock()
|
||||||
|
|
||||||
|
v.state = v.typ.NewMaterializedViewState()
|
||||||
|
if v.req.Filter != "" {
|
||||||
|
if err := v.state.InitFilter(v.req.Filter); err != nil {
|
||||||
|
// If this errors we are stuck - it's fatal for the whole request as it
|
||||||
|
// means there was a bug or an invalid filter string we couldn't parse.
|
||||||
|
// Stop the whole view by closing it and cancelling context, but also set
|
||||||
|
// the error internally so that Fetch calls can return a meaningful error
|
||||||
|
// and not just "context cancelled".
|
||||||
|
v.fatalErr = err
|
||||||
|
if v.cancelFunc != nil {
|
||||||
|
v.cancelFunc()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
v.notifyUpdateLocked()
|
||||||
|
// Always start from zero when we have a new state so we load a snapshot from
|
||||||
|
// the servers.
|
||||||
|
v.index = 0
|
||||||
|
v.snapshotDone = false
|
||||||
|
v.err = nil
|
||||||
|
v.retryWaiter.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifyUpdateLocked closes the current update channel and recreates a new
|
||||||
|
// one. It must be called while holding the s.l lock.
|
||||||
|
func (v *MaterializedView) notifyUpdateLocked() {
|
||||||
|
if v.updateCh != nil {
|
||||||
|
close(v.updateCh)
|
||||||
|
}
|
||||||
|
v.updateCh = make(chan struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
|
||||||
|
// call. Cache types that use streaming should just be able to proxy to this
|
||||||
|
// once they have a subscription object and return it's results directly.
|
||||||
|
func (v *MaterializedView) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
|
||||||
|
var result cache.FetchResult
|
||||||
|
|
||||||
|
// Get current view Result and index
|
||||||
|
v.l.Lock()
|
||||||
|
index := v.index
|
||||||
|
val, err := v.state.Result(v.index)
|
||||||
|
updateCh := v.updateCh
|
||||||
|
v.l.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Index = index
|
||||||
|
result.Value = val
|
||||||
|
result.State = v
|
||||||
|
|
||||||
|
// If our index is > req.Index return right away. If index is zero then we
|
||||||
|
// haven't loaded a snapshot at all yet which means we should wait for one on
|
||||||
|
// the update chan. Note it's opts.MinIndex that the cache is using here the
|
||||||
|
// request min index might be different and from initial user request.
|
||||||
|
if index > 0 && index > opts.MinIndex {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout
|
||||||
|
// since that is the timeout the client requested from the cache Get while the
|
||||||
|
// options one is the internal "background refresh" timeout which is what the
|
||||||
|
// Fetch call should be using.
|
||||||
|
timeoutCh := time.After(opts.Timeout)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-updateCh:
|
||||||
|
// View updated, return the new result
|
||||||
|
v.l.Lock()
|
||||||
|
result.Index = v.index
|
||||||
|
// Grab the new updateCh in case we need to keep waiting for the next
|
||||||
|
// update.
|
||||||
|
updateCh = v.updateCh
|
||||||
|
fetchErr := v.err
|
||||||
|
if fetchErr == nil {
|
||||||
|
// Only generate a new result if there was no error to avoid pointless
|
||||||
|
// work potentially shuffling the same data around.
|
||||||
|
result.Value, err = v.state.Result(v.index)
|
||||||
|
}
|
||||||
|
v.l.Unlock()
|
||||||
|
|
||||||
|
// If there was a non-transient error return it
|
||||||
|
if fetchErr != nil {
|
||||||
|
return result, fetchErr
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanity check the update is actually later than the one the user
|
||||||
|
// requested.
|
||||||
|
if result.Index <= opts.MinIndex {
|
||||||
|
// The result is still older/same as the requested index, continue to
|
||||||
|
// wait for further updates.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the updated result
|
||||||
|
return result, nil
|
||||||
|
|
||||||
|
case <-timeoutCh:
|
||||||
|
// Just return whatever we got originally, might still be empty
|
||||||
|
return result, nil
|
||||||
|
|
||||||
|
case <-v.ctx.Done():
|
||||||
|
v.l.Lock()
|
||||||
|
err := v.fatalErr
|
||||||
|
v.l.Unlock()
|
||||||
|
if err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
return result, v.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/agentpb"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestStreamingClient is a mock StreamingClient for testing that allows
|
||||||
|
// for queueing up custom events to a subscriber.
|
||||||
|
type TestStreamingClient struct {
|
||||||
|
events chan eventOrErr
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventOrErr struct {
|
||||||
|
Err error
|
||||||
|
Event *agentpb.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestStreamingClient() *TestStreamingClient {
|
||||||
|
return &TestStreamingClient{
|
||||||
|
events: make(chan eventOrErr, 32),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) {
|
||||||
|
t.ctx = ctx
|
||||||
|
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) QueueEvents(events ...*agentpb.Event) {
|
||||||
|
for _, e := range events {
|
||||||
|
t.events <- eventOrErr{Event: e}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) QueueErr(err error) {
|
||||||
|
t.events <- eventOrErr{Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Recv() (*agentpb.Event, error) {
|
||||||
|
select {
|
||||||
|
case eoe := <-t.events:
|
||||||
|
if eoe.Err != nil {
|
||||||
|
return nil, eoe.Err
|
||||||
|
}
|
||||||
|
return eoe.Event, nil
|
||||||
|
case <-t.ctx.Done():
|
||||||
|
return nil, t.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Header() (metadata.MD, error) { return nil, nil }
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Trailer() metadata.MD { return nil }
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) CloseSend() error { return nil }
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Context() context.Context { return nil }
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) SendMsg(m interface{}) error { return nil }
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) RecvMsg(m interface{}) error { return nil }
|
|
@ -17,6 +17,7 @@ package cache
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -717,6 +718,12 @@ func (c *Cache) runExpiryLoop() {
|
||||||
case <-expiryCh:
|
case <-expiryCh:
|
||||||
c.entriesLock.Lock()
|
c.entriesLock.Lock()
|
||||||
|
|
||||||
|
// Perform cleanup operations on the entry's state, if applicable.
|
||||||
|
state := c.entries[entry.Key].State
|
||||||
|
if closer, ok := state.(io.Closer); ok {
|
||||||
|
closer.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// Entry expired! Remove it.
|
// Entry expired! Remove it.
|
||||||
delete(c.entries, entry.Key)
|
delete(c.entries, entry.Key)
|
||||||
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
|
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
|
||||||
|
|
|
@ -808,6 +808,49 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
typ.AssertExpectations(t)
|
typ.AssertExpectations(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testCloser struct {
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testCloser) Close() error {
|
||||||
|
t.closed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that entries with state that satisfies io.Closer get cleaned up
|
||||||
|
func TestCacheGet_expireClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
typ := TestType(t)
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
|
||||||
|
// Register the type with a timeout
|
||||||
|
c.RegisterType("t", typ, &RegisterOptions{
|
||||||
|
LastGetTTL: 100 * time.Millisecond,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Configure the type
|
||||||
|
state := &testCloser{}
|
||||||
|
typ.Static(FetchResult{Value: 42, State: state}, nil).Times(1)
|
||||||
|
|
||||||
|
// Get, should fetch
|
||||||
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
|
result, meta, err := c.Get("t", req)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
require.False(state.closed)
|
||||||
|
|
||||||
|
// Sleep for the expiry
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// state.Close() should have been called
|
||||||
|
require.True(state.closed)
|
||||||
|
}
|
||||||
|
|
||||||
// Test a Get with a request that returns the same cache key across
|
// Test a Get with a request that returns the same cache key across
|
||||||
// two different "types" returns two separate results.
|
// two different "types" returns two separate results.
|
||||||
func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
|
func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
|
||||||
|
|
|
@ -62,7 +62,7 @@ func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *struc
|
||||||
}
|
}
|
||||||
|
|
||||||
func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) agentpb.Event {
|
func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) agentpb.Event {
|
||||||
e := agentpb.TestEventServiceHealthRegister(t, 1, svc)
|
e := agentpb.TestEventServiceHealthRegister(t, 100, 1, svc)
|
||||||
|
|
||||||
// Normalize a few things that are different in the generic event which was
|
// Normalize a few things that are different in the generic event which was
|
||||||
// based on original code here but made more general. This means we don't have
|
// based on original code here but made more general. This means we don't have
|
||||||
|
@ -79,7 +79,7 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) agent
|
||||||
}
|
}
|
||||||
|
|
||||||
func testServiceHealthDeregistrationEvent(t *testing.T, svc string, opts ...eventOption) agentpb.Event {
|
func testServiceHealthDeregistrationEvent(t *testing.T, svc string, opts ...eventOption) agentpb.Event {
|
||||||
e := agentpb.TestEventServiceHealthDeregister(t, 1, svc)
|
e := agentpb.TestEventServiceHealthDeregister(t, 100, 1, svc)
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
err := opt(&e)
|
err := opt(&e)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -198,8 +198,13 @@ func (s *HTTPServer) healthServiceNodes(resp http.ResponseWriter, req *http.Requ
|
||||||
var out structs.IndexedCheckServiceNodes
|
var out structs.IndexedCheckServiceNodes
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
|
||||||
if args.QueryOptions.UseCache {
|
// Use the streaming cachetype if enabled.
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
|
if args.QueryOptions.UseCache || (s.agent.config.EnableBackendStreaming && args.MinQueryIndex > 0) {
|
||||||
|
cType := cachetype.HealthServicesName
|
||||||
|
if s.agent.config.EnableBackendStreaming {
|
||||||
|
cType = cachetype.StreamingHealthServicesName
|
||||||
|
}
|
||||||
|
raw, m, err := s.agent.cache.Get(cType, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,9 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
@ -672,6 +674,12 @@ func TestHealthServiceNodes(t *testing.T) {
|
||||||
if len(nodes) != 2 {
|
if len(nodes) != 2 {
|
||||||
r.Fatalf("Want 2 nodes")
|
r.Fatalf("Want 2 nodes")
|
||||||
}
|
}
|
||||||
|
header := resp.Header().Get("X-Consul-Index")
|
||||||
|
if header == "" || header == "0" {
|
||||||
|
r.Fatalf("Want non-zero header: %q", header)
|
||||||
|
}
|
||||||
|
_, err = strconv.ParseUint(header, 10, 64)
|
||||||
|
r.Check(err)
|
||||||
|
|
||||||
// Should be a cache hit! The data should've updated in the cache
|
// Should be a cache hit! The data should've updated in the cache
|
||||||
// in the background so this should've been fetched directly from
|
// in the background so this should've been fetched directly from
|
||||||
|
@ -683,6 +691,144 @@ func TestHealthServiceNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHealthServiceNodes_Blocking(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
hcl string
|
||||||
|
}{
|
||||||
|
//{"no streaming", ""},
|
||||||
|
{"streaming", "enable_backend_streaming = true"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, t.Name(), tc.hcl)
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some initial service instances
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: fmt.Sprintf("test%03d", i),
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
require.NoError(t, a.RPC("Catalog.Register", args, &out))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initial request should return two instances
|
||||||
|
req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
require.Len(t, nodes, 2)
|
||||||
|
|
||||||
|
idx := getIndex(t, resp)
|
||||||
|
require.True(t, idx > 0)
|
||||||
|
|
||||||
|
// errCh collects errors from goroutines since it's unsafe for them to use
|
||||||
|
// t to fail tests directly.
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
|
||||||
|
checkErrs := func() {
|
||||||
|
// Ensure no errors were sent on errCh and drain any nils we have
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.NoError(t, err)
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Blocking on that index should block. We test that by launching another
|
||||||
|
// goroutine that will wait a while before updating the registration and
|
||||||
|
// make sure that we unblock before timeout and see the update but that it
|
||||||
|
// takes at least as long as the sleep time.
|
||||||
|
sleep := 200 * time.Millisecond
|
||||||
|
start := time.Now()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(sleep)
|
||||||
|
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "zoo",
|
||||||
|
Address: "127.0.0.3",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "test",
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
errCh <- a.RPC("Catalog.Register", args, &out)
|
||||||
|
}()
|
||||||
|
|
||||||
|
{
|
||||||
|
timeout := 30 * time.Second
|
||||||
|
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s",
|
||||||
|
idx, timeout)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed > sleep, "request should block for at "+
|
||||||
|
" least as long as sleep. sleep=%s, elapsed=%s", sleep, elapsed)
|
||||||
|
|
||||||
|
require.True(t, elapsed < timeout, "request should unblock before"+
|
||||||
|
" it timed out. timeout=%s, elapsed=%s", timeout, elapsed)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
require.Len(t, nodes, 3)
|
||||||
|
|
||||||
|
newIdx := getIndex(t, resp)
|
||||||
|
require.True(t, idx < newIdx, "index should have increased."+
|
||||||
|
"idx=%d, newIdx=%d", idx, newIdx)
|
||||||
|
|
||||||
|
idx = newIdx
|
||||||
|
|
||||||
|
checkErrs()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Blocking should last until timeout in absence of updates
|
||||||
|
start = time.Now()
|
||||||
|
{
|
||||||
|
timeout := 200 * time.Millisecond
|
||||||
|
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s",
|
||||||
|
idx, timeout)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
// Note that servers add jitter to timeout requested but don't remove it
|
||||||
|
// so this should always be true.
|
||||||
|
require.True(t, elapsed > timeout, "request should block for at "+
|
||||||
|
" least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
require.Len(t, nodes, 3)
|
||||||
|
|
||||||
|
newIdx := getIndex(t, resp)
|
||||||
|
require.Equal(t, idx, newIdx)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a := NewTestAgent(t, "")
|
a := NewTestAgent(t, "")
|
||||||
|
|
|
@ -132,11 +132,16 @@ func (rw *RetryWaiter) Failed() <-chan struct{} {
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resets the internal failure counter
|
// Resets the internal failure counter.
|
||||||
func (rw *RetryWaiter) Reset() {
|
func (rw *RetryWaiter) Reset() {
|
||||||
rw.failures = 0
|
rw.failures = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Failures returns the current number of consecutive failures recorded.
|
||||||
|
func (rw *RetryWaiter) Failures() int {
|
||||||
|
return int(rw.failures)
|
||||||
|
}
|
||||||
|
|
||||||
// WaitIf is a convenience method to record whether the last
|
// WaitIf is a convenience method to record whether the last
|
||||||
// operation was a success or failure and return a chan that
|
// operation was a success or failure and return a chan that
|
||||||
// will be selectable when the next operation can be done.
|
// will be selectable when the next operation can be done.
|
||||||
|
|
Loading…
Reference in New Issue