fix: lock subscription before closing it

This commit is contained in:
Richard Ramos 2021-03-15 17:43:26 -04:00
parent cac9aa7b37
commit 32cdc4cadd
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F

View File

@ -41,10 +41,11 @@ type MessagePair struct {
} }
type Subscription struct { type Subscription struct {
C chan *protocol.WakuMessage C chan *protocol.WakuMessage
closed bool closed bool
mutex sync.Mutex mutex sync.Mutex
quit chan struct{} pubSubscription *pubsub.Subscription
quit chan struct{}
} }
type WakuNode struct { type WakuNode struct {
@ -175,6 +176,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
subscription := new(Subscription) subscription := new(Subscription)
subscription.closed = false subscription.closed = false
subscription.pubSubscription = sub
subscription.C = make(chan *protocol.WakuMessage) subscription.C = make(chan *protocol.WakuMessage)
subscription.quit = make(chan struct{}) subscription.quit = make(chan struct{})
@ -182,6 +184,8 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) {
for { for {
select { select {
case <-subscription.quit: case <-subscription.quit:
subscription.mutex.Lock()
defer subscription.mutex.Unlock()
close(subscription.C) close(subscription.C)
subscription.closed = true subscription.closed = true
return return
@ -215,6 +219,12 @@ func (subs *Subscription) Unsubscribe() {
} }
} }
func (subs *Subscription) IsClosed() bool {
subs.mutex.Lock()
defer subs.mutex.Unlock()
return subs.closed
}
func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) { func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) {
defer node.topicsLock.Unlock() defer node.topicsLock.Unlock()
node.topicsLock.Lock() node.topicsLock.Lock()