diff --git a/agent/consul/stream/event_buffer.go b/agent/consul/stream/event_buffer.go index eca2dbec10..b28d38a540 100644 --- a/agent/consul/stream/event_buffer.go +++ b/agent/consul/stream/event_buffer.go @@ -81,7 +81,7 @@ func (b *eventBuffer) Append(events []Event) { b.AppendItem(newBufferItem(events)) } -// AppendBuffer joins another buffer which may be the tail of a separate buffer +// AppendItem joins another buffer which may be the tail of a separate buffer // for example a buffer that's had the events from a snapshot appended may // finally by linked to the topic buffer for the subsequent events so // subscribers can seamlessly consume the updates. Note that Events in item must @@ -194,27 +194,19 @@ func (i *bufferItem) Next(ctx context.Context, closed <-chan struct{}) (*bufferI return next, nil } -// NextNoBlock returns the next item in the buffer without blocking. If it -// reaches the most recent item it will return nil. -func (i *bufferItem) NextNoBlock() *bufferItem { +// NextNoBlock returns true and the next item in the buffer without blocking. +// If there are no subsequent items it returns false and an empty item. +// The empty item will be ignored by subscribers, but has the same link as this +// bufferItem without the Events. +// When the link.ch of the empty item is closed, subscriptions are notified of +// the next item. +func (i *bufferItem) NextNoBlock() (*bufferItem, bool) { nextRaw := i.link.next.Load() if nextRaw == nil { - return nil - } - return nextRaw.(*bufferItem) -} - -// NextLink returns either the next item in the buffer if there is one, or -// an empty item (that will be ignored by subscribers) that has a pointer to -// the same link as this bufferItem (but none of the bufferItem content). -// When the link.ch is closed, subscriptions will be notified of the next item. -func (i *bufferItem) NextLink() *bufferItem { - next := i.NextNoBlock() - if next == nil { // Return an empty item that can be followed to the next item published. - return &bufferItem{link: i.link} + return &bufferItem{link: i.link}, false } - return next + return nextRaw.(*bufferItem), true } // HasEventIndex returns true if index matches the Event.Index of this item. Returns diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go index d635369112..c9f737a7c9 100644 --- a/agent/consul/stream/event_buffer_test.go +++ b/agent/consul/stream/event_buffer_test.go @@ -17,10 +17,6 @@ func TestEventBufferFuzz(t *testing.T) { t.Skip("too slow for testing.Short") } - if testing.Short() { - t.Skip("too slow for short run") - } - nReaders := 1000 nMessages := 1000 diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index db9ceb6564..bfa3858b96 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -171,7 +171,8 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) subscriptionHead := buf.Head() // splice the rest of the topic buffer onto the subscription buffer so // the subscription will receive new events. - buf.AppendItem(topicHead.NextLink()) + next, _ := topicHead.NextNoBlock() + buf.AppendItem(next) return e.subscriptions.add(req, subscriptionHead), nil } diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index f85c3f9e27..c031c8c21f 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -67,12 +67,12 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u return } - next := item.NextNoBlock() - if next == nil { + next, ok := item.NextNoBlock() + if !ok { // We reached the head of the topic buffer. We don't want any of the // events in the topic buffer as they came before the snapshot. // Append a link to any future items. - s.buffer.AppendItem(item.NextLink()) + s.buffer.AppendItem(next) return } // Proceed to the next item in the topic buffer diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 7c8329e406..5e3b1c6d1a 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -95,7 +95,7 @@ func TestEventSnapshot(t *testing.T) { // be appended before the snapshot fn executes in another goroutine since // it's operating an a possible stale "snapshot". This is the same as // reality with the state store where updates that occur after the - // snapshot is taken but while the SnapFnis still running must be captured + // snapshot is taken but while the SnapFn is still running must be captured // correctly. for i := 0; i < tc.updatesAfterSnap; i++ { index := snapIndex + 1 + uint64(i)