From 9b3c6da9df06237f53c62ae3d9eea21c2aeffebf Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 16 Feb 2021 11:54:51 -0500 Subject: [PATCH 1/2] stream: test the snapshot cache is saved correctly when the cache entry is created from resuming a stream. --- agent/consul/stream/event_publisher_test.go | 75 +++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 576d4ccc35..2967ef8d3d 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -392,6 +392,81 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin }) } +func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) { + req := &SubscribeRequest{ + Topic: testTopic, + Key: "sub-key", + Index: 1, + } + + nextEvent := Event{ + Topic: testTopic, + Index: 3, + Payload: simplePayload{key: "sub-key", value: "event-3"}, + } + + handlers := SnapshotHandlers{ + testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{testSnapshotEvent}) + buf.Append([]Event{nextEvent}) + return 3, nil + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(handlers, time.Second) + go publisher.Run(ctx) + // Include the same events in the topicBuffer + publisher.publishEvent([]Event{testSnapshotEvent}) + publisher.publishEvent([]Event{nextEvent}) + + runStep(t, "start a subscription and unsub", func(t *testing.T) { + sub, err := publisher.Subscribe(req) + require.NoError(t, err) + defer sub.Unsubscribe() + + eventCh := runSubscription(ctx, sub) + next := getNextEvent(t, eventCh) + require.True(t, next.IsNewSnapshotToFollow(), next) + + next = getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) + + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) + + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot(), next) + require.Equal(t, uint64(3), next.Index) + }) + + publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { + return 0, fmt.Errorf("error should not be seen, cache should have been used") + } + + runStep(t, "resume the subscription", func(t *testing.T) { + newReq := *req + newReq.Index = 0 + sub, err := publisher.Subscribe(&newReq) + require.NoError(t, err) + + eventCh := runSubscription(ctx, sub) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) + + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) + + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) + }) +} + func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.Helper() if !t.Run(name, fn) { From ba3a1b95e13411eda2163178d46450d16cdca336 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 16 Feb 2021 12:20:19 -0500 Subject: [PATCH 2/2] stream: fix a snapshot cache bug Previously a snapshot created as part of a resumse-stream request could have incorrectly cached the newSnapshotToFollow event. This would cause clients to error because they received an unexpected framing event. --- .changelog/9772.txt | 4 +++ agent/consul/stream/event_publisher.go | 35 +++++++++++++------------- 2 files changed, 21 insertions(+), 18 deletions(-) create mode 100644 .changelog/9772.txt diff --git a/.changelog/9772.txt b/.changelog/9772.txt new file mode 100644 index 0000000000..6dd048f997 --- /dev/null +++ b/.changelog/9772.txt @@ -0,0 +1,4 @@ +```release-note:bug +streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients +to error until the cache expired. +``` diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 769e875d83..db9ceb6564 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) } snapFromCache := e.getCachedSnapshotLocked(req) - if req.Index == 0 && snapFromCache != nil { + if snapFromCache == nil { + snap := newEventSnapshot() + snap.appendAndSplice(*req, handler, topicHead) + e.setCachedSnapshotLocked(req, snap) + snapFromCache = snap + } + + // If the request.Index is 0 the client has no view, send a full snapshot. + if req.Index == 0 { return e.subscriptions.add(req, snapFromCache.First), nil } - snap := newEventSnapshot() - // if the request has an Index the client view is stale and must be reset + // otherwise the request has an Index, the client view is stale and must be reset // with a NewSnapshotToFollow event. - if req.Index > 0 { - snap.buffer.Append([]Event{{ - Topic: req.Topic, - Payload: newSnapshotToFollow{}, - }}) - - if snapFromCache != nil { - snap.buffer.AppendItem(snapFromCache.First) - return e.subscriptions.add(req, snap.First), nil - } - } - - snap.appendAndSplice(*req, handler, topicHead) - e.setCachedSnapshotLocked(req, snap) - return e.subscriptions.add(req, snap.First), nil + result := newEventSnapshot() + result.buffer.Append([]Event{{ + Topic: req.Topic, + Payload: newSnapshotToFollow{}, + }}) + result.buffer.AppendItem(snapFromCache.First) + return e.subscriptions.add(req, result.First), nil } func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {