agent/cache: Add cache-type and materialized view for streaming health

Extracted from d97412ce4c

Co-authored-by: Paul Banks <banks@banksco.de>
Daniel Nephin 2020-09-18 18:25:56 -04:00
parent f5d11562f2
commit 132b76acef
6 changed files with 1199 additions and 0 deletions


@ -0,0 +1,149 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/agentpb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
)
const (
// Recommended name for registration.
StreamingHealthServicesName = "streaming-health-services"
)
// StreamingHealthServices supports fetching discovered service instances via
// the catalog using the streaming gRPC endpoint.
type StreamingHealthServices struct {
client StreamingClient
logger hclog.Logger
}
// NewStreamingHealthServices creates a cache-type for watching for service
// health results via streaming updates.
func NewStreamingHealthServices(client StreamingClient, logger hclog.Logger) *StreamingHealthServices {
return &StreamingHealthServices{
client: client,
logger: logger,
}
}
// Fetch implements cache.Type
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
// The request should be a ServiceSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return cache.FetchResult{}, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
r := agentpb.SubscribeRequest{
Topic: agentpb.Topic_ServiceHealth,
Key: reqReal.ServiceName,
Token: reqReal.Token,
Index: reqReal.MinQueryIndex,
Filter: reqReal.Filter,
Datacenter: reqReal.Datacenter,
}
// Connect requests need a different topic
if reqReal.Connect {
r.Topic = agentpb.Topic_ServiceHealthConnect
}
view := MaterializedViewFromFetch(c, opts, r)
return view.Fetch(opts)
}
// SupportsBlocking implements cache.Type
func (c *StreamingHealthServices) SupportsBlocking() bool {
return true
}
// NewMaterializedViewState implements StreamingCacheType
func (c *StreamingHealthServices) NewMaterializedViewState() MaterializedViewState {
return &healthViewState{
state: make(map[string]structs.CheckServiceNode),
}
}
// StreamingClient implements StreamingCacheType
func (c *StreamingHealthServices) StreamingClient() StreamingClient {
return c.client
}
// Logger implements StreamingCacheType
func (c *StreamingHealthServices) Logger() hclog.Logger {
return c.logger
}
// healthViewState implements MaterializedViewState for storing the view state
// of a service health result. We store it as a map to make updates and
// deletions a little easier; we could instead store a result type
// (IndexedCheckServiceNodes) and update it in place for each event, but that
// would involve re-sorting on every update.
type healthViewState struct {
state map[string]structs.CheckServiceNode
filter *bexpr.Filter
}
// InitFilter implements MaterializedViewState
func (s *healthViewState) InitFilter(expression string) error {
// We apply filtering to the raw CheckServiceNodes before we are done mutating
// state in Update to avoid storing entries in memory that will only be
// filtered out later. Because the state is just a map of those types, we can
// simply run the map through the filter and it will remove any entries that
// don't match.
filter, err := bexpr.CreateFilter(expression, nil, s.state)
if err != nil {
return err
}
s.filter = filter
return nil
}
// Update implements MaterializedViewState
func (s *healthViewState) Update(events []*agentpb.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
return fmt.Errorf("unexpected event type for service health view: %T",
event.GetPayload())
}
node := serviceHealth.CheckServiceNode
id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID)
switch serviceHealth.Op {
case agentpb.CatalogOp_Register:
checkServiceNode, err := serviceHealth.CheckServiceNode.ToStructs()
if err != nil {
return err
}
s.state[id] = *checkServiceNode
case agentpb.CatalogOp_Deregister:
delete(s.state, id)
}
}
if s.filter != nil {
filtered, err := s.filter.Execute(s.state)
if err != nil {
return err
}
s.state = filtered.(map[string]structs.CheckServiceNode)
}
return nil
}
// Result implements MaterializedViewState
func (s *healthViewState) Result(index uint64) (interface{}, error) {
var result structs.IndexedCheckServiceNodes
// Avoid a nil slice if there are no results in the view
result.Nodes = structs.CheckServiceNodes{}
for _, node := range s.state {
result.Nodes = append(result.Nodes, node)
}
result.Index = index
return &result, nil
}
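As a usage illustration only (not part of this commit), the sketch below drives this cache type directly as a blocking watch, mirroring the FetchOptions/LastResult pattern the tests in this commit use. In the agent it is the cache itself that calls Fetch; calling it directly here just shows the blocking semantics. The StreamingClient wiring, the `agent/cache-types` import path, and the helper name are assumptions.

package example

import (
	"time"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/go-hclog"
)

// watchWebHealth is a hypothetical helper: it loops on Fetch, reusing the same
// FetchOptions so the materialized view carried in LastResult.State is shared
// between calls, exactly as the cache would do for a background-refresh entry.
func watchWebHealth(client cachetype.StreamingClient, logger hclog.Logger) error {
	typ := cachetype.NewStreamingHealthServices(client, logger)

	opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
	req := &structs.ServiceSpecificRequest{Datacenter: "dc1", ServiceName: "web"}

	for {
		result, err := typ.Fetch(opts, req)
		if err != nil {
			return err
		}
		nodes := result.Value.(*structs.IndexedCheckServiceNodes)
		logger.Info("web health updated", "index", result.Index, "instances", len(nodes.Nodes))

		// Carry the index and result forward so the next Fetch blocks for changes.
		opts.MinIndex = result.Index
		opts.LastResult = &result
	}
}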


@ -0,0 +1,485 @@
package cachetype
import (
"errors"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/agent/agentpb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{
client: client,
logger: hclog.Default(),
}
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 1)
client.QueueEvents(&eosEv)
// This contains the view state, so it's important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
},
}
require.True(t, t.Run("empty snapshot returned", func(t *testing.T) {
// Fetch should return an empty
// result of the right type with a non-zero index, and in the background begin
// streaming updates.
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
// Then a service registers
healthEv := agentpb.TestEventServiceHealthRegister(t, 4, 1, "web")
client.QueueEvents(&healthEv)
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should be the event index")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("reconnects and resumes after transient stream error", func(t *testing.T) {
// Use resetErr just because it's "temporary"; it's a stand-in for any network
// error that implements the same interface.
client.QueueErr(resetErr("broken pipe"))
// After the error the view should re-subscribe with the same index, so it
// will get a "resume stream".
resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex)
client.QueueEvents(&resumeEv)
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
healthEv := agentpb.TestEventServiceHealthRegister(t, 10, 2, "web")
client.QueueEvents(&healthEv)
start = time.Now()
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should be the event index")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueErr(errors.New("invalid request"))
// After the error the view should re-subscribe with the same index, so it
// will get a "resume stream".
resumeEv := agentpb.TestEventResumeStream(t, agentpb.Topic_ServiceHealth, opts.MinIndex)
client.QueueEvents(&resumeEv)
}()
// Next fetch should return the error
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
// We don't require instances to be returned in the same order, so compare
// with requireResultsSame, which uses ElementsMatch on the instance IDs.
requireResultsSame(t,
opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
result.Value.(*structs.IndexedCheckServiceNodes),
)
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
healthEv := agentpb.TestEventServiceHealthRegister(t, opts.MinIndex+5, 3, "web")
client.QueueEvents(&healthEv)
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex+5, result.Index, "result index should be the event index")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
}))
}
// requireResultsSame compares two IndexedCheckServiceNodes without requiring
// the same order of results (which vary due to map usage internally).
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
require.Equal(t, want.Index, got.Index)
svcIDs := func(csns structs.CheckServiceNodes) []string {
res := make([]string, 0, len(csns))
for _, csn := range csns {
res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
}
return res
}
gotIDs := svcIDs(got.Nodes)
wantIDs := svcIDs(want.Nodes)
require.ElementsMatch(t, wantIDs, gotIDs)
}
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{
client: client,
logger: hclog.Default(),
}
// Create an initial snapshot of 3 instances on different nodes
makeReg := func(index uint64, nodeNum int) *agentpb.Event {
e := agentpb.TestEventServiceHealthRegister(t, index, nodeNum, "web")
return &e
}
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
client.QueueEvents(
makeReg(5, 1),
makeReg(5, 2),
makeReg(5, 3),
&eosEv,
)
// This contains the view state, so it's important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
require.True(t, t.Run("full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("blocks until deregistration", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
// Deregister instance on node1
healthEv := agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web")
client.QueueEvents(&healthEv)
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(20), result.Index)
require.ElementsMatch(t, []string{"node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("server reload is respected", func(t *testing.T) {
// Simulate the server noticing the request's ACL token privileges changing. To
// detect this we'll queue up the new snapshot with a different set of nodes
// from the first.
resetEv := agentpb.TestEventResetStream(t, agentpb.Topic_ServiceHealth, 45)
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 50)
client.QueueEvents(
&resetEv,
makeReg(50, 3), // overlap existing node
makeReg(50, 4),
makeReg(50, 5),
&eosEv,
)
// Make another blocking query with THE SAME index. It should immediately
// return the new snapshot.
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index)
require.ElementsMatch(t, []string{"node3", "node4", "node5"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
}
func TestStreamingHealthServices_EventBatches(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{
client: client,
logger: hclog.Default(),
}
// Create an initial snapshot of 3 instances but in a single event batch
batchEv := agentpb.TestEventBatchWithEvents(t,
agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"),
)
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
client.QueueEvents(
&batchEv,
&eosEv,
)
// This contains the view state, so it's important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
require.True(t, t.Run("full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("batched updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same
// index)
batchEv := agentpb.TestEventBatchWithEvents(t,
// Deregister an existing node
agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"),
// Register another
agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"),
)
client.QueueEvents(&batchEv)
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(20), result.Index)
require.ElementsMatch(t, []string{"node2", "node3", "node4"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
}
func TestStreamingHealthServices_Filtering(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{
client: client,
logger: hclog.Default(),
}
// Create an initial snapshot of 3 instances but in a single event batch
batchEv := agentpb.TestEventBatchWithEvents(t,
agentpb.TestEventServiceHealthRegister(t, 5, 1, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 2, "web"),
agentpb.TestEventServiceHealthRegister(t, 5, 3, "web"),
)
eosEv := agentpb.TestEventEndOfSnapshot(t, agentpb.Topic_ServiceHealth, 5)
client.QueueEvents(
&batchEv,
&eosEv,
)
// This contains the view state, so it's important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
require.True(t, t.Run("filtered snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node2"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
require.True(t, t.Run("filtered updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same
// index)
batchEv := agentpb.TestEventBatchWithEvents(t,
// Deregister an existing node
agentpb.TestEventServiceHealthDeregister(t, 20, 1, "web"),
// Register another
agentpb.TestEventServiceHealthRegister(t, 20, 4, "web"),
)
client.QueueEvents(&batchEv)
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(20), result.Index)
require.ElementsMatch(t, []string{"node2"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
}))
}
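The filtering tests above rely on go-bexpr being able to filter a map of structs, the same way healthViewState runs its state map through the filter in Update. Below is a standalone sketch of that behaviour with simplified, hypothetical types; it only assumes the CreateFilter/Execute calls already used in this commit.

package main

import (
	"fmt"

	"github.com/hashicorp/go-bexpr"
)

// instance is a stand-in for structs.CheckServiceNode, keyed the same way the
// view keys its state ("node/serviceID").
type instance struct {
	Node      string
	ServiceID string
}

func main() {
	state := map[string]instance{
		"node1/web": {Node: "node1", ServiceID: "web"},
		"node2/web": {Node: "node2", ServiceID: "web"},
	}

	// CreateFilter is primed with a sample of the data shape, as InitFilter does.
	filter, err := bexpr.CreateFilter(`Node == "node2"`, nil, state)
	if err != nil {
		panic(err)
	}

	// Execute returns a copy of the map with non-matching entries removed.
	filtered, err := filter.Execute(state)
	if err != nil {
		panic(err)
	}
	fmt.Println(filtered.(map[string]instance)) // only the node2 entry remains
}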


@ -0,0 +1,447 @@
package cachetype
import (
"context"
"errors"
"sync"
"time"
"github.com/hashicorp/consul/agent/agentpb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
)
const (
// SubscribeBackoffMax controls the range of exponential backoff when errors
// are returned from subscriptions.
SubscribeBackoffMax = 60 * time.Second
)
// StreamingClient is the interface we need from the gRPC client stub. Separate
// interface simplifies testing.
type StreamingClient interface {
Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error)
}
// MaterializedViewState is the interface used to manage the type-specific
// materialized view logic.
type MaterializedViewState interface {
// InitFilter is called once when the view is constructed if the subscription
// has a non-empty Filter argument. The implementor is expected to create a
// *bexpr.Filter and store it locally so it can be used to filter events
// and/or results. Ideally filtering should occur inside `Update` calls so that
// we don't store objects in the view state that would only be filtered out
// when the result is returned; however, in some cases that isn't possible and
// the type may choose to store the whole view and only apply filtering in the
// Result method just before returning a result.
InitFilter(expression string) error
// Update is called when one or more events are received. The first call will
// include _all_ events in the initial snapshot which may be an empty set.
// Subsequent calls will contain one or more update events in the order they
// are received.
Update(events []*agentpb.Event) error
// Result returns the type-specific cache result based on the state. When no
// events have been delivered yet the result should be an empty value type
// suitable to return to clients in case there is an empty result on the
// servers. The index the materialized view represents is maintained
// separately and passed in, in case the return type needs an Index field
// populated. This frees implementations from having to track the indexes seen
// during Update.
Result(index uint64) (interface{}, error)
}
// StreamingCacheType is the interface a cache-type needs to implement to make
// use of streaming as the transport for updates from the server.
type StreamingCacheType interface {
NewMaterializedViewState() MaterializedViewState
StreamingClient() StreamingClient
Logger() hclog.Logger
}
// temporary mirrors the private interface used by net and other standard
// library packages to indicate that an error is temporary/recoverable.
type temporary interface {
Temporary() bool
}
// resetErr represents a server request to reset the subscription. It's typed
// so we can mark it as temporary and retry the first time without notifying
// clients.
type resetErr string
// Temporary implements the temporary interface above
func (e resetErr) Temporary() bool {
return true
}
// Error implements error
func (e resetErr) Error() string {
return string(e)
}
// MaterializedView is a partial view of the state on servers, maintained via
// streaming subscriptions. It is specialized for different cache types by
// providing a MaterializedViewState that encapsulates the logic to update the
// state and format it as the correct result type.
//
// The MaterializedView object becomes the cache.Result.State for a streaming
// cache type and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when TTL expires.
type MaterializedView struct {
// Properties above the lock are immutable after the view is constructed in
// MaterializedViewFromFetch and must not be modified.
typ StreamingCacheType
client StreamingClient
logger hclog.Logger
req agentpb.SubscribeRequest
ctx context.Context
cancelFunc func()
// l protects the mutable state - all fields below it must only be accessed
// while holding l.
l sync.Mutex
index uint64
state MaterializedViewState
snapshotDone bool
updateCh chan struct{}
retryWaiter *lib.RetryWaiter
err error
fatalErr error
}
// MaterializedViewFromFetch retrieves an existing view from the cache result
// state if one exists; otherwise it creates a new one. Note that the returned
// view MUST have Close called eventually to avoid leaking resources. Typically
// this is done automatically when the view is returned in a cache.Result.State
// and the cache later evicts that result. If the view is not returned in a
// result state, Close must be called some other way to avoid leaking the
// goroutine and memory.
func MaterializedViewFromFetch(t StreamingCacheType, opts cache.FetchOptions,
subReq agentpb.SubscribeRequest) *MaterializedView {
if opts.LastResult == nil || opts.LastResult.State == nil {
ctx, cancel := context.WithCancel(context.Background())
v := &MaterializedView{
typ: t,
client: t.StreamingClient(),
logger: t.Logger(),
req: subReq,
ctx: ctx,
cancelFunc: cancel,
// Allow the first retry without waiting; this is important and we rely on it
// in tests.
retryWaiter: lib.NewRetryWaiter(1, 0, SubscribeBackoffMax,
lib.NewJitterRandomStagger(100)),
}
// Run init now, otherwise there is a race between run() and a call to Fetch,
// which expects a view state to exist.
v.reset()
go v.run()
return v
}
return opts.LastResult.State.(*MaterializedView)
}
// Close implements io.Closer. It discards the view state and stops background
// view maintenance.
func (v *MaterializedView) Close() error {
v.l.Lock()
defer v.l.Unlock()
if v.cancelFunc != nil {
v.cancelFunc()
}
return nil
}
func (v *MaterializedView) run() {
if v.ctx.Err() != nil {
return
}
// Loop in case stream resets and we need to start over
for {
// Run a subscribe call until it fails
err := v.runSubscription()
if err != nil {
// Check if the view closed
if v.ctx.Err() != nil {
// Err doesn't matter and is likely just context cancelled
return
}
v.l.Lock()
// If this is a temporary error and it's the first consecutive failure,
// retry to see if we can get a result without erroring back to clients.
// If it's non-temporary or a repeated failure, return the error to clients
// while we keep retrying to get back into a good state.
if _, ok := err.(temporary); !ok || v.retryWaiter.Failures() > 0 {
// Report error to blocked fetchers
v.err = err
v.notifyUpdateLocked()
}
waitCh := v.retryWaiter.Failed()
failures := v.retryWaiter.Failures()
v.l.Unlock()
// Exponential backoff to avoid hammering servers if they are closing
// conns because of overload or resetting based on errors.
v.logger.Error("subscribe call failed", "err", err, "topic", v.req.Topic,
"key", v.req.Key, "failure_count", failures)
select {
case <-v.ctx.Done():
return
case <-waitCh:
}
}
// Loop and keep trying to resume subscription after error
}
}
// runSubscription opens a new subscribe streaming call to the servers and runs
// for its lifetime or until the view is closed.
func (v *MaterializedView) runSubscription() error {
ctx, cancel := context.WithCancel(v.ctx)
defer cancel()
// Copy the request template
req := v.req
v.l.Lock()
// Update request index to be the current view index in case we are resuming a
// broken stream.
req.Index = v.index
// Make a local copy so we don't have to take the lock to read it for every
// event. We are the only goroutine that updates it, so it won't change without
// us knowing, but we still need the lock to protect external readers when we
// update.
snapshotDone := v.snapshotDone
v.l.Unlock()
s, err := v.client.Subscribe(ctx, &req)
if err != nil {
return err
}
snapshotEvents := make([]*agentpb.Event, 0)
for {
event, err := s.Recv()
if err != nil {
return err
}
if event.GetResetStream() {
// Server has requested we reset the view and start with a fresh snapshot
// - perhaps because our ACL policy changed. We reset the view state and
// then return an error to allow the `run` method to retry after a backoff
// if necessary.
v.reset()
return resetErr("stream reset requested")
}
if event.GetEndOfSnapshot() {
// Hold lock while mutating view state so implementer doesn't need to
// worry about synchronization.
v.l.Lock()
// Deliver snapshot events to the View state
if err := v.state.Update(snapshotEvents); err != nil {
v.l.Unlock()
// This error is effectively fatal to the view: we failed to apply events
// the server sent us, so our view is no longer in sync. The only thing we
// can do is start over and hope for a better outcome.
v.reset()
return err
}
// Done collecting these now
snapshotEvents = nil
v.snapshotDone = true
// update our local copy so we can read it without lock.
snapshotDone = true
v.index = event.Index
// We have a good result, reset the error flag
v.err = nil
v.retryWaiter.Reset()
// Notify watchers of the update to the view
v.notifyUpdateLocked()
v.l.Unlock()
continue
}
if event.GetResumeStream() {
// We've opened a new subscribe with a non-zero index to resume a
// connection and the server confirms it's not sending a new snapshot.
if !snapshotDone {
// We've somehow got into a bad state here - the server thinks we have
// an up-to-date snapshot but we don't think we do. Reset and start
// over.
v.reset()
return errors.New("stream resume sent but no local snapshot")
}
// Just continue on as we were!
continue
}
// We have an event for the topic
events := []*agentpb.Event{event}
// If the event is a batch, unwrap and deliver the raw events
if batch := event.GetEventBatch(); batch != nil {
events = batch.Events
}
if snapshotDone {
// We've already got a snapshot, this is an update, deliver it right away.
v.l.Lock()
if err := v.state.Update(events); err != nil {
v.l.Unlock()
// This error is effectively fatal to the view: we failed to apply events
// the server sent us, so our view is no longer in sync. The only thing we
// can do is start over and hope for a better outcome.
v.reset()
return err
}
// Notify watchers of the update to the view
v.index = event.Index
// We have a good result, reset the error flag
v.err = nil
v.retryWaiter.Reset()
v.notifyUpdateLocked()
v.l.Unlock()
} else {
snapshotEvents = append(snapshotEvents, events...)
}
}
}
// reset clears the state ready to start a new stream from scratch.
func (v *MaterializedView) reset() {
v.l.Lock()
defer v.l.Unlock()
v.state = v.typ.NewMaterializedViewState()
if v.req.Filter != "" {
if err := v.state.InitFilter(v.req.Filter); err != nil {
// If this errors we are stuck - it's fatal for the whole request as it
// means there was a bug or an invalid filter string we couldn't parse.
// Stop the whole view by closing it and cancelling context, but also set
// the error internally so that Fetch calls can return a meaningful error
// and not just "context cancelled".
v.fatalErr = err
if v.cancelFunc != nil {
v.cancelFunc()
}
return
}
}
v.notifyUpdateLocked()
// Always start from zero when we have a new state so we load a snapshot from
// the servers.
v.index = 0
v.snapshotDone = false
v.err = nil
v.retryWaiter.Reset()
}
// notifyUpdateLocked closes the current update channel and replaces it with a
// new one. It must be called while holding the v.l lock.
func (v *MaterializedView) notifyUpdateLocked() {
if v.updateCh != nil {
close(v.updateCh)
}
v.updateCh = make(chan struct{})
}
// Fetch implements the logic a StreamingCacheType will need during its Fetch
// call. Cache types that use streaming should just be able to proxy to this
// once they have a subscription object and return its results directly.
func (v *MaterializedView) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult
// Get current view Result and index
v.l.Lock()
index := v.index
val, err := v.state.Result(v.index)
updateCh := v.updateCh
v.l.Unlock()
if err != nil {
return result, err
}
result.Index = index
result.Value = val
result.State = v
// If our index is > opts.MinIndex return right away. If index is zero then we
// haven't loaded a snapshot at all yet, which means we should wait for one on
// the update chan. Note that it's opts.MinIndex that matters here; the
// request's own min index may be different since it comes from the initial
// user request.
if index > 0 && index > opts.MinIndex {
return result, nil
}
// Watch for timeout of the Fetch. Note it's opts.Timeout, not req.Timeout:
// req.Timeout is the timeout the client requested from the cache Get, while
// opts.Timeout is the cache's internal "background refresh" timeout, which is
// the one this Fetch call should honor.
timeoutCh := time.After(opts.Timeout)
for {
select {
case <-updateCh:
// View updated, return the new result
v.l.Lock()
result.Index = v.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = v.updateCh
fetchErr := v.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = v.state.Result(v.index)
}
v.l.Unlock()
// If there was a non-transient error return it
if fetchErr != nil {
return result, fetchErr
}
if err != nil {
return result, err
}
// Sanity check the update is actually later than the one the user
// requested.
if result.Index <= opts.MinIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
continue
}
// Return the updated result
return result, nil
case <-timeoutCh:
// Just return whatever we got originally, might still be empty
return result, nil
case <-v.ctx.Done():
v.l.Lock()
err := v.fatalErr
v.l.Unlock()
if err != nil {
return result, err
}
return result, v.ctx.Err()
}
}
}
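For reference, here is a sketch (not part of this commit) of using a MaterializedView outside the agent cache. It is written as if it lived in this cachetype package, so the imports above apply; the helper name is hypothetical. Because the view is not handed back as cache result state here, the Close contract described above has to be honoured explicitly.

// oneShotView builds a view, waits up to the Fetch timeout for the first
// snapshot, and then tears the view down explicitly since nothing else will
// ever Close it.
func oneShotView(t StreamingCacheType, subReq agentpb.SubscribeRequest) (interface{}, uint64, error) {
	opts := cache.FetchOptions{MinIndex: 0, Timeout: 5 * time.Second}

	view := MaterializedViewFromFetch(t, opts, subReq)
	// The view is never stored in a cache.Result.State, so close it ourselves
	// to stop the background subscription goroutine.
	defer view.Close()

	result, err := view.Fetch(opts)
	if err != nil {
		return nil, 0, err
	}
	return result.Value, result.Index, nil
}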


@ -0,0 +1,67 @@
package cachetype
import (
"context"
"github.com/hashicorp/consul/agent/agentpb"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
events chan eventOrErr
ctx context.Context
}
type eventOrErr struct {
Err error
Event *agentpb.Event
}
func NewTestStreamingClient() *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
}
}
func (t *TestStreamingClient) Subscribe(ctx context.Context, in *agentpb.SubscribeRequest, opts ...grpc.CallOption) (agentpb.Consul_SubscribeClient, error) {
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*agentpb.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*agentpb.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
func (t *TestStreamingClient) Header() (metadata.MD, error) { return nil, nil }
func (t *TestStreamingClient) Trailer() metadata.MD { return nil }
func (t *TestStreamingClient) CloseSend() error { return nil }
func (t *TestStreamingClient) Context() context.Context { return nil }
func (t *TestStreamingClient) SendMsg(m interface{}) error { return nil }
func (t *TestStreamingClient) RecvMsg(m interface{}) error { return nil }


@ -18,6 +18,7 @@ import (
"container/heap" "container/heap"
"context" "context"
"fmt" "fmt"
io "io"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -773,6 +774,12 @@ func (c *Cache) runExpiryLoop() {
case <-expiryCh:
c.entriesLock.Lock()
// Perform cleanup operations on the entry's state, if applicable.
state := c.entries[entry.Key].State
if closer, ok := state.(io.Closer); ok {
closer.Close()
}
// Entry expired! Remove it.
delete(c.entries, entry.Key)
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)

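The hunk above is what ties a MaterializedView's lifetime to cache eviction: any entry state that implements io.Closer is closed before the entry is removed, which for the view cancels its context and stops the run goroutine. The test added below exercises this with testCloser; the following standalone sketch of the same type-assertion pattern uses hypothetical names and is not part of this commit.

package main

import (
	"context"
	"fmt"
	"io"
)

// watchState stands in for MaterializedView: a long-lived value kept in a
// cache entry's State whose background work must stop when the entry expires.
type watchState struct {
	cancel context.CancelFunc
}

func (s *watchState) Close() error {
	s.cancel()
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var state interface{} = &watchState{cancel: cancel}

	// The same check the expiry loop performs before deleting the entry.
	if closer, ok := state.(io.Closer); ok {
		closer.Close()
	}
	fmt.Println("background context cancelled:", ctx.Err() != nil) // true
}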

@ -841,6 +841,50 @@ func TestCacheGet_expireResetGet(t *testing.T) {
typ.AssertExpectations(t)
}
type testCloser struct {
closed bool
}
func (t *testCloser) Close() error {
t.closed = true
return nil
}
// Test that entries with state that satisfies io.Closer get cleaned up
func TestCacheGet_expireClose(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := New(Options{})
typ.On("RegisterOptions").Return(RegisterOptions{
LastGetTTL: 100 * time.Millisecond,
})
// Register the type with a timeout
c.RegisterType("t", typ)
// Configure the type
state := &testCloser{}
typ.Static(FetchResult{Value: 42, State: state}, nil).Times(1)
ctx := context.Background()
req := TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err := c.Get(ctx, "t", req)
require.NoError(err)
require.Equal(42, result)
require.False(meta.Hit)
require.False(state.closed)
// Sleep for the expiry
time.Sleep(200 * time.Millisecond)
// state.Close() should have been called
require.True(state.closed)
}
// Test a Get with a request that returns the same cache key across
// two different "types" returns two separate results.
func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {