diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 85abe19287..cc0cb9a071 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -42,7 +42,6 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) { // this outside the Server to avoid exposing this outside the package. type FSM struct { logger hclog.Logger - path string // apply is built off the commands global and is used to route apply // operations to their appropriate handlers. diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index beb46d62d8..5e6bbb604a 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -37,14 +37,10 @@ type Changes struct { // 2. Sent to the eventPublisher which will create and emit change events type changeTrackerDB struct { db *memdb.MemDB - publisher eventPublisher + publisher *stream.EventPublisher processChanges func(ReadTxn, Changes) ([]stream.Event, error) } -type eventPublisher interface { - Publish(events []stream.Event) -} - // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting // code may use it to create a read-only transaction, but it will panic if called // with write=true. diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 3a7229607c..a02cd864da 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -168,14 +168,23 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { lockDelay: NewDelay(), stopEventPublisher: cancel, } + pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second) s.db = &changeTrackerDB{ db: db, - publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second), + publisher: pub, processChanges: processDBChanges, } + + go pub.Run(ctx) return s, nil } +// EventPublisher returns the stream.EventPublisher used by the Store to +// publish events. +func (s *Store) EventPublisher() *stream.EventPublisher { + return s.db.publisher +} + // Snapshot is used to create a point-in-time snapshot of the entire db. func (s *Store) Snapshot() *Snapshot { tx := s.db.Txn(false) diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 14d667ce39..f9c978ed3c 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -28,7 +28,8 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -111,7 +112,8 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -227,7 +229,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -433,7 +436,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) + s.db.publisher = publisher sub, err := publisher.Subscribe(req) require.NoError(t, err) diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 815a68a261..1de9e10580 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -79,7 +79,7 @@ type SnapshotAppender interface { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[Topic]*eventBuffer), @@ -91,8 +91,6 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache snapshotHandlers: handlers, } - go e.handleUpdates(ctx) - return e } @@ -103,7 +101,9 @@ func (e *EventPublisher) Publish(events []Event) { } } -func (e *EventPublisher) handleUpdates(ctx context.Context) { +// Run the event publisher until ctx is cancelled. Run should be called from a +// goroutine to forward events from Publish to all the appropriate subscribers. +func (e *EventPublisher) Run(ctx context.Context) { for { select { case <-ctx.Done(): diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 4448e68454..63f7d763d9 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -25,7 +25,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0) + publisher := NewEventPublisher(newTestSnapshotHandlers(), 0) + go publisher.Run(ctx) + sub, err := publisher.Subscribe(subscription) require.NoError(t, err) eventCh := consumeSubscription(ctx, sub) @@ -123,7 +125,8 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { handlers[intTopic(22)] = fn handlers[intTopic(33)] = fn - publisher := NewEventPublisher(ctx, handlers, time.Second) + publisher := NewEventPublisher(handlers, time.Second) + go publisher.Run(ctx) sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)}) require.NoError(t, err)