diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index 22bf7e2453..7b20c9fd46 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -50,7 +50,8 @@ func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, err } } } - // TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent + // There may be duplicate secretIDs here. We rely on this event allowing + // for duplicate IDs. return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil } diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 5ec7e44bac..601f765bf7 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -50,7 +50,7 @@ func (c *changeTrackerDB) Txn(write bool) *txn { // memdb.Txn // // TODO: this could return a regular memdb.Txn if all the state functions accepted -// and interface +// the ReadTxn interface func (c *changeTrackerDB) ReadTxn() *txn { return &txn{Txn: c.db.Txn(false)} } @@ -109,7 +109,7 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn { type txn struct { *memdb.Txn // Index in raft where the write is occurring. The value is zero for a - // read-only, and WriteTxnRestore transaction. + // read-only, or WriteTxnRestore transaction. // Index is stored so that it may be passed along to any subscribers as part // of a change event. Index uint64 diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index d78c29e43c..d45ab80182 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -19,17 +19,23 @@ type Event struct { Payload interface{} } +// IsEndOfSnapshot returns true if this is a framing event that indicates the +// snapshot has completed. Future events from Subscription.Next will be +// change events. func (e Event) IsEndOfSnapshot() bool { return e.Payload == endOfSnapshot{} } -func (e Event) IsResumeStream() bool { - return e.Payload == resumeStream{} +// IsEndOfEmptySnapshot returns true if this is a framing event that indicates +// there is no snapshot. Future events from Subscription.Next will be +// change events. +func (e Event) IsEndOfEmptySnapshot() bool { + return e.Payload == endOfEmptySnapshot{} } type endOfSnapshot struct{} -type resumeStream struct{} +type endOfEmptySnapshot struct{} type closeSubscriptionPayload struct { tokensSecretIDs []string @@ -39,6 +45,8 @@ type closeSubscriptionPayload struct { // stream package, and is never sent to subscribers. EventProcessor handles // these events, and closes any subscriptions which were created using a token // which matches any of the tokenSecretIDs. +// +// tokenSecretIDs may contain duplicate IDs. func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event { return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}} } diff --git a/agent/consul/stream/event_buffer.go b/agent/consul/stream/event_buffer.go index d104f689d3..f908f10fcb 100644 --- a/agent/consul/stream/event_buffer.go +++ b/agent/consul/stream/event_buffer.go @@ -20,24 +20,24 @@ import ( // goroutines or deliver to O(N) separate channels. // // Because it's a linked list with atomically updated pointers, readers don't -// have to take a lock and can consume at their own pace. but we also don't have -// to have a fixed limit on the number of items which either means we don't have -// to trade off buffer length config to balance using lots of memory wastefully -// vs handling occasional slow readers. +// have to take a lock and can consume at their own pace. We also don't need a +// fixed limit on the number of items which avoids needing to configure +// buffer length to balance wasting lots of memory all the time against being able to +// tolerate occasional slow readers. // -// The buffer is used to deliver all messages broadcast toa topic for active +// The buffer is used to deliver all messages broadcast to a topic for active // subscribers to consume, but it is also an effective way to both deliver and -// optionally cache snapshots per topic and key. byt using an eventBuffer, +// optionally cache snapshots per topic and key. By using an eventBuffer, // snapshot functions don't have to read the whole snapshot into memory before // delivery - they can stream from memdb. However simply by storing a pointer to // the first event in the buffer, we can cache the buffered events for future // watchers on the same topic. Finally, once we've delivered all the snapshot // events to the buffer, we can append a next-element which is the first topic // buffer element with a higher index and so consumers can keep reading the -// same buffer. +// same buffer to have subsequent updates streamed after the snapshot is read. // // A huge benefit here is that caching snapshots becomes very simple - we don't -// have to do any additional book keeping to figure out when to truncate the +// have to do any additional book-keeping to figure out when to truncate the // topic buffer to make sure the snapshot is still usable or run into issues // where the cached snapshot is no longer useful since the buffer will keep // elements around only as long as either the cache or a subscriber need them. diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go index bb27b91a40..6f491e62f4 100644 --- a/agent/consul/stream/event_buffer_test.go +++ b/agent/consul/stream/event_buffer_test.go @@ -51,7 +51,7 @@ func TestEventBufferFuzz(t *testing.T) { errCh := make(chan error, nReaders) // Load head here so all subscribers start from the same point or they might - // no run until several appends have already happened. + // not run until several appends have already happened. head := b.Head() for i := 0; i < nReaders; i++ { diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 0b3e7c2a7a..ce17fbe048 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -10,12 +10,6 @@ import ( // EventPublisher receives changes events from Publish, and sends them to all // registered subscribers. type EventPublisher struct { - // topicBufferSize controls how many trailing events we keep in memory for - // each topic to avoid needing to snapshot again for re-connecting clients - // that may have missed some events. It may be zero for no buffering (the most - // recent event is always kept though). TODO - topicBufferSize int - // snapCacheTTL controls how long we keep snapshots in our cache before // allowing them to be garbage collected and a new one made for subsequent // requests for that topic and key. In general this should be pretty short to @@ -34,7 +28,7 @@ type EventPublisher struct { topicBuffers map[Topic]*eventBuffer // snapCache if a cache of EventSnapshots indexed by topic and key. - // TODO: new struct for snapCache and snapFns and snapCacheTTL + // TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL snapCache map[Topic]map[string]*eventSnapshot subscriptions *subscriptions @@ -70,8 +64,8 @@ type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index // SnapshotAppender appends groups of events to create a Snapshot of state. type SnapshotAppender interface { - // Append events to the snapshot. - // TODO: document why parameter is a slice instead of a single Event + // Append events to the snapshot. Every event in the slice must have the same + // Index, indicating that it is part of the same raft transaction. Append(events []Event) } @@ -122,14 +116,13 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { // sendEvents sends the given events to any applicable topic listeners, as well // as any ACL update events to cause affected listeners to reset their stream. func (e *EventPublisher) sendEvents(update changeEvents) { + eventsByTopic := make(map[Topic][]Event) for _, event := range update.events { if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok { e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs) + continue } - } - eventsByTopic := make(map[Topic][]Event) - for _, event := range update.events { eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) } @@ -184,14 +177,12 @@ func (e *EventPublisher) Subscribe( var sub *Subscription if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { // No need for a snapshot, send the "resume stream" message to signal to - // client it's cache is still good. (note that this can be distinguished - // from a legitimate empty snapshot due to the index matching the one the - // client sent), then follow along from here in the topic. + // client its cache is still good, then follow along from here in the topic. e := Event{ Index: req.Index, Topic: req.Topic, Key: req.Key, - Payload: resumeStream{}, + Payload: endOfEmptySnapshot{}, } // Make a new buffer to send to the client containing the resume. buf := newEventBuffer() diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index 0744daf121..0ca24f8088 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -5,7 +5,7 @@ package stream // streamed to possibly multiple subscribers concurrently, and can be trivially // cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot // is dropped from memory, any subscribers still reading from it may do so by following -// their pointers. When the last subscribe unsubscribes the snapshot is garbage +// their pointers. When the last subscriber unsubscribes, the snapshot is garbage // collected automatically by Go's runtime. This simplifies snapshot and buffer // management dramatically. type eventSnapshot struct { @@ -21,7 +21,7 @@ type eventSnapshot struct { type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) // newEventSnapshot creates a snapshot buffer based on the subscription request. -// The current buffer head for the topic in question is passed so that once the +// The current buffer head for the topic requested is passed so that once the // snapshot is complete and has been delivered into the buffer, any events // published during snapshotting can be immediately appended and won't be // missed. Once the snapshot is delivered the topic buffer is spliced onto the