stream: refactor to support change in framing events

Removing EndOfEmptySnapshot, add NewSnapshotToFollow
This commit is contained in:
Daniel Nephin 2020-10-01 13:51:55 -04:00
parent 526d005d72
commit a5df5d17b4
8 changed files with 114 additions and 117 deletions

View File

@ -20,22 +20,22 @@ type Event struct {
} }
// IsEndOfSnapshot returns true if this is a framing event that indicates the // IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Future events from Subscription.Next will be // snapshot has completed. Subsequent events from Subscription.Next will be
// change events. // streamed as they occur.
func (e Event) IsEndOfSnapshot() bool { func (e Event) IsEndOfSnapshot() bool {
return e.Payload == endOfSnapshot{} return e.Payload == endOfSnapshot{}
} }
// IsEndOfEmptySnapshot returns true if this is a framing event that indicates // IsNewSnapshotToFollow returns true if this is a framing event that indicates
// there is no snapshot. Future events from Subscription.Next will be // that the clients view is stale, and must be reset. Subsequent events from
// change events. // Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
func (e Event) IsEndOfEmptySnapshot() bool { func (e Event) IsNewSnapshotToFollow() bool {
return e.Payload == endOfEmptySnapshot{} return e.Payload == newSnapshotToFollow{}
} }
type endOfSnapshot struct{} type endOfSnapshot struct{}
type endOfEmptySnapshot struct{} type newSnapshotToFollow struct{}
type closeSubscriptionPayload struct { type closeSubscriptionPayload struct {
tokensSecretIDs []string tokensSecretIDs []string

View File

@ -216,3 +216,9 @@ func (i *bufferItem) NextLink() *bufferItem {
} }
return next return next
} }
// HasEventIndex returns true if index matches the Event.Index of this item. Returns
// false if there are no events stored in the item, or the index does not match.
func (i *bufferItem) HasEventIndex(index uint64) bool {
return len(i.Events) > 0 && i.Events[0].Index == index
}

View File

@ -157,8 +157,7 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
// When the caller is finished with the subscription for any reason, it must // When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources. // call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) { func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic handler, ok := e.snapshotHandlers[req.Topic]
_, ok := e.snapshotHandlers[req.Topic]
if !ok || req.Topic == nil { if !ok || req.Topic == nil {
return nil, fmt.Errorf("unknown topic %v", req.Topic) return nil, fmt.Errorf("unknown topic %v", req.Topic)
} }
@ -166,47 +165,48 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
// Ensure there is a topic buffer for that topic so we start capturing any topicHead := e.getTopicBuffer(req.Topic).Head()
// future published events.
buf := e.getTopicBuffer(req.Topic)
// See if we need a snapshot // If the client view is fresh, resume the stream.
topicHead := buf.Head() if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "end of empty snapshot" message to signal to
// client its cache is still good, then follow along from here in the topic.
buf := newEventBuffer() buf := newEventBuffer()
subscriptionHead := buf.Head()
// splice the rest of the topic buffer onto the subscription buffer so
// the subscription will receive new events.
buf.AppendItem(topicHead.NextLink())
return e.subscriptions.add(req, subscriptionHead), nil
}
// Store the head of that buffer before we append to it to give as the snapFromCache := e.getCachedSnapshotLocked(req)
// starting point for the subscription. if req.Index == 0 && snapFromCache != nil {
subHead := buf.Head() return e.subscriptions.add(req, snapFromCache.First), nil
}
snap := newEventSnapshot()
buf.Append([]Event{{ // TODO: testcase for this case, especially the from-cache-splice case
Index: req.Index, // if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Key: req.Key,
Payload: endOfEmptySnapshot{}, Payload: newSnapshotToFollow{},
}}) }})
// Now splice the rest of the topic buffer on so the subscription will if snapFromCache != nil {
// continue to see future updates in the topic buffer. snap.buffer.AppendItem(snapFromCache.First)
buf.AppendItem(topicHead.NextLink()) return e.subscriptions.add(req, snap.First), nil
sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
} }
sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
} }
e.subscriptions.add(req, sub) snap.appendAndSplice(*req, handler, topicHead)
return sub, nil e.setCachedSnapshotLocked(req, snap)
return e.subscriptions.add(req, snap.First), nil
} }
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
sub := newSubscription(*req, head, s.unsubscribe(req))
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -216,6 +216,7 @@ func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.byToken[req.Token] = subsByToken s.byToken[req.Token] = subsByToken
} }
subsByToken[req] = sub subsByToken[req] = sub
return sub
} }
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
@ -263,7 +264,8 @@ func (s *subscriptions) closeAll() {
} }
} }
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) { // EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
topicSnaps, ok := e.snapCache[req.Topic] topicSnaps, ok := e.snapCache[req.Topic]
if !ok { if !ok {
topicSnaps = make(map[string]*eventSnapshot) topicSnaps = make(map[string]*eventSnapshot)
@ -272,25 +274,22 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
snap, ok := topicSnaps[req.Key] snap, ok := topicSnaps[req.Key]
if ok && snap.err() == nil { if ok && snap.err() == nil {
return snap, nil return snap
}
return nil
} }
handler, ok := e.snapshotHandlers[req.Topic] // EventPublisher.lock must be held to call this method.
if !ok { func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *eventSnapshot) {
return nil, fmt.Errorf("unknown topic %v", req.Topic) if e.snapCacheTTL == 0 {
return
} }
e.snapCache[req.Topic][req.Key] = snap
snap = newEventSnapshot(req, topicHead, handler) // Setup a cache eviction
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap
// Trigger a clearout after TTL
time.AfterFunc(e.snapCacheTTL, func() { time.AfterFunc(e.snapCacheTTL, func() {
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
delete(topicSnaps, req.Key) delete(e.snapCache[req.Topic], req.Key)
}) })
} }
return snap, nil
}

