diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index e20e53a4a1..f85c3f9e27 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -49,34 +49,33 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) { item := topicBufferHead for { - next := item.NextNoBlock() switch { - case next == nil: - // This is the head of the topic buffer (or was just now which is after - // the snapshot completed). We don't want any of the events (if any) in - // the snapshot buffer as they came before the snapshot but we do need to - // wait for the next update. - s.buffer.AppendItem(item.NextLink()) - return - - case next.Err != nil: + case item.Err != nil: // This case is not currently possible because errors can only come // from a snapshot func, and this is consuming events from a topic // buffer which does not contain a snapshot. // Handle this case anyway in case errors can come from other places // in the future. - s.buffer.AppendItem(next) + s.buffer.AppendItem(item) return - case len(next.Events) > 0 && next.Events[0].Index > idx: + case len(item.Events) > 0 && item.Events[0].Index > idx: // We've found an update in the topic buffer that happened after our // snapshot was taken, splice it into the snapshot buffer so subscribers // can continue to read this and others after it. - s.buffer.AppendItem(next) + s.buffer.AppendItem(item) return } - // We don't need this item, continue to next + next := item.NextNoBlock() + if next == nil { + // We reached the head of the topic buffer. We don't want any of the + // events in the topic buffer as they came before the snapshot. + // Append a link to any future items. + s.buffer.AppendItem(item.NextLink()) + return + } + // Proceed to the next item in the topic buffer item = next } }