Merge pull request #8809 from hashicorp/streaming/materialize-view

Add StreamingHealthServices cache-type
This commit is contained in:
Daniel Nephin 2020-10-07 21:26:38 -04:00 committed by GitHub
commit 2513f42c68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1337 additions and 2 deletions

View File

@ -0,0 +1,173 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)
func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}
// newEventServiceHealthRegister returns a realistically populated service
// health registration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
// the node ID and IP address to make it a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
ID: nodeID,
Node: node,
Address: addr,
Datacenter: "dc1",
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Checks: []*pbservice.HealthCheck{
{
Node: node,
CheckID: "serf-health",
Name: "serf-health",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
{
Node: node,
CheckID: types.CheckID("service:" + svc),
Name: "service:" + svc,
ServiceID: svc,
ServiceName: svc,
Type: "ttl",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
},
}
}
// TestEventServiceHealthDeregister returns a realistically populated service
// health deregistration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
// the node ID and IP address to make it a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
events := make([]*pbsubscribe.Event, len(evs)+1)
events[0] = first
for i := range evs {
events[i+1] = evs[i]
}
return &pbsubscribe.Event{
Topic: first.Topic,
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},
},
}
}

View File

@ -0,0 +1,194 @@
package cachetype
import (
"context"
"fmt"
"time"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
const (
// Recommended name for registration.
StreamingHealthServicesName = "streaming-health-services"
)
// StreamingHealthServices supports fetching discovering service instances via the
// catalog using the streaming gRPC endpoint.
type StreamingHealthServices struct {
RegisterOptionsBlockingRefresh
deps MaterializerDeps
}
// NewStreamingHealthServices creates a cache-type for watching for service
// health results via streaming updates.
func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {
return &StreamingHealthServices{deps: deps}
}
type MaterializerDeps struct {
Client submatview.StreamClient
Logger hclog.Logger
}
// Fetch service health from the materialized view. If no materialized view
// exists, create one and start it running in a goroutine. The goroutine will
// exit when the cache entry storing the result is expired, the cache will call
// Close on the result.State.
//
// Fetch implements part of the cache.Type interface, and assumes that the
// caller ensures that only a single call to Fetch is running at any time.
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
if opts.LastResult != nil && opts.LastResult.State != nil {
return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
}
srvReq := req.(*structs.ServiceSpecificRequest)
newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: srvReq.ServiceName,
Token: srvReq.Token,
Datacenter: srvReq.Datacenter,
Index: index,
// TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace
}
if srvReq.Connect {
req.Topic = pbsubscribe.Topic_ServiceHealthConnect
}
return req
}
materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
if err != nil {
return cache.FetchResult{}, err
}
ctx, cancel := context.WithCancel(context.TODO())
go materializer.Run(ctx)
state := &streamingHealthState{
materializer: materializer,
done: ctx.Done(),
cancel: cancel,
}
return state.Fetch(opts)
}
func newMaterializer(
deps MaterializerDeps,
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
filter string,
) (*submatview.Materializer, error) {
view, err := newHealthView(filter)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: deps.Client,
Logger: deps.Logger,
Waiter: &retry.Waiter{
MinFailures: 1,
MinWait: 0,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
},
Request: newRequestFn,
}), nil
}
// streamingHealthState wraps a Materializer to manage its lifecycle, and to
// add itself to the FetchResult.State.
type streamingHealthState struct {
materializer *submatview.Materializer
done <-chan struct{}
cancel func()
}
func (s *streamingHealthState) Close() error {
s.cancel()
return nil
}
func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
result, err := s.materializer.Fetch(s.done, opts)
result.State = s
return result, err
}
func newHealthView(filterExpr string) (*healthView, error) {
s := &healthView{state: make(map[string]structs.CheckServiceNode)}
// We apply filtering to the raw CheckServiceNodes before we are done mutating
// state in Update to save from storing stuff in memory we'll only filter
// later. Because the state is just a map of those types, we can simply run
// that map through filter and it will remove any entries that don't match.
var err error
s.filter, err = bexpr.CreateFilter(filterExpr, nil, s.state)
return s, err
}
// healthView implements submatview.View for storing the view state
// of a service health result. We store it as a map to make updates and
// deletions a little easier but we could just store a result type
// (IndexedCheckServiceNodes) and update it in place for each event - that
// involves re-sorting each time etc. though.
type healthView struct {
state map[string]structs.CheckServiceNode
filter *bexpr.Filter
}
// Update implements View
func (s *healthView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
return fmt.Errorf("unexpected event type for service health view: %T",
event.GetPayload())
}
node := serviceHealth.CheckServiceNode
id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID)
switch serviceHealth.Op {
case pbsubscribe.CatalogOp_Register:
checkServiceNode := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
s.state[id] = *checkServiceNode
case pbsubscribe.CatalogOp_Deregister:
delete(s.state, id)
}
}
if s.filter != nil {
filtered, err := s.filter.Execute(s.state)
if err != nil {
return err
}
s.state = filtered.(map[string]structs.CheckServiceNode)
}
return nil
}
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *healthView) Result(index uint64) (interface{}, error) {
result := structs.IndexedCheckServiceNodes{
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{
Index: index,
},
}
for _, node := range s.state {
result.Nodes = append(result.Nodes, node)
}
return &result, nil
}
func (s *healthView) Reset() {
s.state = make(map[string]structs.CheckServiceNode)
}

