EventPublisher: docstrings and getTopicBuffer

also rename commitCh -> publishCh
This commit is contained in:
Daniel Nephin 2020-06-15 16:59:09 -04:00
parent 555cfe52d9
commit f5ecd5de5f
1 changed files with 25 additions and 21 deletions

View File

@ -28,7 +28,7 @@ type EventPublisher struct {
// not that expensive, but having a cache for a few seconds can help
// de-duplicate building the same snapshot over and over again when a
// thundering herd of watchers all subscribe to the same topic within a few
// seconds. TODO
// seconds.
snapCacheTTL time.Duration
// This lock protects the topicBuffers, snapCache and subsByToken maps.
@ -38,8 +38,8 @@ type EventPublisher struct {
// for a topic.
topicBuffers map[stream.Topic]*stream.EventBuffer
// snapCache stores the head of any snapshot buffers still in cache if caching
// is enabled.
// snapCache if a cache of EventSnapshots indexed by topic and key.
// TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[stream.Topic]map[string]*stream.EventSnapshot
// snapFns is the set of snapshot functions that were registered bound to the
@ -51,9 +51,10 @@ type EventPublisher struct {
// ACL permissions change.
subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
// commitCh decouples the Commit call in the FSM hot path from distributing
// the resulting events.
commitCh chan commitUpdate
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan commitUpdate
}
type commitUpdate struct {
@ -70,7 +71,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
snapFns: make(map[stream.Topic]stream.SnapFn),
subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
commitCh: make(chan commitUpdate, 64),
publishCh: make(chan commitUpdate, 64),
}
// create a local handler table
@ -98,7 +99,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
events = append(events, es...)
}
}
e.commitCh <- commitUpdate{
e.publishCh <- commitUpdate{
// TODO: document why it must be created here, and not in the new thread
//
// Create a new transaction since it's going to be used from a different
@ -113,7 +114,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
func (e *EventPublisher) handleUpdates() {
for {
update := <-e.commitCh
update := <-e.publishCh
e.sendEvents(update)
}
}
@ -158,15 +159,22 @@ func (e *EventPublisher) sendEvents(update commitUpdate) {
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
}
// Deliver events
for topic, events := range eventsByTopic {
e.getTopicBuffer(topic).Append(events)
}
}
// getTopicBuffer for the topic. Creates a new event buffer if one does not
// already exist.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer {
buf, ok := e.topicBuffers[topic]
if !ok {
buf = stream.NewEventBuffer()
e.topicBuffers[topic] = buf
}
buf.Append(events)
}
return buf
}
// handleACLUpdate handles an ACL token/policy/role update. This method assumes
@ -250,11 +258,7 @@ func (e *EventPublisher) Subscribe(
// Ensure there is a topic buffer for that topic so we start capturing any
// future published events.
buf, ok := e.topicBuffers[req.Topic]
if !ok {
buf = stream.NewEventBuffer()
e.topicBuffers[req.Topic] = buf
}
buf := e.getTopicBuffer(req.Topic)
// See if we need a snapshot
topicHead := buf.Head()