diff --git a/agent/cache-types/streaming_events_test.go b/agent/cache-types/streaming_events_test.go
index f1fe9ce7a1..05bc3649fd 100644
--- a/agent/cache-types/streaming_events_test.go
+++ b/agent/cache-types/streaming_events_test.go
@@ -1,7 +1,7 @@
 package cachetype
 
 import (
-	fmt "fmt"
+	"fmt"
 
 	"github.com/hashicorp/consul/proto/pbcommon"
 	"github.com/hashicorp/consul/proto/pbservice"
@@ -17,10 +17,9 @@ func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.E
 	}
 }
 
-func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
+func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event {
 	return &pbsubscribe.Event{
 		Topic:   topic,
-		Index:   index,
 		Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
 	}
 }
@@ -32,12 +31,12 @@ func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic, index uint64) *pbsubsc
 // need that. nodeNum should be less than 64k to make the IP address look
 // realistic. Any other changes can be made on the returned event to avoid
 // adding too many options to callers.
-func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) pbsubscribe.Event {
+func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
 	node := fmt.Sprintf("node%d", nodeNum)
 	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
 	addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
 
-	return pbsubscribe.Event{
+	return &pbsubscribe.Event{
 		Topic: pbsubscribe.Topic_ServiceHealth,
 		Key:   svc,
 		Index: index,
@@ -114,10 +113,10 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) pbsubs
 // need that. nodeNum should be less than 64k to make the IP address look
 // realistic. Any other changes can be made on the returned event to avoid
 // adding too many options to callers.
-func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) pbsubscribe.Event {
+func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
 	node := fmt.Sprintf("node%d", nodeNum)
 
-	return pbsubscribe.Event{
+	return &pbsubscribe.Event{
 		Topic: pbsubscribe.Topic_ServiceHealth,
 		Key:   svc,
 		Index: index,
@@ -158,13 +157,13 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) pbsu
 	}
 }
 
-func newEventBatchWithEvents(first pbsubscribe.Event, evs ...pbsubscribe.Event) pbsubscribe.Event {
+func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
 	events := make([]*pbsubscribe.Event, len(evs)+1)
-	events[0] = &first
+	events[0] = first
 	for i := range evs {
-		events[i+1] = &evs[i]
+		events[i+1] = evs[i]
 	}
-	return pbsubscribe.Event{
+	return &pbsubscribe.Event{
 		Topic: first.Topic,
 		Index: first.Index,
 		Payload: &pbsubscribe.Event_EventBatch{
diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go
index addbbc0593..1b1293d70c 100644
--- a/agent/cache-types/streaming_health_services.go
+++ b/agent/cache-types/streaming_health_services.go
@@ -8,12 +8,10 @@ import (
 	"github.com/hashicorp/go-bexpr"
 	"github.com/hashicorp/go-hclog"
 
-	"github.com/hashicorp/consul/agent/submatview"
-
-	"github.com/hashicorp/consul/lib/retry"
-
 	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/agent/submatview"
+	"github.com/hashicorp/consul/lib/retry"
 	"github.com/hashicorp/consul/proto/pbservice"
 	"github.com/hashicorp/consul/proto/pbsubscribe"
 )
@@ -44,8 +42,7 @@ type MaterializerDeps struct {
 // Fetch implements cache.Type
 func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
 	if opts.LastResult != nil && opts.LastResult.State != nil {
-		state := opts.LastResult.State.(*streamingHealthState)
-		return state.materializer.Fetch(state.done, opts)
+		return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
 	}
 
 	srvReq := req.(*structs.ServiceSpecificRequest)
@@ -63,57 +60,64 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
 		return req
 	}
 
-	m, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
+	materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
 	if err != nil {
 		return cache.FetchResult{}, err
 	}
 	ctx, cancel := context.WithCancel(context.TODO())
-	go m.Run(ctx)
+	go materializer.Run(ctx)
 
-	result, err := m.Fetch(ctx.Done(), opts)
-	result.State = &streamingHealthState{
-		materializer: m,
+	state := &streamingHealthState{
+		materializer: materializer,
 		done:         ctx.Done(),
 		cancel:       cancel,
 	}
-	return result, err
+	return state.Fetch(opts)
 }
 
 func newMaterializer(
-	d MaterializerDeps,
-	r func(uint64) pbsubscribe.SubscribeRequest,
+	deps MaterializerDeps,
+	newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
 	filter string,
 ) (*submatview.Materializer, error) {
-	view, err := newHealthViewState(filter)
+	view, err := newHealthView(filter)
 	if err != nil {
 		return nil, err
 	}
 	return submatview.NewMaterializer(submatview.Deps{
 		View:   view,
-		Client: d.Client,
-		Logger: d.Logger,
+		Client: deps.Client,
+		Logger: deps.Logger,
 		Waiter: &retry.Waiter{
 			MinFailures: 1,
 			MinWait:     0,
 			MaxWait:     60 * time.Second,
 			Jitter:      retry.NewJitter(100),
 		},
-		Request: r,
+		Request: newRequestFn,
 	}), nil
 }
 
+// streamingHealthState wraps a Materializer to manage its lifecycle, and to
+// add itself to the FetchResult.State.
 type streamingHealthState struct {
 	materializer *submatview.Materializer
 	done         <-chan struct{}
 	cancel       func()
 }
 
-func (c *streamingHealthState) Close() error {
-	c.cancel()
+func (s *streamingHealthState) Close() error {
+	s.cancel()
 	return nil
 }
 
-func newHealthViewState(filterExpr string) (submatview.View, error) {
+func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
+	result, err := s.materializer.Fetch(s.done, opts)
+	result.State = s
+	return result, err
+}
+
+func newHealthView(filterExpr string) (*healthView, error) {
 	s := &healthView{state: make(map[string]structs.CheckServiceNode)}
 
 	// We apply filtering to the raw CheckServiceNodes before we are done mutating
@@ -131,8 +135,7 @@ func newHealthViewState(filterExpr string) (submatview.View, error) {
 // (IndexedCheckServiceNodes) and update it in place for each event - that
 // involves re-sorting each time etc. though.
 type healthView struct {
-	state map[string]structs.CheckServiceNode
-	// TODO: test case with filter
+	state  map[string]structs.CheckServiceNode
 	filter *bexpr.Filter
 }
 
@@ -155,7 +158,6 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
 			delete(s.state, id)
 		}
 	}
-	// TODO: replace with a no-op filter instead of a conditional
 	if s.filter != nil {
 		filtered, err := s.filter.Execute(s.state)
 		if err != nil {
@@ -166,16 +168,17 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
 	return nil
 }
 
-// Result implements View
+// Result returns the structs.IndexedCheckServiceNodes stored by this view.
 func (s *healthView) Result(index uint64) (interface{}, error) {
-	var result structs.IndexedCheckServiceNodes
-	// Avoid a nil slice if there are no results in the view
-	// TODO: why this ^
-	result.Nodes = structs.CheckServiceNodes{}
+	result := structs.IndexedCheckServiceNodes{
+		Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
+		QueryMeta: structs.QueryMeta{
+			Index: index,
+		},
+	}
 	for _, node := range s.state {
 		result.Nodes = append(result.Nodes, node)
 	}
-	result.Index = index
 	return &result, nil
 }
 
diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go
index 2eae5c93a9..d88b38dd3c 100644
--- a/agent/cache-types/streaming_health_services_test.go
+++ b/agent/cache-types/streaming_health_services_test.go
@@ -80,10 +80,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
 	start := time.Now()
 	go func() {
 		time.Sleep(200 * time.Millisecond)
-
-		// Then a service registers
-		healthEv := newEventServiceHealthRegister(4, 1, "web")
-		client.QueueEvents(&healthEv)
+		client.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
 	}()
 
 	opts.Timeout = time.Second
@@ -103,15 +100,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
 		opts.LastResult = &result
 	})
 
-	runStep(t, "reconnects and resumes after transient stream error", func(t *testing.T) {
-		// Use resetErr just because it's "temporary" this is a stand in for any
-		// network error that uses that same interface though.
+	runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
 		client.QueueErr(tempError("broken pipe"))
 
-		// After the error the view should re-subscribe with same index so will get
-		// a "resume stream".
-		client.QueueEvents(newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
-
 		// Next fetch will continue to block until timeout and receive the same
 		// result.
 		start := time.Now()
@@ -129,8 +120,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
 		opts.LastResult = &result
 
 		// But an update should still be noticed due to reconnection
-		healthEv := newEventServiceHealthRegister(10, 2, "web")
-		client.QueueEvents(&healthEv)
+		client.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
 
 		start = time.Now()
 		opts.Timeout = time.Second
@@ -153,10 +143,6 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
 		go func() {
 			time.Sleep(200 * time.Millisecond)
 			client.QueueErr(errors.New("invalid request"))
-
-			// After the error the view should re-subscribe with same index so will get
-			// a "resume stream".
-			client.QueueEvents(newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
 		}()
 
 		// Next fetch should return the error
@@ -182,8 +168,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
 		opts.LastResult = &result
 
 		// But an update should still be noticed due to reconnection
-		healthEv := newEventServiceHealthRegister(opts.MinIndex+5, 3, "web")
-		client.QueueEvents(&healthEv)
+		client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web"))
 
 		opts.Timeout = time.Second
 		result, err = typ.Fetch(opts, req)
@@ -238,14 +223,13 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
 	}}
 
 	// Create an initial snapshot of 3 instances on different nodes
-	makeReg := func(index uint64, nodeNum int) *pbsubscribe.Event {
-		e := newEventServiceHealthRegister(index, nodeNum, "web")
-		return &e
+	registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
+		return newEventServiceHealthRegister(index, nodeNum, "web")
 	}
 	client.QueueEvents(
-		makeReg(5, 1),
-		makeReg(5, 2),
-		makeReg(5, 3),
+		registerServiceWeb(5, 1),
+		registerServiceWeb(5, 2),
+		registerServiceWeb(5, 3),
 		newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
 
 	// This contains the view state so important we share it between calls.
@@ -288,8 +272,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
 		time.Sleep(200 * time.Millisecond)
 
 		// Deregister instance on node1
-		healthEv := newEventServiceHealthDeregister(20, 1, "web")
-		client.QueueEvents(&healthEv)
+		client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
 	}()
 
 	opts.Timeout = time.Second
@@ -315,9 +298,9 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
 		client.QueueErr(status.Error(codes.Aborted, "reset by server"))
 
 		client.QueueEvents(
-			makeReg(50, 3), // overlap existing node
-			makeReg(50, 4),
-			makeReg(50, 5),
+			registerServiceWeb(50, 3), // overlap existing node
+			registerServiceWeb(50, 4),
+			registerServiceWeb(50, 5),
 			newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
 
 		// Make another blocking query with THE SAME index. It should immediately
@@ -336,6 +319,29 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
 		opts.MinIndex = result.Index
 		opts.LastResult = &result
 	})
+
+	runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
+		client.QueueErr(tempError("temporary connection error"))
+
+		client.QueueEvents(
+			newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth),
+			registerServiceWeb(50, 3), // overlap existing node
+			registerServiceWeb(50, 4),
+			registerServiceWeb(50, 5),
+			newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
+
+		start := time.Now()
+		opts.MinIndex = 49
+		opts.Timeout = time.Second
+		result, err := typ.Fetch(opts, req)
+		require.NoError(t, err)
+		elapsed := time.Since(start)
+		require.True(t, elapsed < time.Second,
+			"Fetch should have returned before the timeout")
+
+		require.Equal(t, uint64(50), result.Index)
+		require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
+	})
 }
 
 func TestStreamingHealthServices_EventBatches(t *testing.T) {
@@ -351,7 +357,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
 		newEventServiceHealthRegister(5, 2, "web"),
 		newEventServiceHealthRegister(5, 3, "web"))
 	client.QueueEvents(
-		&batchEv,
+		batchEv,
 		newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
 
 	// This contains the view state so important we share it between calls.
@@ -394,7 +400,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
 			// Register another
 			newEventServiceHealthRegister(20, 4, "web"),
 		)
-		client.QueueEvents(&batchEv)
+		client.QueueEvents(batchEv)
 		opts.Timeout = time.Second
 		result, err := typ.Fetch(opts, req)
 		require.NoError(t, err)
@@ -421,7 +427,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
 		newEventServiceHealthRegister(5, 2, "web"),
 		newEventServiceHealthRegister(5, 3, "web"))
 	client.QueueEvents(
-		&batchEv,
+		batchEv,
 		newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
 
 	// This contains the view state so important we share it between calls.
@@ -451,8 +457,7 @@
 		require.NoError(t, err)
 
 		require.Equal(t, uint64(5), result.Index)
-		require.ElementsMatch(t, []string{"node2"},
-			gatherNodes(result.Value))
+		require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
 
 		opts.MinIndex = result.Index
 		opts.LastResult = &result
@@ -467,14 +472,13 @@
 			// Register another
 			newEventServiceHealthRegister(20, 4, "web"),
 		)
-		client.QueueEvents(&batchEv)
+		client.QueueEvents(batchEv)
 		opts.Timeout = time.Second
 		result, err := typ.Fetch(opts, req)
 		require.NoError(t, err)
 
 		require.Equal(t, uint64(20), result.Index)
-		require.ElementsMatch(t, []string{"node2"},
-			gatherNodes(result.Value))
+		require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
 
 		opts.MinIndex = result.Index
 		opts.LastResult = &result
diff --git a/agent/cache-types/streaming_test.go b/agent/cache-types/streaming_test.go
index 929420e94d..09721817a0 100644
--- a/agent/cache-types/streaming_test.go
+++ b/agent/cache-types/streaming_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/metadata"
 
 	"github.com/hashicorp/consul/proto/pbsubscribe"
 )
@@ -12,6 +11,7 @@ import (
 // TestStreamingClient is a mock StreamingClient for testing that allows
 // for queueing up custom events to a subscriber.
 type TestStreamingClient struct {
+	pbsubscribe.StateChangeSubscription_SubscribeClient
 	events chan eventOrErr
 	ctx    context.Context
 }
@@ -57,15 +57,3 @@ func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
 		return nil, t.ctx.Err()
 	}
 }
