server: Abandom state store to shutdown EventPublisher

So that we don't leak goroutines
This commit is contained in:
Daniel Nephin 2020-07-06 18:02:40 -04:00
parent 889c57fd2d
commit 48c766d2c6
2 changed files with 9 additions and 4 deletions

View File

@ -968,6 +968,8 @@ func (s *Server) Shutdown() error {
s.config.NotifyShutdown() s.config.NotifyShutdown()
} }
s.fsm.State().Abandon()
return nil return nil
} }

View File

@ -107,6 +107,10 @@ type Store struct {
// abandoned (usually during a restore). This is only ever closed. // abandoned (usually during a restore). This is only ever closed.
abandonCh chan struct{} abandonCh chan struct{}
// TODO: refactor abondonCh to use a context so that both can use the same
// cancel mechanism.
stopEventPublisher func()
// kvsGraveyard manages tombstones for the key value store. // kvsGraveyard manages tombstones for the key value store.
kvsGraveyard *Graveyard kvsGraveyard *Graveyard
@ -156,10 +160,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
return nil, fmt.Errorf("Failed setting up state store: %s", err) return nil, fmt.Errorf("Failed setting up state store: %s", err)
} }
// TODO: context should be cancelled when the store is Abandoned to free ctx, cancel := context.WithCancel(context.TODO())
// resources.
ctx := context.TODO()
s := &Store{ s := &Store{
schema: schema, schema: schema,
abandonCh: make(chan struct{}), abandonCh: make(chan struct{}),
@ -169,6 +170,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
db: db, db: db,
publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second),
}, },
stopEventPublisher: cancel,
} }
return s, nil return s, nil
} }
@ -241,6 +243,7 @@ func (s *Store) AbandonCh() <-chan struct{} {
// Abandon is used to signal that the given state store has been abandoned. // Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic. // Calling this more than one time will panic.
func (s *Store) Abandon() { func (s *Store) Abandon() {
s.stopEventPublisher()
close(s.abandonCh) close(s.abandonCh)
} }