View File

@ -0,0 +1,493 @@
package cachetype
import (
"errors"
"fmt"
"sort"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1))
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
},
}
runStep(t, "empty snapshot returned", func(t *testing.T) {
// Fetch should return an empty
// result of the right type with a non-zero index, and in the background begin
// streaming updates.
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
client.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
client.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
start = time.Now()
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueErr(errors.New("invalid request"))
}()
// Next fetch should return the error
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
// We don't require instances to be returned in same order so we use
// elementsMatch which is recursive.
requireResultsSame(t,
opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
result.Value.(*structs.IndexedCheckServiceNodes),
)
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web"))
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
}
type tempError string
func (e tempError) Error() string {
return string(e)
}
func (e tempError) Temporary() bool {
return true
}
// requireResultsSame compares two IndexedCheckServiceNodes without requiring
// the same order of results (which vary due to map usage internally).
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
require.Equal(t, want.Index, got.Index)
svcIDs := func(csns structs.CheckServiceNodes) []string {
res := make([]string, 0, len(csns))
for _, csn := range csns {
res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
}
return res
}
gotIDs := svcIDs(got.Nodes)
wantIDs := svcIDs(want.Nodes)
require.ElementsMatch(t, wantIDs, gotIDs)
}
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
// Create an initial snapshot of 3 instances on different nodes
registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
return newEventServiceHealthRegister(index, nodeNum, "web")
}
client.QueueEvents(
registerServiceWeb(5, 1),
registerServiceWeb(5, 2),
registerServiceWeb(5, 3),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
sort.Strings(nodes)
return nodes
}
runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks until deregistration", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
// Deregister instance on node1
client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(20), result.Index)
require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "server reload is respected", func(t *testing.T) {
// Simulates the server noticing the request's ACL token privs changing. To
// detect this we'll queue up the new snapshot as a different set of nodes
// to the first.
client.QueueErr(status.Error(codes.Aborted, "reset by server"))
client.QueueEvents(
registerServiceWeb(50, 3), // overlap existing node
registerServiceWeb(50, 4),
registerServiceWeb(50, 5),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
// Make another blocking query with THE SAME index. It should immediately
// return the new snapshot.
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index)
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
client.QueueErr(tempError("temporary connection error"))
client.QueueEvents(
newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth),
registerServiceWeb(50, 3), // overlap existing node
registerServiceWeb(50, 4),
registerServiceWeb(50, 5),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
start := time.Now()
opts.MinIndex = 49
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index)
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
})
}
func TestStreamingHealthServices_EventBatches(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
// Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"),
newEventServiceHealthRegister(5, 2, "web"),
newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents(
batchEv,
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "batched updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same
// index)
batchEv := newEventBatchWithEvents(
// Deregister an existing node
newEventServiceHealthDeregister(20, 1, "web"),
// Register another
newEventServiceHealthRegister(20, 4, "web"),
)
client.QueueEvents(batchEv)
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(20), result.Index)
require.ElementsMatch(t, []string{"node2", "node3", "node4"},
gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
}
func TestStreamingHealthServices_Filtering(t *testing.T) {
client := NewTestStreamingClient()
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
// Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"),
newEventServiceHealthRegister(5, 2, "web"),
newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents(
batchEv,
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
runStep(t, "filtered snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(5), result.Index)
require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "filtered updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same
// index)
batchEv := newEventBatchWithEvents(
// Deregister an existing node
newEventServiceHealthDeregister(20, 1, "web"),
// Register another
newEventServiceHealthRegister(20, 4, "web"),
)
client.QueueEvents(batchEv)
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(20), result.Index)
require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
opts.MinIndex = result.Index
opts.LastResult = &result
})
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}

