mirror of https://github.com/status-im/consul.git
stream: move goroutine out of New
This change will make it easier to manage goroutine lifecycle from the caller. Also expose EventPublisher from state.Store
This commit is contained in:
parent
0fb2a5b992
commit
b7ca15e910
|
@ -42,7 +42,6 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) {
|
||||||
// this outside the Server to avoid exposing this outside the package.
|
// this outside the Server to avoid exposing this outside the package.
|
||||||
type FSM struct {
|
type FSM struct {
|
||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
path string
|
|
||||||
|
|
||||||
// apply is built off the commands global and is used to route apply
|
// apply is built off the commands global and is used to route apply
|
||||||
// operations to their appropriate handlers.
|
// operations to their appropriate handlers.
|
||||||
|
|
|
@ -37,14 +37,10 @@ type Changes struct {
|
||||||
// 2. Sent to the eventPublisher which will create and emit change events
|
// 2. Sent to the eventPublisher which will create and emit change events
|
||||||
type changeTrackerDB struct {
|
type changeTrackerDB struct {
|
||||||
db *memdb.MemDB
|
db *memdb.MemDB
|
||||||
publisher eventPublisher
|
publisher *stream.EventPublisher
|
||||||
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
|
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventPublisher interface {
|
|
||||||
Publish(events []stream.Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
||||||
// code may use it to create a read-only transaction, but it will panic if called
|
// code may use it to create a read-only transaction, but it will panic if called
|
||||||
// with write=true.
|
// with write=true.
|
||||||
|
|
|
@ -168,14 +168,23 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
|
||||||
lockDelay: NewDelay(),
|
lockDelay: NewDelay(),
|
||||||
stopEventPublisher: cancel,
|
stopEventPublisher: cancel,
|
||||||
}
|
}
|
||||||
|
pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second)
|
||||||
s.db = &changeTrackerDB{
|
s.db = &changeTrackerDB{
|
||||||
db: db,
|
db: db,
|
||||||
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second),
|
publisher: pub,
|
||||||
processChanges: processDBChanges,
|
processChanges: processDBChanges,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go pub.Run(ctx)
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EventPublisher returns the stream.EventPublisher used by the Store to
|
||||||
|
// publish events.
|
||||||
|
func (s *Store) EventPublisher() *stream.EventPublisher {
|
||||||
|
return s.db.publisher
|
||||||
|
}
|
||||||
|
|
||||||
// Snapshot is used to create a point-in-time snapshot of the entire db.
|
// Snapshot is used to create a point-in-time snapshot of the entire db.
|
||||||
func (s *Store) Snapshot() *Snapshot {
|
func (s *Store) Snapshot() *Snapshot {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
|
|
|
@ -28,7 +28,8 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
|
||||||
|
go publisher.Run(ctx)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
@ -111,7 +112,8 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
|
||||||
|
go publisher.Run(ctx)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
@ -227,7 +229,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
|
||||||
|
go publisher.Run(ctx)
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
@ -433,7 +436,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
|
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
|
||||||
|
go publisher.Run(ctx)
|
||||||
|
|
||||||
s.db.publisher = publisher
|
s.db.publisher = publisher
|
||||||
sub, err := publisher.Subscribe(req)
|
sub, err := publisher.Subscribe(req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -79,7 +79,7 @@ type SnapshotAppender interface {
|
||||||
// A goroutine is run in the background to publish events to all subscribes.
|
// A goroutine is run in the background to publish events to all subscribes.
|
||||||
// Cancelling the context will shutdown the goroutine, to free resources,
|
// Cancelling the context will shutdown the goroutine, to free resources,
|
||||||
// and stop all publishing.
|
// and stop all publishing.
|
||||||
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
|
func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
|
||||||
e := &EventPublisher{
|
e := &EventPublisher{
|
||||||
snapCacheTTL: snapCacheTTL,
|
snapCacheTTL: snapCacheTTL,
|
||||||
topicBuffers: make(map[Topic]*eventBuffer),
|
topicBuffers: make(map[Topic]*eventBuffer),
|
||||||
|
@ -91,8 +91,6 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache
|
||||||
snapshotHandlers: handlers,
|
snapshotHandlers: handlers,
|
||||||
}
|
}
|
||||||
|
|
||||||
go e.handleUpdates(ctx)
|
|
||||||
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +101,9 @@ func (e *EventPublisher) Publish(events []Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EventPublisher) handleUpdates(ctx context.Context) {
|
// Run the event publisher until ctx is cancelled. Run should be called from a
|
||||||
|
// goroutine to forward events from Publish to all the appropriate subscribers.
|
||||||
|
func (e *EventPublisher) Run(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -25,7 +25,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
|
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
|
||||||
|
go publisher.Run(ctx)
|
||||||
|
|
||||||
sub, err := publisher.Subscribe(subscription)
|
sub, err := publisher.Subscribe(subscription)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
eventCh := consumeSubscription(ctx, sub)
|
eventCh := consumeSubscription(ctx, sub)
|
||||||
|
@ -123,7 +125,8 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
|
||||||
handlers[intTopic(22)] = fn
|
handlers[intTopic(22)] = fn
|
||||||
handlers[intTopic(33)] = fn
|
handlers[intTopic(33)] = fn
|
||||||
|
|
||||||
publisher := NewEventPublisher(ctx, handlers, time.Second)
|
publisher := NewEventPublisher(handlers, time.Second)
|
||||||
|
go publisher.Run(ctx)
|
||||||
|
|
||||||
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)})
|
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Reference in New Issue