diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index 8a5d97349d..800fa8a78d 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -29,7 +29,7 @@ type EventPublisher struct { // seconds. snapCacheTTL time.Duration - // This lock protects the topicBuffers, snapCache and subsByToken maps. + // This lock protects the topicBuffers and snapCache maps. lock sync.RWMutex // topicBuffers stores the head of the linked-list buffer to publish events to @@ -40,10 +40,7 @@ type EventPublisher struct { // TODO: new struct for snapCache and snapFns and snapCacheTTL snapCache map[stream.Topic]map[string]*stream.EventSnapshot - // subsByToken stores a list of Subscription objects outstanding indexed by a - // hash of the ACL token they used to subscribe so we can reload them if their - // ACL permissions change. - subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription + subscriptions *subscriptions // publishCh is used to send messages from an active txn to a goroutine which // publishes events, so that publishing can happen asynchronously from @@ -53,8 +50,16 @@ type EventPublisher struct { handlers map[stream.Topic]topicHandler } +type subscriptions struct { + lock sync.RWMutex + + // subsByToken stores a list of Subscription objects outstanding indexed by a + // hash of the ACL token they used to subscribe so we can reload them if their + // ACL permissions change.
+ subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription +} + type commitUpdate struct { - tx ReadTxn events []stream.Event } @@ -64,7 +69,10 @@ func NewEventPublisher(handlers map[stream.Topic]topicHandler, snapCacheTTL time topicBuffers: make(map[stream.Topic]*stream.EventBuffer), snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), publishCh: make(chan commitUpdate, 64), - handlers: handlers, + subscriptions: &subscriptions{ + subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), + }, + handlers: handlers, } go e.handleUpdates() @@ -83,16 +91,29 @@ func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error { events = append(events, es...) } } - e.publishCh <- commitUpdate{ - // TODO: document why it must be created here, and not in the new thread - // - // Create a new transaction since it's going to be used from a different - // thread. Transactions aren't thread safe but it's OK to create it here - // since we won't try to use it in this thread and pass it straight to the - // handler which will own it exclusively. - tx: e.db.Txn(false), - events: events, + + for _, event := range events { + // If the event is an ACL update, treat it as a special case. Currently + // ACL update events are only used internally to recognize when a subscriber + // should reload its subscription. + if event.Topic == stream.Topic_ACLTokens || + event.Topic == stream.Topic_ACLPolicies || + event.Topic == stream.Topic_ACLRoles { + + if err := e.subscriptions.handleACLUpdate(tx, event); err != nil { + // This seems pretty drastic? What would be better. It's not super safe + // to continue since we might have missed some ACL update and so leak + // data to unauthorized clients but crashing whole server also seems + // bad. I wonder if we could send a "reset" to all subscribers instead + // and effectively re-start all subscriptions to be on the safe side + // without just crashing? 
+ // TODO(banks): reset all instead of panic? + panic(err) + } + } } + + e.publishCh <- commitUpdate{events: events} return nil } @@ -106,43 +127,13 @@ func (e *EventPublisher) handleUpdates() { // sendEvents sends the given events to any applicable topic listeners, as well // as any ACL update events to cause affected listeners to reset their stream. func (e *EventPublisher) sendEvents(update commitUpdate) { - e.lock.Lock() - defer e.lock.Unlock() - - // Always abort the transaction. This is not strictly necessary with memDB - // because once we drop the reference to the Txn object, the radix nodes will - // be GCed anyway but it's hygienic incase memDB ever has a different - // implementation. - defer update.tx.Abort() - eventsByTopic := make(map[stream.Topic][]stream.Event) - for _, event := range update.events { - // If the event is an ACL update, treat it as a special case. Currently - // ACL update events are only used internally to recognize when a subscriber - // should reload its subscription. - if event.Topic == stream.Topic_ACLTokens || - event.Topic == stream.Topic_ACLPolicies || - event.Topic == stream.Topic_ACLRoles { - - if err := e.handleACLUpdate(update.tx, event); err != nil { - // This seems pretty drastic? What would be better. It's not super safe - // to continue since we might have missed some ACL update and so leak - // data to unauthorized clients but crashing whole server also seems - // bad. I wonder if we could send a "reset" to all subscribers instead - // and effectively re-start all subscriptions to be on the safe side - // without just crashing? - // TODO(banks): reset all instead of panic? - panic(err) - } - - continue - } - - // Split events by topic to deliver. 
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) } + e.lock.Lock() + defer e.lock.Unlock() for topic, events := range eventsByTopic { e.getTopicBuffer(topic).Append(events) } @@ -161,13 +152,15 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer return buf } -// handleACLUpdate handles an ACL token/policy/role update. This method assumes -// the lock is held. -func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error { +// handleACLUpdate handles an ACL token/policy/role update. +func (s *subscriptions) handleACLUpdate(tx ReadTxn, event stream.Event) error { + s.lock.RLock() + defer s.lock.RUnlock() + switch event.Topic { case stream.Topic_ACLTokens: token := event.Payload.(*structs.ACLToken) - for _, sub := range e.subsByToken[token.SecretID] { + for _, sub := range s.subsByToken[token.SecretID] { sub.CloseReload() } @@ -177,7 +170,7 @@ func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error { if err != nil { return err } - e.closeSubscriptionsForTokens(tokens) + s.closeSubscriptionsForTokens(tokens) // Find any roles using this policy so tokens with those roles can be reloaded. 
roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) @@ -191,7 +184,7 @@ func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error { if err != nil { return err } - e.closeSubscriptionsForTokens(tokens) + s.closeSubscriptionsForTokens(tokens) } case stream.Topic_ACLRoles: @@ -200,17 +193,17 @@ func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error { if err != nil { return err } - e.closeSubscriptionsForTokens(tokens) + s.closeSubscriptionsForTokens(tokens) } return nil } -// This method requires the EventPublisher.lock is held -func (e *EventPublisher) closeSubscriptionsForTokens(tokens memdb.ResultIterator) { +// This method requires that subscriptions.lock is held by the caller. +func (s *subscriptions) closeSubscriptionsForTokens(tokens memdb.ResultIterator) { for token := tokens.Next(); token != nil; token = tokens.Next() { token := token.(*structs.ACLToken) - if subs, ok := e.subsByToken[token.SecretID]; ok { + if subs, ok := s.subsByToken[token.SecretID]; ok { for _, sub := range subs { sub.CloseReload() } @@ -284,30 +277,37 @@ func (e *EventPublisher) Subscribe( sub = stream.NewSubscription(ctx, req, snap.Snap) } - subsByToken, ok := e.subsByToken[req.Token] - if !ok { - subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) - e.subsByToken[req.Token] = subsByToken - } - subsByToken[req] = sub + e.subscriptions.add(req, sub) return sub, nil } +func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscription) { + s.lock.Lock() + defer s.lock.Unlock() + + subsByToken, ok := s.subsByToken[req.Token] + if !ok { + subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) + s.subsByToken[req.Token] = subsByToken + } + subsByToken[req] = sub +} + // Unsubscribe must be called when a client is no longer interested in a -// subscription to free resources monitoring changes in it's ACL token. The same +// subscription to free resources monitoring changes in its ACL token. The same // request object passed to Subscribe must be used.
-func (e *EventPublisher) Unsubscribe(req *stream.SubscribeRequest) { - e.lock.Lock() - defer e.lock.Unlock() +func (s *subscriptions) Unsubscribe(req *stream.SubscribeRequest) { + s.lock.Lock() + defer s.lock.Unlock() - subsByToken, ok := e.subsByToken[req.Token] + subsByToken, ok := s.subsByToken[req.Token] if !ok { return } delete(subsByToken, req) if len(subsByToken) == 0 { - delete(e.subsByToken, req.Token) + delete(s.subsByToken, req.Token) } }