View File

@ -0,0 +1,59 @@
package cachetype
import (
"context"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
events chan eventOrErr
ctx context.Context
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient() *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
}
}
func (t *TestStreamingClient) Subscribe(
ctx context.Context,
_ *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}

10
agent/cache/cache.go vendored
View File

@ -18,14 +18,16 @@ import (
"container/heap"
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/lib"
)
//go:generate mockery -all -inpkg
@ -773,6 +775,12 @@ func (c *Cache) runExpiryLoop() {
case <-expiryCh:
c.entriesLock.Lock()
// Perform cleanup operations on the entry's state, if applicable.
state := c.entries[entry.Key].State
if closer, ok := state.(io.Closer); ok {
closer.Close()
}
// Entry expired! Remove it.
delete(c.entries, entry.Key)
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)

View File

@ -10,11 +10,12 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/sdk/testutil"
)
// Test a basic Get with no indexes (and therefore no blocking queries).
@ -841,6 +842,56 @@ func TestCacheGet_expireResetGet(t *testing.T) {
typ.AssertExpectations(t)
}
// Test that entries with state that satisfies io.Closer get cleaned up
func TestCacheGet_expireClose(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := &MockType{}
defer typ.AssertExpectations(t)
c := New(Options{})
defer c.Close()
typ.On("RegisterOptions").Return(RegisterOptions{
SupportsBlocking: true,
LastGetTTL: 100 * time.Millisecond,
})
// Register the type with a timeout
c.RegisterType("t", typ)
// Configure the type
state := &testCloser{}
typ.Static(FetchResult{Value: 42, State: state}, nil).Times(1)
ctx := context.Background()
req := TestRequest(t, RequestInfo{Key: "hello"})
result, meta, err := c.Get(ctx, "t", req)
require.NoError(err)
require.Equal(42, result)
require.False(meta.Hit)
require.False(state.isClosed())
// Sleep for the expiry
time.Sleep(200 * time.Millisecond)
// state.Close() should have been called
require.True(state.isClosed())
}
type testCloser struct {
closed uint32
}
func (t *testCloser) Close() error {
atomic.SwapUint32(&t.closed, 1)
return nil
}
func (t *testCloser) isClosed() bool {
return atomic.LoadUint32(&t.closed) == 1
}
// Test a Get with a request that returns the same cache key across
// two different "types" returns two separate results.
func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {

View File

@ -61,6 +61,7 @@ type SnapshotHandlers map[Topic]SnapshotFunc
// SnapshotFunc builds a snapshot for the subscription request, and appends the
// events to the Snapshot using SnapshotAppender.
// If err is not nil the SnapshotFunc must return a non-zero index.
type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
// SnapshotAppender appends groups of events to create a Snapshot of state.

View File

@ -374,6 +374,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}

