mirror of https://github.com/status-im/consul.git
Merge pull request #9772 from hashicorp/streamin-fix-bad-cached-snapshot
streaming: fix snapshot cache bug
This commit is contained in:
parent
a8de5f6e00
commit
5a8fc428bd
|
@ -0,0 +1,4 @@
|
|||
```release-note:bug
|
||||
streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients
|
||||
to error until the cache expired.
|
||||
```
|
|
@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
|||
}
|
||||
|
||||
snapFromCache := e.getCachedSnapshotLocked(req)
|
||||
if req.Index == 0 && snapFromCache != nil {
|
||||
if snapFromCache == nil {
|
||||
snap := newEventSnapshot()
|
||||
snap.appendAndSplice(*req, handler, topicHead)
|
||||
e.setCachedSnapshotLocked(req, snap)
|
||||
snapFromCache = snap
|
||||
}
|
||||
|
||||
// If the request.Index is 0 the client has no view, send a full snapshot.
|
||||
if req.Index == 0 {
|
||||
return e.subscriptions.add(req, snapFromCache.First), nil
|
||||
}
|
||||
snap := newEventSnapshot()
|
||||
|
||||
// if the request has an Index the client view is stale and must be reset
|
||||
// otherwise the request has an Index, the client view is stale and must be reset
|
||||
// with a NewSnapshotToFollow event.
|
||||
if req.Index > 0 {
|
||||
snap.buffer.Append([]Event{{
|
||||
Topic: req.Topic,
|
||||
Payload: newSnapshotToFollow{},
|
||||
}})
|
||||
|
||||
if snapFromCache != nil {
|
||||
snap.buffer.AppendItem(snapFromCache.First)
|
||||
return e.subscriptions.add(req, snap.First), nil
|
||||
}
|
||||
}
|
||||
|
||||
snap.appendAndSplice(*req, handler, topicHead)
|
||||
e.setCachedSnapshotLocked(req, snap)
|
||||
return e.subscriptions.add(req, snap.First), nil
|
||||
result := newEventSnapshot()
|
||||
result.buffer.Append([]Event{{
|
||||
Topic: req.Topic,
|
||||
Payload: newSnapshotToFollow{},
|
||||
}})
|
||||
result.buffer.AppendItem(snapFromCache.First)
|
||||
return e.subscriptions.add(req, result.First), nil
|
||||
}
|
||||
|
||||
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
|
||||
|
|
|
@ -392,6 +392,81 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
|
|||
})
|
||||
}
|
||||
|
||||
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) {
|
||||
req := &SubscribeRequest{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
Index: 1,
|
||||
}
|
||||
|
||||
nextEvent := Event{
|
||||
Topic: testTopic,
|
||||
Index: 3,
|
||||
Payload: simplePayload{key: "sub-key", value: "event-3"},
|
||||
}
|
||||
|
||||
handlers := SnapshotHandlers{
|
||||
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
if req.Topic != testTopic {
|
||||
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
|
||||
}
|
||||
buf.Append([]Event{testSnapshotEvent})
|
||||
buf.Append([]Event{nextEvent})
|
||||
return 3, nil
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
publisher := NewEventPublisher(handlers, time.Second)
|
||||
go publisher.Run(ctx)
|
||||
// Include the same events in the topicBuffer
|
||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||
publisher.publishEvent([]Event{nextEvent})
|
||||
|
||||
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
||||
sub, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
next := getNextEvent(t, eventCh)
|
||||
require.True(t, next.IsNewSnapshotToFollow(), next)
|
||||
|
||||
next = getNextEvent(t, eventCh)
|
||||
require.Equal(t, testSnapshotEvent, next)
|
||||
|
||||
next = getNextEvent(t, eventCh)
|
||||
require.Equal(t, nextEvent, next)
|
||||
|
||||
next = getNextEvent(t, eventCh)
|
||||
require.True(t, next.IsEndOfSnapshot(), next)
|
||||
require.Equal(t, uint64(3), next.Index)
|
||||
})
|
||||
|
||||
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
|
||||
return 0, fmt.Errorf("error should not be seen, cache should have been used")
|
||||
}
|
||||
|
||||
runStep(t, "resume the subscription", func(t *testing.T) {
|
||||
newReq := *req
|
||||
newReq.Index = 0
|
||||
sub, err := publisher.Subscribe(&newReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
next := getNextEvent(t, eventCh)
|
||||
require.Equal(t, testSnapshotEvent, next)
|
||||
|
||||
next = getNextEvent(t, eventCh)
|
||||
require.Equal(t, nextEvent, next)
|
||||
|
||||
next = getNextEvent(t, eventCh)
|
||||
require.True(t, next.IsEndOfSnapshot())
|
||||
})
|
||||
}
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
t.Helper()
|
||||
if !t.Run(name, fn) {
|
||||
|
|
Loading…
Reference in New Issue