View File

@ -9,50 +9,45 @@ package stream
// collected automatically by Go's runtime. This simplifies snapshot and buffer // collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically. // management dramatically.
type eventSnapshot struct { type eventSnapshot struct {
// Head is the first item in the buffer containing the snapshot. Once the // First item in the buffer. Used as the Head of a subscription, or to
// snapshot is complete, subsequent BufferItems are appended to snapBuffer, // splice this snapshot onto another one.
// so that subscribers receive all the events from the same buffer. First *bufferItem
Head *bufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to. // buffer is the Head of the snapshot buffer the fn should write to.
snapBuffer *eventBuffer buffer *eventBuffer
} }
// newEventSnapshot creates a snapshot buffer based on the subscription request. // newEventSnapshot creates an empty snapshot buffer.
// The current buffer head for the topic requested is passed so that once the func newEventSnapshot() *eventSnapshot {
// snapshot is complete and has been delivered into the buffer, any events snapBuffer := newEventBuffer()
// published during snapshotting can be immediately appended and won't be return &eventSnapshot{
// missed. Once the snapshot is delivered the topic buffer is spliced onto the First: snapBuffer.Head(),
// snapshot buffer so that subscribers will naturally follow from the snapshot buffer: snapBuffer,
// to wait for any subsequent updates. }
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
buf := newEventBuffer()
s := &eventSnapshot{
Head: buf.Head(),
snapBuffer: buf,
} }
go func() { // appendAndSlice populates the snapshot buffer by calling the SnapshotFunc,
idx, err := fn(*req, s.snapBuffer) // then adding an endOfSnapshot framing event, and finally by splicing in
// events from the topicBuffer.
func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, topicBufferHead *bufferItem) {
idx, err := fn(req, s.buffer)
if err != nil { if err != nil {
s.snapBuffer.AppendItem(&bufferItem{Err: err}) s.buffer.AppendItem(&bufferItem{Err: err})
return return
} }
// We wrote the snapshot events to the buffer, send the "end of snapshot" event s.buffer.Append([]Event{{
s.snapBuffer.Append([]Event{{
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Key: req.Key,
Index: idx, Index: idx,
Payload: endOfSnapshot{}, Payload: endOfSnapshot{},
}}) }})
s.spliceFromTopicBuffer(topicBufferHead, idx) s.spliceFromTopicBuffer(topicBufferHead, idx)
}()
return s
} }
// spliceFromTopicBuffer traverses the topicBuffer looking for the last item
// in the buffer, or the first item where the index is greater than idx. Once
// the item is found it is appended to the snapshot buffer.
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) { func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
// Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot.
item := topicBufferHead item := topicBufferHead
for { for {
next := item.NextNoBlock() next := item.NextNoBlock()
@ -62,7 +57,7 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// the snapshot completed). We don't want any of the events (if any) in // the snapshot completed). We don't want any of the events (if any) in
// the snapshot buffer as they came before the snapshot but we do need to // the snapshot buffer as they came before the snapshot but we do need to
// wait for the next update. // wait for the next update.
s.snapBuffer.AppendItem(item.NextLink()) s.buffer.AppendItem(item.NextLink())
return return
case next.Err != nil: case next.Err != nil:
@ -71,14 +66,14 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// buffer which does not contain a snapshot. // buffer which does not contain a snapshot.
// Handle this case anyway in case errors can come from other places // Handle this case anyway in case errors can come from other places
// in the future. // in the future.
s.snapBuffer.AppendItem(next) s.buffer.AppendItem(next)
return return
case len(next.Events) > 0 && next.Events[0].Index > idx: case len(next.Events) > 0 && next.Events[0].Index > idx:
// We've found an update in the topic buffer that happened after our // We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers // snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it. // can continue to read this and others after it.
s.snapBuffer.AppendItem(next) s.buffer.AppendItem(next)
return return
} }
@ -93,6 +88,6 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
func (s *eventSnapshot) err() error { func (s *eventSnapshot) err() error {
// Fetch the head of the buffer, this is atomic. If the snapshot func errored // Fetch the head of the buffer, this is atomic. If the snapshot func errored
// then the last event will be an error. // then the last event will be an error.
head := s.snapBuffer.Head() head := s.buffer.Head()
return head.Err return head.Err
} }

View File

@ -87,9 +87,8 @@ func TestEventSnapshot(t *testing.T) {
tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)}) tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
} }
// Create eventSnapshot, (will call snFn in another goroutine). The es := newEventSnapshot()
// Request is ignored by the snapFunc so doesn't matter for now. es.appendAndSplice(SubscribeRequest{}, snFn, tbHead)
es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
// Deliver any post-snapshot events simulating updates that occur // Deliver any post-snapshot events simulating updates that occur
// logically after snapshot. It doesn't matter that these might actually // logically after snapshot. It doesn't matter that these might actually
@ -112,7 +111,7 @@ func TestEventSnapshot(t *testing.T) {
snapIDs := make([]string, 0, tc.snapshotSize) snapIDs := make([]string, 0, tc.snapshotSize)
updateIDs := make([]string, 0, tc.updatesAfterSnap) updateIDs := make([]string, 0, tc.updatesAfterSnap)
snapDone := false snapDone := false
curItem := es.Head curItem := es.First
var err error var err error
RECV: RECV:
for { for {

View File

@ -11,7 +11,7 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
require.True(t, e.IsEndOfSnapshot()) require.True(t, e.IsEndOfSnapshot())
t.Run("not EndOfSnapshot", func(t *testing.T) { t.Run("not EndOfSnapshot", func(t *testing.T) {
e := Event{Payload: endOfEmptySnapshot{}} e := Event{Payload: newSnapshotToFollow{}}
require.False(t, e.IsEndOfSnapshot()) require.False(t, e.IsEndOfSnapshot())
}) })
} }

View File

@ -28,7 +28,7 @@ type Subscription struct {
state uint32 state uint32
// req is the requests that we are responding to // req is the requests that we are responding to
req *SubscribeRequest req SubscribeRequest
// currentItem stores the current snapshot or topic buffer item we are on. It // currentItem stores the current snapshot or topic buffer item we are on. It
// is mutated by calls to Next. // is mutated by calls to Next.
@ -56,7 +56,7 @@ type SubscribeRequest struct {
// newSubscription return a new subscription. The caller is responsible for // newSubscription return a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources. // calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{ return &Subscription{
forceClosed: make(chan struct{}), forceClosed: make(chan struct{}),
req: req, req: req,

View File

@ -23,8 +23,7 @@ func TestSubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
// Create a subscription req := SubscribeRequest{
req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "test", Key: "test",
} }
@ -103,8 +102,7 @@ func TestSubscription_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
// Create a subscription req := SubscribeRequest{
req := &SubscribeRequest{
Topic: testTopic, Topic: testTopic,
Key: "test", Key: "test",
} }