View File

@ -0,0 +1,70 @@
package submatview
import (
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// eventHandler is a function which performs some operation on the received
// events, then returns the eventHandler that should be used for the next set
// of events.
// If eventHandler fails to handle the events it may return an error. If an
// error is returned the next eventHandler will be ignored.
// eventHandler is used to implement a very simple finite-state machine.
type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error)
type viewState interface {
updateView(events []*pbsubscribe.Event, index uint64) error
reset()
}
func initialHandler(index uint64) eventHandler {
if index == 0 {
return newSnapshotHandler()
}
return resumeStreamHandler
}
// snapshotHandler accumulates events. When it receives an EndOfSnapshot event
// it updates the view, and then returns eventStreamHandler to handle new events.
type snapshotHandler struct {
events []*pbsubscribe.Event
}
func newSnapshotHandler() eventHandler {
return (&snapshotHandler{}).handle
}
func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
if event.GetEndOfSnapshot() {
err := state.updateView(h.events, event.Index)
return eventStreamHandler, err
}
h.events = append(h.events, eventsFromEvent(event)...)
return h.handle, nil
}
// eventStreamHandler handles events by updating the view. It always returns
// itself as the next handler.
func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
err := state.updateView(eventsFromEvent(event), event.Index)
return eventStreamHandler, err
}
func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
if batch := event.GetEventBatch(); batch != nil {
return batch.Events
}
return []*pbsubscribe.Event{event}
}
// resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it
// is it resets the view and returns a snapshotHandler to handle the next event.
// Otherwise it uses eventStreamHandler to handle events.
func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
if event.GetNewSnapshotToFollow() {
state.reset()
return newSnapshotHandler(), nil
}
return eventStreamHandler(state, event)
}

View File

