stream: Improve docstrings

Also rename ResumeStrema to EndOfEmptySnapshot to be more consistent with other framing events

Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
Daniel Nephin 2020-07-08 14:45:18 -04:00
parent fc1c2ae412
commit f19f8e99bb
7 changed files with 33 additions and 33 deletions

View File

@ -50,7 +50,8 @@ func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, err
} }
} }
} }
// TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent // There may be duplicate secretIDs here. We rely on this event allowing
// for duplicate IDs.
return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil
} }

View File

@ -50,7 +50,7 @@ func (c *changeTrackerDB) Txn(write bool) *txn {
// memdb.Txn // memdb.Txn
// //
// TODO: this could return a regular memdb.Txn if all the state functions accepted // TODO: this could return a regular memdb.Txn if all the state functions accepted
// and interface // the ReadTxn interface
func (c *changeTrackerDB) ReadTxn() *txn { func (c *changeTrackerDB) ReadTxn() *txn {
return &txn{Txn: c.db.Txn(false)} return &txn{Txn: c.db.Txn(false)}
} }
@ -109,7 +109,7 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn {
type txn struct { type txn struct {
*memdb.Txn *memdb.Txn
// Index in raft where the write is occurring. The value is zero for a // Index in raft where the write is occurring. The value is zero for a
// read-only, and WriteTxnRestore transaction. // read-only, or WriteTxnRestore transaction.
// Index is stored so that it may be passed along to any subscribers as part // Index is stored so that it may be passed along to any subscribers as part
// of a change event. // of a change event.
Index uint64 Index uint64

View File

@ -19,17 +19,23 @@ type Event struct {
Payload interface{} Payload interface{}
} }
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Future events from Subscription.Next will be
// change events.
func (e Event) IsEndOfSnapshot() bool { func (e Event) IsEndOfSnapshot() bool {
return e.Payload == endOfSnapshot{} return e.Payload == endOfSnapshot{}
} }
func (e Event) IsResumeStream() bool { // IsEndOfEmptySnapshot returns true if this is a framing event that indicates
return e.Payload == resumeStream{} // there is no snapshot. Future events from Subscription.Next will be
// change events.
func (e Event) IsEndOfEmptySnapshot() bool {
return e.Payload == endOfEmptySnapshot{}
} }
type endOfSnapshot struct{} type endOfSnapshot struct{}
type resumeStream struct{} type endOfEmptySnapshot struct{}
type closeSubscriptionPayload struct { type closeSubscriptionPayload struct {
tokensSecretIDs []string tokensSecretIDs []string
@ -39,6 +45,8 @@ type closeSubscriptionPayload struct {
// stream package, and is never sent to subscribers. EventProcessor handles // stream package, and is never sent to subscribers. EventProcessor handles
// these events, and closes any subscriptions which were created using a token // these events, and closes any subscriptions which were created using a token
// which matches any of the tokenSecretIDs. // which matches any of the tokenSecretIDs.
//
// tokenSecretIDs may contain duplicate IDs.
func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event { func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event {
return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}} return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}}
} }

View File

@ -20,24 +20,24 @@ import (
// goroutines or deliver to O(N) separate channels. // goroutines or deliver to O(N) separate channels.
// //
// Because it's a linked list with atomically updated pointers, readers don't // Because it's a linked list with atomically updated pointers, readers don't
// have to take a lock and can consume at their own pace. but we also don't have // have to take a lock and can consume at their own pace. We also don't need a
// to have a fixed limit on the number of items which either means we don't have // fixed limit on the number of items which avoids needing to configure
// to trade off buffer length config to balance using lots of memory wastefully // buffer length to balance wasting lots of memory all the time against being able to
// vs handling occasional slow readers. // tolerate occasional slow readers.
// //
// The buffer is used to deliver all messages broadcast to a topic for active // The buffer is used to deliver all messages broadcast to a topic for active
// subscribers to consume, but it is also an effective way to both deliver and // subscribers to consume, but it is also an effective way to both deliver and
// optionally cache snapshots per topic and key. byt using an eventBuffer, // optionally cache snapshots per topic and key. By using an eventBuffer,
// snapshot functions don't have to read the whole snapshot into memory before // snapshot functions don't have to read the whole snapshot into memory before
// delivery - they can stream from memdb. However simply by storing a pointer to // delivery - they can stream from memdb. However simply by storing a pointer to
// the first event in the buffer, we can cache the buffered events for future // the first event in the buffer, we can cache the buffered events for future
// watchers on the same topic. Finally, once we've delivered all the snapshot // watchers on the same topic. Finally, once we've delivered all the snapshot
// events to the buffer, we can append a next-element which is the first topic // events to the buffer, we can append a next-element which is the first topic
// buffer element with a higher index and so consumers can keep reading the // buffer element with a higher index and so consumers can keep reading the
// same buffer. // same buffer to have subsequent updates streamed after the snapshot is read.
// //
// A huge benefit here is that caching snapshots becomes very simple - we don't // A huge benefit here is that caching snapshots becomes very simple - we don't
// have to do any additional book keeping to figure out when to truncate the // have to do any additional book-keeping to figure out when to truncate the
// topic buffer to make sure the snapshot is still usable or run into issues // topic buffer to make sure the snapshot is still usable or run into issues
// where the cached snapshot is no longer useful since the buffer will keep // where the cached snapshot is no longer useful since the buffer will keep
// elements around only as long as either the cache or a subscriber need them. // elements around only as long as either the cache or a subscriber need them.

View File

@ -51,7 +51,7 @@ func TestEventBufferFuzz(t *testing.T) {
errCh := make(chan error, nReaders) errCh := make(chan error, nReaders)
// Load head here so all subscribers start from the same point or they might // Load head here so all subscribers start from the same point or they might
// no run until several appends have already happened. // not run until several appends have already happened.
head := b.Head() head := b.Head()
for i := 0; i < nReaders; i++ { for i := 0; i < nReaders; i++ {

View File

@ -10,12 +10,6 @@ import (
// EventPublisher receives changes events from Publish, and sends them to all // EventPublisher receives changes events from Publish, and sends them to all
// registered subscribers. // registered subscribers.
type EventPublisher struct { type EventPublisher struct {
// topicBufferSize controls how many trailing events we keep in memory for
// each topic to avoid needing to snapshot again for re-connecting clients
// that may have missed some events. It may be zero for no buffering (the most
// recent event is always kept though). TODO
topicBufferSize int
// snapCacheTTL controls how long we keep snapshots in our cache before // snapCacheTTL controls how long we keep snapshots in our cache before
// allowing them to be garbage collected and a new one made for subsequent // allowing them to be garbage collected and a new one made for subsequent
// requests for that topic and key. In general this should be pretty short to // requests for that topic and key. In general this should be pretty short to
@ -34,7 +28,7 @@ type EventPublisher struct {
topicBuffers map[Topic]*eventBuffer topicBuffers map[Topic]*eventBuffer
// snapCache if a cache of EventSnapshots indexed by topic and key. // snapCache if a cache of EventSnapshots indexed by topic and key.
// TODO: new struct for snapCache and snapFns and snapCacheTTL // TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
snapCache map[Topic]map[string]*eventSnapshot snapCache map[Topic]map[string]*eventSnapshot
subscriptions *subscriptions subscriptions *subscriptions
@ -70,8 +64,8 @@ type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index
// SnapshotAppender appends groups of events to create a Snapshot of state. // SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface { type SnapshotAppender interface {
// Append events to the snapshot. // Append events to the snapshot. Every event in the slice must have the same
// TODO: document why parameter is a slice instead of a single Event // Index, indicating that it is part of the same raft transaction.
Append(events []Event) Append(events []Event)
} }
@ -122,14 +116,13 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
// sendEvents sends the given events to any applicable topic listeners, as well // sendEvents sends the given events to any applicable topic listeners, as well
// as any ACL update events to cause affected listeners to reset their stream. // as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) { func (e *EventPublisher) sendEvents(update changeEvents) {
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events { for _, event := range update.events {
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok { if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs) e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
} continue
} }
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events {
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
} }
@ -184,14 +177,12 @@ func (e *EventPublisher) Subscribe(
var sub *Subscription var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "resume stream" message to signal to // No need for a snapshot, send the "resume stream" message to signal to
// client it's cache is still good. (note that this can be distinguished // client its cache is still good, then follow along from here in the topic.
// from a legitimate empty snapshot due to the index matching the one the
// client sent), then follow along from here in the topic.
e := Event{ e := Event{
Index: req.Index, Index: req.Index,
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Key: req.Key,
Payload: resumeStream{}, Payload: endOfEmptySnapshot{},
} }
// Make a new buffer to send to the client containing the resume. // Make a new buffer to send to the client containing the resume.
buf := newEventBuffer() buf := newEventBuffer()

View File

@ -5,7 +5,7 @@ package stream
// streamed to possibly multiple subscribers concurrently, and can be trivially // streamed to possibly multiple subscribers concurrently, and can be trivially
// cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot // cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot
// is dropped from memory, any subscribers still reading from it may do so by following // is dropped from memory, any subscribers still reading from it may do so by following
// their pointers. When the last subscribe unsubscribes the snapshot is garbage // their pointers. When the last subscriber unsubscribes, the snapshot is garbage
// collected automatically by Go's runtime. This simplifies snapshot and buffer // collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically. // management dramatically.
type eventSnapshot struct { type eventSnapshot struct {
@ -21,7 +21,7 @@ type eventSnapshot struct {
type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
// newEventSnapshot creates a snapshot buffer based on the subscription request. // newEventSnapshot creates a snapshot buffer based on the subscription request.
// The current buffer head for the topic in question is passed so that once the // The current buffer head for the topic requested is passed so that once the
// snapshot is complete and has been delivered into the buffer, any events // snapshot is complete and has been delivered into the buffer, any events
// published during snapshotting can be immediately appended and won't be // published during snapshotting can be immediately appended and won't be
// missed. Once the snapshot is delivered the topic buffer is spliced onto the // missed. Once the snapshot is delivered the topic buffer is spliced onto the