diff --git a/waku/v2/broadcast.go b/waku/v2/broadcast.go index 88d99efb..f734aafa 100644 --- a/waku/v2/broadcast.go +++ b/waku/v2/broadcast.go @@ -7,10 +7,17 @@ import ( // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 // by Dustin Sallings (c) 2013, which was released under MIT license +type doneCh chan struct{} + +type chOperation struct { + ch chan<- *protocol.Envelope + done doneCh +} + type broadcaster struct { input chan *protocol.Envelope - reg chan chan<- *protocol.Envelope - unreg chan chan<- *protocol.Envelope + reg chan chOperation + unreg chan chOperation outputs map[chan<- *protocol.Envelope]bool } @@ -20,8 +27,12 @@ type broadcaster struct { type Broadcaster interface { // Register a new channel to receive broadcasts Register(chan<- *protocol.Envelope) + // Register a new channel to receive broadcasts and return a channel to wait until this operation is complete + WaitRegister(newch chan<- *protocol.Envelope) doneCh // Unregister a channel so that it no longer receives broadcasts. Unregister(chan<- *protocol.Envelope) + // Unregister a subscriptor channel and return a channel to wait until this operation is done + WaitUnregister(newch chan<- *protocol.Envelope) doneCh // Shut this broadcaster down. Close() // Submit a new object to all subscribers @@ -39,14 +50,23 @@ func (b *broadcaster) run() { select { case m := <-b.input: b.broadcast(m) - case ch, ok := <-b.reg: + case broadcastee, ok := <-b.reg: if ok { - b.outputs[ch] = true + b.outputs[broadcastee.ch] = true + if broadcastee.done != nil { + broadcastee.done <- struct{}{} + } } else { + if broadcastee.done != nil { + broadcastee.done <- struct{}{} + } return } - case ch := <-b.unreg: - delete(b.outputs, ch) + case broadcastee := <-b.unreg: + delete(b.outputs, broadcastee.ch) + if broadcastee.done != nil { + broadcastee.done <- struct{}{} + } } } } @@ -57,8 +77,8 @@ func (b *broadcaster) run() { func NewBroadcaster(buflen int) Broadcaster { b := &broadcaster{ input: make(chan *protocol.Envelope, buflen), - reg: make(chan chan<- *protocol.Envelope), - unreg: make(chan chan<- *protocol.Envelope), + reg: make(chan chOperation), + unreg: make(chan chOperation), outputs: make(map[chan<- *protocol.Envelope]bool), } @@ -67,14 +87,40 @@ func NewBroadcaster(buflen int) Broadcaster { return b } +// Register a subscriptor channel and return a channel to wait until this operation is done +func (b *broadcaster) WaitRegister(newch chan<- *protocol.Envelope) doneCh { + d := make(doneCh) + b.reg <- chOperation{ + ch: newch, + done: d, + } + return d +} + // Register a subscriptor channel func (b *broadcaster) Register(newch chan<- *protocol.Envelope) { - b.reg <- newch + b.reg <- chOperation{ + ch: newch, + done: nil, + } +} + +// Unregister a subscriptor channel and return a channel to wait until this operation is done +func (b *broadcaster) WaitUnregister(newch chan<- *protocol.Envelope) doneCh { + d := make(doneCh) + b.unreg <- chOperation{ + ch: newch, + done: d, + } + return d } // Unregister a subscriptor channel func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) { - b.unreg <- newch + b.unreg <- chOperation{ + ch: newch, + done: nil, + } } // Closes the broadcaster. Used to stop receiving new subscribers diff --git a/waku/v2/broadcast_test.go b/waku/v2/broadcast_test.go index aba0139e..05ef69aa 100644 --- a/waku/v2/broadcast_test.go +++ b/waku/v2/broadcast_test.go @@ -37,6 +37,33 @@ func TestBroadcast(t *testing.T) { wg.Wait() } +func TestBroadcastWait(t *testing.T) { + wg := sync.WaitGroup{} + + b := NewBroadcaster(100) + defer b.Close() + + for i := 0; i < 5; i++ { + wg.Add(1) + + cch := make(chan *protocol.Envelope) + <-b.WaitRegister(cch) + + go func() { + defer wg.Done() + + <-cch + <-b.WaitUnregister(cch) + }() + + } + + env := new(protocol.Envelope) + b.Submit(env) + + wg.Wait() +} + func TestBroadcastCleanup(t *testing.T) { b := NewBroadcaster(100) b.Register(make(chan *protocol.Envelope)) diff --git a/waku/v2/protocol/relay/subscription.go b/waku/v2/protocol/relay/subscription.go index 4adc0a83..b5b310de 100644 --- a/waku/v2/protocol/relay/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -8,6 +8,8 @@ import ( // Subscription handles the subscrition to a particular pubsub topic type Subscription struct { + sync.RWMutex + // C is channel used for receiving envelopes C chan *protocol.Envelope @@ -19,14 +21,14 @@ type Subscription struct { // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel func (subs *Subscription) Unsubscribe() { subs.once.Do(func() { - subs.closed = true close(subs.quit) - close(subs.C) - }) + } // IsClosed determine whether a Subscription is still open for receiving messages func (subs *Subscription) IsClosed() bool { + subs.RLock() + defer subs.RUnlock() return subs.closed } diff --git a/waku/v2/protocol/relay/subscription_test.go b/waku/v2/protocol/relay/subscription_test.go deleted file mode 100644 index 3abbdfae..00000000 --- a/waku/v2/protocol/relay/subscription_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package relay - -import ( - "testing" - - waku_proto "github.com/status-im/go-waku/waku/v2/protocol" - "github.com/stretchr/testify/require" -) - -func TestSubscription(t *testing.T) { - e := Subscription{ - closed: false, - C: make(chan *waku_proto.Envelope, 10), - quit: make(chan struct{}), - } - e.Unsubscribe() - require.True(t, e.closed) -} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index abd09b2f..377c112d 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -300,9 +300,20 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * for { select { case <-subscription.quit: - if w.bcaster != nil { - w.bcaster.Unregister(subscription.C) // Remove from broadcast list - } + func() { + subscription.Lock() + defer subscription.Unlock() + + if subscription.closed { + return + } + subscription.closed = true + if w.bcaster != nil { + <-w.bcaster.WaitUnregister(subscription.C) // Remove from broadcast list + } + + close(subscription.C) + }() // TODO: if there are no more relay subscriptions, close the pubsub subscription case msg := <-subChannel: if msg == nil {