mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-07 16:33:08 +00:00
fix: broadcaster shouldn't block if channels are not listened to
This commit is contained in:
parent
210597f7e0
commit
60edf95c48
@ -3,7 +3,6 @@ package relay
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
@ -33,7 +32,7 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription {
|
|||||||
id := s.id
|
id := s.id
|
||||||
s.topicToChans[topic][id] = ch
|
s.topicToChans[topic][id] = ch
|
||||||
return Subscription{
|
return Subscription{
|
||||||
// read only channel , will not block forever, return once closed.
|
// read only channel,will not block forever, returns once closed.
|
||||||
Ch: ch,
|
Ch: ch,
|
||||||
// Unsubscribe function is safe, can be called multiple times
|
// Unsubscribe function is safe, can be called multiple times
|
||||||
// and even after broadcaster has stopped running.
|
// and even after broadcaster has stopped running.
|
||||||
@ -51,18 +50,28 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *chStore) broadcast(m *protocol.Envelope) {
|
func (s *chStore) broadcast(ctx context.Context, m *protocol.Envelope) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
fmt.Println(m)
|
|
||||||
fmt.Println(s.topicToChans, "msg")
|
|
||||||
for _, ch := range s.topicToChans[m.PubsubTopic()] {
|
for _, ch := range s.topicToChans[m.PubsubTopic()] {
|
||||||
fmt.Println(m.PubsubTopic())
|
select {
|
||||||
ch <- m
|
// using ctx.Done for returning on cancellation is needed
|
||||||
|
// reason:
|
||||||
|
// if for a channel there is no one listening to it
|
||||||
|
// the broadcast will acquire lock and wait until there is a receiver on that channel.
|
||||||
|
// this will also block the chStore close function as it uses same mutex
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case ch <- m:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// send to all registered subscribers
|
// send to all registered subscribers
|
||||||
for _, ch := range s.topicToChans[""] {
|
for _, ch := range s.topicToChans[""] {
|
||||||
ch <- m
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case ch <- m:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,6 +99,7 @@ type Broadcaster interface {
|
|||||||
// panic safe, input can't be submitted to `input` channel after stop
|
// panic safe, input can't be submitted to `input` channel after stop
|
||||||
// lock safe, only read channels are returned and later closed, calling code has guarantee Register channel will not block forever.
|
// lock safe, only read channels are returned and later closed, calling code has guarantee Register channel will not block forever.
|
||||||
// no opened channel leaked, all created only read channels are closed when stop
|
// no opened channel leaked, all created only read channels are closed when stop
|
||||||
|
// even if there is noone listening to returned channels, guarantees to be lockfree.
|
||||||
type broadcaster struct {
|
type broadcaster struct {
|
||||||
bufLen int
|
bufLen int
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@ -124,7 +134,7 @@ func (b *broadcaster) run(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case msg, ok := <-b.input:
|
case msg, ok := <-b.input:
|
||||||
if ok {
|
if ok {
|
||||||
b.chStore.broadcast(msg)
|
b.chStore.broadcast(ctx, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -134,9 +144,10 @@ func (b *broadcaster) Stop() {
|
|||||||
if !b.running.CompareAndSwap(true, false) { // if running then stop
|
if !b.running.CompareAndSwap(true, false) { // if running then stop
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// cancel must be before chStore.close(), so that broadcast releases lock before chStore.close() acquires it.
|
||||||
|
b.cancel() // exit the run loop,
|
||||||
b.chStore.close() // close all channels that we send to
|
b.chStore.close() // close all channels that we send to
|
||||||
close(b.input) // close input channel
|
close(b.input) // close input channel
|
||||||
b.cancel() // exit the run loop
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// returned subscription is all speicfied topic
|
// returned subscription is all speicfied topic
|
||||||
|
|||||||
@ -90,3 +90,14 @@ func TestBroadcastUnregisterSub(t *testing.T) {
|
|||||||
b.Stop() // it automatically unregister/unsubscribe all
|
b.Stop() // it automatically unregister/unsubscribe all
|
||||||
require.Nil(t, <-specificSub.Ch)
|
require.Nil(t, <-specificSub.Ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBroadcastNoOneListening(t *testing.T) {
|
||||||
|
b := NewBroadcaster(100)
|
||||||
|
require.NoError(t, b.Start(context.Background()))
|
||||||
|
_ = b.RegisterForAll() // no one listening on channel
|
||||||
|
//
|
||||||
|
env := protocol.NewEnvelope(&pb.WakuMessage{}, utils.GetUnixEpoch(), "abc")
|
||||||
|
b.Submit(env)
|
||||||
|
b.Submit(env)
|
||||||
|
b.Stop()
|
||||||
|
}
|
||||||
|
|||||||
@ -185,8 +185,10 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
w.relaySubs[topic] = sub
|
w.relaySubs[topic] = sub
|
||||||
w.wg.Add(1)
|
if w.bcaster != nil {
|
||||||
go w.subscribeToTopic(topic, sub)
|
w.wg.Add(1)
|
||||||
|
go w.subscribeToTopic(topic, sub)
|
||||||
|
}
|
||||||
w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
|
w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user