stream: remove bufferItem.NextLink

Both NextLink and NextNoBlock had the same logic, with slightly
different return values. By adding a bool return value (similar to map
lookups) we can remove the duplicate method.
This commit is contained in:
Daniel Nephin 2021-06-04 18:29:04 -04:00
parent 5ef8a045f3
commit eb0c0d7740
5 changed files with 16 additions and 27 deletions

View File

@ -81,7 +81,7 @@ func (b *eventBuffer) Append(events []Event) {
b.AppendItem(newBufferItem(events)) b.AppendItem(newBufferItem(events))
} }
// AppendBuffer joins another buffer which may be the tail of a separate buffer // AppendItem joins another buffer which may be the tail of a separate buffer
// for example a buffer that's had the events from a snapshot appended may // for example a buffer that's had the events from a snapshot appended may
// finally by linked to the topic buffer for the subsequent events so // finally by linked to the topic buffer for the subsequent events so
// subscribers can seamlessly consume the updates. Note that Events in item must // subscribers can seamlessly consume the updates. Note that Events in item must
@ -194,27 +194,19 @@ func (i *bufferItem) Next(ctx context.Context, closed <-chan struct{}) (*bufferI
return next, nil return next, nil
} }
// NextNoBlock returns the next item in the buffer without blocking. If it // NextNoBlock returns true and the next item in the buffer without blocking.
// reaches the most recent item it will return nil. // If there are no subsequent items it returns false and an empty item.
func (i *bufferItem) NextNoBlock() *bufferItem { // The empty item will be ignored by subscribers, but has the same link as this
// bufferItem without the Events.
// When the link.ch of the empty item is closed, subscriptions are notified of
// the next item.
func (i *bufferItem) NextNoBlock() (*bufferItem, bool) {
nextRaw := i.link.next.Load() nextRaw := i.link.next.Load()
if nextRaw == nil { if nextRaw == nil {
return nil
}
return nextRaw.(*bufferItem)
}
// NextLink returns either the next item in the buffer if there is one, or
// an empty item (that will be ignored by subscribers) that has a pointer to
// the same link as this bufferItem (but none of the bufferItem content).
// When the link.ch is closed, subscriptions will be notified of the next item.
func (i *bufferItem) NextLink() *bufferItem {
next := i.NextNoBlock()
if next == nil {
// Return an empty item that can be followed to the next item published. // Return an empty item that can be followed to the next item published.
return &bufferItem{link: i.link} return &bufferItem{link: i.link}, false
} }
return next return nextRaw.(*bufferItem), true
} }
// HasEventIndex returns true if index matches the Event.Index of this item. Returns // HasEventIndex returns true if index matches the Event.Index of this item. Returns

View File

@ -17,10 +17,6 @@ func TestEventBufferFuzz(t *testing.T) {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
if testing.Short() {
t.Skip("too slow for short run")
}
nReaders := 1000 nReaders := 1000
nMessages := 1000 nMessages := 1000

View File

@ -171,7 +171,8 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
subscriptionHead := buf.Head() subscriptionHead := buf.Head()
// splice the rest of the topic buffer onto the subscription buffer so // splice the rest of the topic buffer onto the subscription buffer so
// the subscription will receive new events. // the subscription will receive new events.
buf.AppendItem(topicHead.NextLink()) next, _ := topicHead.NextNoBlock()
buf.AppendItem(next)
return e.subscriptions.add(req, subscriptionHead), nil return e.subscriptions.add(req, subscriptionHead), nil
} }

View File

@ -67,12 +67,12 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
return return
} }
next := item.NextNoBlock() next, ok := item.NextNoBlock()
if next == nil { if !ok {
// We reached the head of the topic buffer. We don't want any of the // We reached the head of the topic buffer. We don't want any of the
// events in the topic buffer as they came before the snapshot. // events in the topic buffer as they came before the snapshot.
// Append a link to any future items. // Append a link to any future items.
s.buffer.AppendItem(item.NextLink()) s.buffer.AppendItem(next)
return return
} }
// Proceed to the next item in the topic buffer // Proceed to the next item in the topic buffer

View File

@ -95,7 +95,7 @@ func TestEventSnapshot(t *testing.T) {
// be appended before the snapshot fn executes in another goroutine since // be appended before the snapshot fn executes in another goroutine since
// it's operating an a possible stale "snapshot". This is the same as // it's operating an a possible stale "snapshot". This is the same as
// reality with the state store where updates that occur after the // reality with the state store where updates that occur after the
// snapshot is taken but while the SnapFnis still running must be captured // snapshot is taken but while the SnapFn is still running must be captured
// correctly. // correctly.
for i := 0; i < tc.updatesAfterSnap; i++ { for i := 0; i < tc.updatesAfterSnap; i++ {
index := snapIndex + 1 + uint64(i) index := snapIndex + 1 + uint64(i)