diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 6af44f4a..23a02a76 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -41,10 +41,11 @@ type MessagePair struct { } type Subscription struct { - C chan *protocol.WakuMessage - closed bool - mutex sync.Mutex - quit chan struct{} + C chan *protocol.WakuMessage + closed bool + mutex sync.Mutex + pubSubscription *pubsub.Subscription + quit chan struct{} } type WakuNode struct { @@ -175,6 +176,7 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { subscription := new(Subscription) subscription.closed = false + subscription.pubSubscription = sub subscription.C = make(chan *protocol.WakuMessage) subscription.quit = make(chan struct{}) @@ -182,6 +184,8 @@ func (node *WakuNode) Subscribe(topic *Topic) (*Subscription, error) { for { select { case <-subscription.quit: + subscription.mutex.Lock() + defer subscription.mutex.Unlock() close(subscription.C) subscription.closed = true return @@ -215,6 +219,12 @@ func (subs *Subscription) Unsubscribe() { } } +func (subs *Subscription) IsClosed() bool { + subs.mutex.Lock() + defer subs.mutex.Unlock() + return subs.closed +} + func (node *WakuNode) upsertTopic(topic *Topic) (*pubsub.Topic, error) { defer node.topicsLock.Unlock() node.topicsLock.Lock()