diff --git a/waku/v2/protocol/relay/broadcast.go b/waku/v2/protocol/relay/broadcast.go index a2911c5c..f79eec5b 100644 --- a/waku/v2/protocol/relay/broadcast.go +++ b/waku/v2/protocol/relay/broadcast.go @@ -3,7 +3,6 @@ package relay import ( "context" "errors" - "fmt" "sync" "sync/atomic" @@ -33,7 +32,7 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription { id := s.id s.topicToChans[topic][id] = ch return Subscription{ - // read only channel , will not block forever, return once closed. + // read only channel,will not block forever, returns once closed. Ch: ch, // Unsubscribe function is safe, can be called multiple times // and even after broadcaster has stopped running. @@ -51,18 +50,28 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription { } } -func (s *chStore) broadcast(m *protocol.Envelope) { +func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) { s.mu.RLock() defer s.mu.RUnlock() - fmt.Println(m) - fmt.Println(s.topicToChans, "msg") for _, ch := range s.topicToChans[m.PubsubTopic()] { - fmt.Println(m.PubsubTopic()) - ch <- m + select { + // using ctx.Done for returning on cancellation is needed + // reason: + // if for a channel there is no one listening to it + // the broadcast will acquire lock and wait until there is a receiver on that channel. + // this will also block the chStore close function as it uses same mutex + case <-ctx.Done(): + return + case ch <- m: + } } // send to all registered subscribers for _, ch := range s.topicToChans[""] { - ch <- m + select { + case <-ctx.Done(): + return + case ch <- m: + } } } @@ -90,6 +99,7 @@ type Broadcaster interface { // panic safe, input can't be submitted to `input` channel after stop // lock safe, only read channels are returned and later closed, calling code has guarantee Register channel will not block forever. // no opened channel leaked, all created only read channels are closed when stop +// even if there is noone listening to returned channels, guarantees to be lockfree. type broadcaster struct { bufLen int cancel context.CancelFunc @@ -124,7 +134,7 @@ func (b *broadcaster) run(ctx context.Context) { return case msg, ok := <-b.input: if ok { - b.chStore.broadcast(msg) + b.chStore.broadcast(ctx, msg) } } } @@ -134,9 +144,10 @@ func (b *broadcaster) Stop() { if !b.running.CompareAndSwap(true, false) { // if running then stop return } + // cancel must be before chStore.close(), so that broadcast releases lock before chStore.close() acquires it. + b.cancel() // exit the run loop, b.chStore.close() // close all channels that we send to close(b.input) // close input channel - b.cancel() // exit the run loop } // returned subscription is all speicfied topic diff --git a/waku/v2/protocol/relay/broadcast_test.go b/waku/v2/protocol/relay/broadcast_test.go index 07b62efc..59bd26b9 100644 --- a/waku/v2/protocol/relay/broadcast_test.go +++ b/waku/v2/protocol/relay/broadcast_test.go @@ -90,3 +90,14 @@ func TestBroadcastUnregisterSub(t *testing.T) { b.Stop() // it automatically unregister/unsubscribe all require.Nil(t, <-specificSub.Ch) } + +func TestBroadcastNoOneListening(t *testing.T) { + b := NewBroadcaster(100) + require.NoError(t, b.Start(context.Background())) + _ = b.RegisterForAll() // no one listening on channel + // + env := protocol.NewEnvelope(&pb.WakuMessage{}, utils.GetUnixEpoch(), "abc") + b.Submit(env) + b.Submit(env) + b.Stop() +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index e6cb1483..4dcad0d7 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -185,8 +185,10 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } w.relaySubs[topic] = sub - w.wg.Add(1) - go w.subscribeToTopic(topic, sub) + if w.bcaster != nil { + w.wg.Add(1) + go w.subscribeToTopic(topic, sub) + } w.log.Info("subscribing to topic", zap.String("topic", sub.Topic())) }