-
-func (t *TestStreamingClient) Header() (metadata.MD, error) { return nil, nil }
-
-func (t *TestStreamingClient) Trailer() metadata.MD { return nil }
-
-func (t *TestStreamingClient) CloseSend() error { return nil }
-
-func (t *TestStreamingClient) Context() context.Context { return nil }
-
-func (t *TestStreamingClient) SendMsg(m interface{}) error { return nil }
-
-func (t *TestStreamingClient) RecvMsg(m interface{}) error { return nil }
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index 661dd756b6..0852393af4 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -18,15 +18,16 @@ import (
 	"container/heap"
 	"context"
 	"fmt"
-	io "io"
+	"io"
 	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/armon/go-metrics"
-	"github.com/hashicorp/consul/lib"
 	"golang.org/x/time/rate"
+
+	"github.com/hashicorp/consul/lib"
 )
 
 //go:generate mockery -all -inpkg
diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go
index 782e0b9f2b..8832572c31 100644
--- a/agent/cache/cache_test.go
+++ b/agent/cache/cache_test.go
@@ -10,11 +10,12 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/time/rate"
+
+	"github.com/hashicorp/consul/sdk/testutil"
 )
 
 // Test a basic Get with no indexes (and therefore no blocking queries).
@@ -841,15 +842,6 @@ func TestCacheGet_expireResetGet(t *testing.T) {
 	typ.AssertExpectations(t)
 }
 