@ -0,0 +1,285 @@
package submatview
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// View receives events from, and return results to, Materializer. A view is
// responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
// Update is called when one or more events are received. The first call will
// include _all_ events in the initial snapshot which may be an empty set.
// Subsequent calls will contain one or more update events in the order they
// are received.
Update(events []*pbsubscribe.Event) error
// Result returns the type-specific cache result based on the state. When no
// events have been delivered yet the result should be an empty value type
// suitable to return to clients in case there is an empty result on the
// servers. The index the materialized view represents is maintained
// separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining
// indexes seen during Update.
Result(index uint64) (interface{}, error)
// Reset the view to the zero state, done in preparation for receiving a new
// snapshot.
Reset()
}
// Materializer consumes the event stream, handling any framing events, and
// sends the events to View as they are received.
//
// Materializer is used as the cache.Result.State for a streaming
// cache type and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when TTL expires.
type Materializer struct {
deps Deps
retryWaiter *retry.Waiter
handler eventHandler
// lock protects the mutable state - all fields below it must only be accessed
// while holding lock.
lock sync.Mutex
index uint64
view View
updateCh chan struct{}
err error
}
type Deps struct {
View View
Client StreamClient
Logger hclog.Logger
Waiter *retry.Waiter
Request func(index uint64) pbsubscribe.SubscribeRequest
}
// StreamClient provides a subscription to state change events.
type StreamClient interface {
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}
// NewMaterializer returns a new Materializer. Run must be called to start it.
func NewMaterializer(deps Deps) *Materializer {
v := &Materializer{
deps: deps,
view: deps.View,
retryWaiter: deps.Waiter,
updateCh: make(chan struct{}),
}
return v
}
// Run receives events from the StreamClient and sends them to the View. It runs
// until ctx is cancelled, so it is expected to be run in a goroutine.
func (m *Materializer) Run(ctx context.Context) {
for {
req := m.deps.Request(m.index)
err := m.runSubscription(ctx, req)
if ctx.Err() != nil {
return
}
failures := m.retryWaiter.Failures()
if isNonTemporaryOrConsecutiveFailure(err, failures) {
m.lock.Lock()
m.notifyUpdateLocked(err)
m.lock.Unlock()
}
m.deps.Logger.Error("subscribe call failed",
"err", err,
"topic", req.Topic,
"key", req.Key,
"failure_count", failures+1)
if err := m.retryWaiter.Wait(ctx); err != nil {
return
}
}
}
// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
// temporary is an interface used by net and other std lib packages to
// show error types represent temporary/recoverable errors.
temp, ok := err.(interface {
Temporary() bool
})
return !ok || !temp.Temporary() || failures > 0
}
// runSubscription opens a new subscribe streaming call to the servers and runs
// for it's lifetime or until the view is closed.
func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.SubscribeRequest) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m.handler = initialHandler(req.Index)
s, err := m.deps.Client.Subscribe(ctx, &req)
if err != nil {
return err
}
for {
event, err := s.Recv()
switch {
case isGrpcStatus(err, codes.Aborted):
m.reset()
return resetErr("stream reset requested")
case err != nil:
return err
}
m.handler, err = m.handler(m, event)
if err != nil {
m.reset()
return err
}
}
}
func isGrpcStatus(err error, code codes.Code) bool {
s, ok := status.FromError(err)
return ok && s.Code() == code
}
// resetErr represents a server request to reset the subscription, it's typed so
// we can mark it as temporary and so attempt to retry first time without
// notifying clients.
type resetErr string
// Temporary Implements the internal Temporary interface
func (e resetErr) Temporary() bool {
return true
}
// Error implements error
func (e resetErr) Error() string {
return string(e)
}
// reset clears the state ready to start a new stream from scratch.
func (m *Materializer) reset() {
m.lock.Lock()
defer m.lock.Unlock()
m.view.Reset()
m.index = 0
m.retryWaiter.Reset()
}
func (m *Materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
m.lock.Lock()
defer m.lock.Unlock()
if err := m.view.Update(events); err != nil {
return err
}
m.index = index
m.notifyUpdateLocked(nil)
m.retryWaiter.Reset()
return nil
}
// notifyUpdateLocked closes the current update channel and recreates a new
// one. It must be called while holding the s.lock lock.
func (m *Materializer) notifyUpdateLocked(err error) {
m.err = err
close(m.updateCh)
m.updateCh = make(chan struct{})
}
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
// call. Cache types that use streaming should just be able to proxy to this
// once they have a subscription object and return it's results directly.
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult
// Get current view Result and index
m.lock.Lock()
index := m.index
val, err := m.view.Result(m.index)
updateCh := m.updateCh
m.lock.Unlock()
if err != nil {
return result, err
}
result.Index = index
result.Value = val
// If our index is > req.Index return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan. Note it's opts.MinIndex that the cache is using here the
// request min index might be different and from initial user request.
if index > 0 && index > opts.MinIndex {
return result, nil
}
// Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout
// since that is the timeout the client requested from the cache Get while the
// options one is the internal "background refresh" timeout which is what the
// Fetch call should be using.
timeoutCh := time.After(opts.Timeout)
for {
select {
case <-updateCh:
// View updated, return the new result
m.lock.Lock()
result.Index = m.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = m.updateCh
fetchErr := m.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = m.view.Result(m.index)
}
m.lock.Unlock()
// If there was a non-transient error return it
if fetchErr != nil {
return result, fetchErr
}
if err != nil {
return result, err
}
// Sanity check the update is actually later than the one the user
// requested.
if result.Index <= opts.MinIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
continue
}
// Return the updated result
return result, nil
case <-timeoutCh:
// Just return whatever we got originally, might still be empty
return result, nil
case <-done:
return result, context.Canceled
}
}
}