mirror of https://github.com/status-im/consul.git
Merge pull request #10355 from hashicorp/dnephin/stream-fix-topic-head-bug
stream: fix a bug with subscriptions and the latest item in the topic buffers.
This commit is contained in:
commit
191f49a3cf
|
@ -81,7 +81,7 @@ func (b *eventBuffer) Append(events []Event) {
|
||||||
b.AppendItem(newBufferItem(events))
|
b.AppendItem(newBufferItem(events))
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendBuffer joins another buffer which may be the tail of a separate buffer
|
// AppendItem joins another buffer which may be the tail of a separate buffer
|
||||||
// for example a buffer that's had the events from a snapshot appended may
|
// for example a buffer that's had the events from a snapshot appended may
|
||||||
// finally by linked to the topic buffer for the subsequent events so
|
// finally by linked to the topic buffer for the subsequent events so
|
||||||
// subscribers can seamlessly consume the updates. Note that Events in item must
|
// subscribers can seamlessly consume the updates. Note that Events in item must
|
||||||
|
@ -194,27 +194,19 @@ func (i *bufferItem) Next(ctx context.Context, closed <-chan struct{}) (*bufferI
|
||||||
return next, nil
|
return next, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NextNoBlock returns the next item in the buffer without blocking. If it
|
// NextNoBlock returns true and the next item in the buffer without blocking.
|
||||||
// reaches the most recent item it will return nil.
|
// If there are no subsequent items it returns false and an empty item.
|
||||||
func (i *bufferItem) NextNoBlock() *bufferItem {
|
// The empty item will be ignored by subscribers, but has the same link as this
|
||||||
|
// bufferItem without the Events.
|
||||||
|
// When the link.ch of the empty item is closed, subscriptions are notified of
|
||||||
|
// the next item.
|
||||||
|
func (i *bufferItem) NextNoBlock() (*bufferItem, bool) {
|
||||||
nextRaw := i.link.next.Load()
|
nextRaw := i.link.next.Load()
|
||||||
if nextRaw == nil {
|
if nextRaw == nil {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return nextRaw.(*bufferItem)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NextLink returns either the next item in the buffer if there is one, or
|
|
||||||
// an empty item (that will be ignored by subscribers) that has a pointer to
|
|
||||||
// the same link as this bufferItem (but none of the bufferItem content).
|
|
||||||
// When the link.ch is closed, subscriptions will be notified of the next item.
|
|
||||||
func (i *bufferItem) NextLink() *bufferItem {
|
|
||||||
next := i.NextNoBlock()
|
|
||||||
if next == nil {
|
|
||||||
// Return an empty item that can be followed to the next item published.
|
// Return an empty item that can be followed to the next item published.
|
||||||
return &bufferItem{link: i.link}
|
return &bufferItem{link: i.link}, false
|
||||||
}
|
}
|
||||||
return next
|
return nextRaw.(*bufferItem), true
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasEventIndex returns true if index matches the Event.Index of this item. Returns
|
// HasEventIndex returns true if index matches the Event.Index of this item. Returns
|
||||||
|
|
|
@ -17,10 +17,6 @@ func TestEventBufferFuzz(t *testing.T) {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
}
|
}
|
||||||
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for short run")
|
|
||||||
}
|
|
||||||
|
|
||||||
nReaders := 1000
|
nReaders := 1000
|
||||||
nMessages := 1000
|
nMessages := 1000
|
||||||
|
|
||||||
|
|
|
@ -171,7 +171,8 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
||||||
subscriptionHead := buf.Head()
|
subscriptionHead := buf.Head()
|
||||||
// splice the rest of the topic buffer onto the subscription buffer so
|
// splice the rest of the topic buffer onto the subscription buffer so
|
||||||
// the subscription will receive new events.
|
// the subscription will receive new events.
|
||||||
buf.AppendItem(topicHead.NextLink())
|
next, _ := topicHead.NextNoBlock()
|
||||||
|
buf.AppendItem(next)
|
||||||
return e.subscriptions.add(req, subscriptionHead), nil
|
return e.subscriptions.add(req, subscriptionHead), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,34 +49,33 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t
|
||||||
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
|
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
|
||||||
item := topicBufferHead
|
item := topicBufferHead
|
||||||
for {
|
for {
|
||||||
next := item.NextNoBlock()
|
|
||||||
switch {
|
switch {
|
||||||
case next == nil:
|
case item.Err != nil:
|
||||||
// This is the head of the topic buffer (or was just now which is after
|
|
||||||
// the snapshot completed). We don't want any of the events (if any) in
|
|
||||||
// the snapshot buffer as they came before the snapshot but we do need to
|
|
||||||
// wait for the next update.
|
|
||||||
s.buffer.AppendItem(item.NextLink())
|
|
||||||
return
|
|
||||||
|
|
||||||
case next.Err != nil:
|
|
||||||
// This case is not currently possible because errors can only come
|
// This case is not currently possible because errors can only come
|
||||||
// from a snapshot func, and this is consuming events from a topic
|
// from a snapshot func, and this is consuming events from a topic
|
||||||
// buffer which does not contain a snapshot.
|
// buffer which does not contain a snapshot.
|
||||||
// Handle this case anyway in case errors can come from other places
|
// Handle this case anyway in case errors can come from other places
|
||||||
// in the future.
|
// in the future.
|
||||||
s.buffer.AppendItem(next)
|
s.buffer.AppendItem(item)
|
||||||
return
|
return
|
||||||
|
|
||||||
case len(next.Events) > 0 && next.Events[0].Index > idx:
|
case len(item.Events) > 0 && item.Events[0].Index > idx:
|
||||||
// We've found an update in the topic buffer that happened after our
|
// We've found an update in the topic buffer that happened after our
|
||||||
// snapshot was taken, splice it into the snapshot buffer so subscribers
|
// snapshot was taken, splice it into the snapshot buffer so subscribers
|
||||||
// can continue to read this and others after it.
|
// can continue to read this and others after it.
|
||||||
s.buffer.AppendItem(next)
|
s.buffer.AppendItem(item)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't need this item, continue to next
|
next, ok := item.NextNoBlock()
|
||||||
|
if !ok {
|
||||||
|
// We reached the head of the topic buffer. We don't want any of the
|
||||||
|
// events in the topic buffer as they came before the snapshot.
|
||||||
|
// Append a link to any future items.
|
||||||
|
s.buffer.AppendItem(next)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Proceed to the next item in the topic buffer
|
||||||
item = next
|
item = next
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,7 @@ func TestEventSnapshot(t *testing.T) {
|
||||||
// be appended before the snapshot fn executes in another goroutine since
|
// be appended before the snapshot fn executes in another goroutine since
|
||||||
// it's operating an a possible stale "snapshot". This is the same as
|
// it's operating an a possible stale "snapshot". This is the same as
|
||||||
// reality with the state store where updates that occur after the
|
// reality with the state store where updates that occur after the
|
||||||
// snapshot is taken but while the SnapFnis still running must be captured
|
// snapshot is taken but while the SnapFn is still running must be captured
|
||||||
// correctly.
|
// correctly.
|
||||||
for i := 0; i < tc.updatesAfterSnap; i++ {
|
for i := 0; i < tc.updatesAfterSnap; i++ {
|
||||||
index := snapIndex + 1 + uint64(i)
|
index := snapIndex + 1 + uint64(i)
|
||||||
|
|
Loading…
Reference in New Issue