-type testCloser struct {
-	closed bool
-}
-
-func (t *testCloser) Close() error {
-	t.closed = true
-	return nil
-}
-
 // Test that entries with state that satisfies io.Closer get cleaned up
 func TestCacheGet_expireClose(t *testing.T) {
 	t.Parallel()
@@ -859,6 +851,7 @@ func TestCacheGet_expireClose(t *testing.T) {
 	typ := &MockType{}
 	defer typ.AssertExpectations(t)
 	c := New(Options{})
+	defer c.Close()
 	typ.On("RegisterOptions").Return(RegisterOptions{
 		SupportsBlocking: true,
 		LastGetTTL:       100 * time.Millisecond,
@@ -877,13 +870,26 @@ func TestCacheGet_expireClose(t *testing.T) {
 	require.NoError(err)
 	require.Equal(42, result)
 	require.False(meta.Hit)
-	require.False(state.closed)
+	require.False(state.isClosed())
 
 	// Sleep for the expiry
 	time.Sleep(200 * time.Millisecond)
 
 	// state.Close() should have been called
-	require.True(state.closed)
+	require.True(state.isClosed())
+}
+
+type testCloser struct {
+	closed uint32
+}
+
+func (t *testCloser) Close() error {
+	atomic.SwapUint32(&t.closed, 1)
+	return nil
+}
+
+func (t *testCloser) isClosed() bool {
+	return atomic.LoadUint32(&t.closed) == 1
 }
 
 // Test a Get with a request that returns the same cache key across
diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go
index 9512a04b5e..6109ce2d9e 100644
--- a/agent/consul/stream/event_publisher.go
+++ b/agent/consul/stream/event_publisher.go
@@ -61,6 +61,7 @@ type SnapshotHandlers map[Topic]SnapshotFunc
 
 // SnapshotFunc builds a snapshot for the subscription request, and appends the
 // events to the Snapshot using SnapshotAppender.
+// If err is nil, the SnapshotFunc must return a non-zero index.
 type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
 
 // SnapshotAppender appends groups of events to create a Snapshot of state.
diff --git a/agent/submatview/handler.go b/agent/submatview/handler.go
index 7484e4dd33..900a085de8 100644
--- a/agent/submatview/handler.go
+++ b/agent/submatview/handler.go
@@ -1,8 +1,16 @@
 package submatview
 
-import "github.com/hashicorp/consul/proto/pbsubscribe"
+import (
+	"github.com/hashicorp/consul/proto/pbsubscribe"
+)
 
-type eventHandler func(events *pbsubscribe.Event) (eventHandler, error)
+// eventHandler is a function which performs some operation on the received
+// events, then returns the eventHandler that should be used for the next set
+// of events.
+// If an eventHandler fails to handle the events it may return an error. If an
+// error is returned, the next eventHandler will be ignored.
+// eventHandler is used to implement a very simple finite-state machine.
+type eventHandler func(events *pbsubscribe.Event) (next eventHandler, err error)
 
 func (m *Materializer) initialHandler(index uint64) eventHandler {
 	if index == 0 {
diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go
index 090793a1db..3a6d3d1753 100644
--- a/agent/submatview/materializer.go
+++ b/agent/submatview/materializer.go
@@ -65,7 +65,6 @@ type Deps struct {
 	Logger  hclog.Logger
 	Waiter  *retry.Waiter
 	Request func(index uint64) pbsubscribe.SubscribeRequest
-	Stop    func()
 }
 
 // StreamClient provides a subscription to state change events.
@@ -94,37 +93,34 @@ func (m *Materializer) Run(ctx context.Context) {
 			return
 		}
 
-		m.lock.Lock()
-		// TODO: move this into a func
-		// If this is a temporary error and it's the first consecutive failure,
-		// retry to see if we can get a result without erroring back to clients.
-		// If it's non-temporary or a repeated failure return to clients while we
-		// retry to get back in a good state.
-		if _, ok := err.(temporary); !ok || m.retryWaiter.Failures() > 0 {
-			m.notifyUpdateLocked(err)
-		}
-		waitCh := m.retryWaiter.Failed()
 		failures := m.retryWaiter.Failures()
-		m.lock.Unlock()
+		if isNonTemporaryOrConsecutiveFailure(err, failures) {
+			m.lock.Lock()
+			m.notifyUpdateLocked(err)
+			m.lock.Unlock()
+		}
 
 		m.deps.Logger.Error("subscribe call failed",
 			"err", err,
 			"topic", req.Topic,
 			"key", req.Key,
-			"failure_count", failures)
+			"failure_count", failures+1)
 
-		select {
-		case <-ctx.Done():
+		if err := m.retryWaiter.Wait(ctx); err != nil {
 			return
-		case <-waitCh:
 		}
 	}
 }
 
-// temporary is a private interface as used by net and other std lib packages to
-// show error types represent temporary/recoverable errors.
-type temporary interface {
-	Temporary() bool
+// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
+// temporary error, or if failures > 0.
+func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
+	// temporary is an interface used by net and other std lib packages to
+	// show error types represent temporary/recoverable errors.
+	_, ok := err.(interface {
+		Temporary() bool
+	})
+	return !ok || failures > 0
 }
 
 // runSubscription opens a new subscribe streaming call to the servers and runs