diff --git a/mobile/api_relay.go b/mobile/api_relay.go index 10c74e0e..6ae67dda 100644 --- a/mobile/api_relay.go +++ b/mobile/api_relay.go @@ -98,7 +98,7 @@ func relaySubscribe(topic string) error { relaySubscriptions[topicToSubscribe] = subscription go func(subscription *relay.Subscription) { - for envelope := range subscription.C { + for envelope := range subscription.Ch { send("message", toSubscriptionMessage(envelope)) } }(subscription) diff --git a/tests/connection_test.go b/tests/connection_test.go index 5928396e..77f6e4e7 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -42,7 +42,7 @@ func TestBasicSendingReceiving(t *testing.T) { sub, err := wakuNode.Relay().Subscribe(ctx) require.NoError(t, err) - value := <-sub.C + value := <-sub.Ch payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None}) require.NoError(t, err) diff --git a/waku/node.go b/waku/node.go index 6861f184..b2eed7aa 100644 --- a/waku/node.go +++ b/waku/node.go @@ -298,7 +298,7 @@ func Execute(options Options) { nodeTopic := nodeTopic sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") - wakuNode.Broadcaster().Unregister(&nodeTopic, sub.C) + sub.Unsubscribe() } for _, protectedTopic := range options.Relay.ProtectedTopics { diff --git a/waku/v2/broadcast.go b/waku/v2/broadcast.go deleted file mode 100644 index 0c8c12f3..00000000 --- a/waku/v2/broadcast.go +++ /dev/null @@ -1,213 +0,0 @@ -package v2 - -import ( - "context" - "errors" - - "github.com/waku-org/go-waku/waku/v2/protocol" -) - -// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 -// by Dustin Sallings (c) 2013, which was released under MIT license - -type doneCh chan struct{} - -type chOperation struct { - ch chan<- *protocol.Envelope - topic *string - done doneCh -} - -type broadcastOutputs map[chan<- *protocol.Envelope]struct{} - -type broadcaster struct { - bufLen int - cancel context.CancelFunc - - input chan *protocol.Envelope - reg chan chOperation - unreg chan chOperation - - outputs broadcastOutputs - outputsPerTopic map[string]broadcastOutputs -} - -// The Broadcaster interface describes the main entry points to -// broadcasters. -type Broadcaster interface { - // Register a new channel to receive broadcasts from a pubsubtopic - Register(topic *string, newch chan<- *protocol.Envelope) - // Register a new channel to receive broadcasts from a pubsub topic and return a channel to wait until this operation is complete - WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh - // Unregister a channel so that it no longer receives broadcasts from a pubsub topic - Unregister(topic *string, newch chan<- *protocol.Envelope) - // Unregister a subscriptor channel and return a channel to wait until this operation is done - WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh - // Start - Start(ctx context.Context) error - // Shut this broadcaster down. - Stop() - // Submit a new object to all subscribers - Submit(*protocol.Envelope) -} - -func (b *broadcaster) broadcast(m *protocol.Envelope) { - for ch := range b.outputs { - ch <- m - } - - outputs, ok := b.outputsPerTopic[m.PubsubTopic()] - if !ok { - return - } - - for ch := range outputs { - ch <- m - } -} - -func (b *broadcaster) run(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case m, ok := <-b.input: - if ok { - b.broadcast(m) - } - case broadcastee, ok := <-b.reg: - if ok { - if broadcastee.topic != nil { - topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic] - if !ok { - b.outputsPerTopic[*broadcastee.topic] = make(broadcastOutputs) - topicOutputs = b.outputsPerTopic[*broadcastee.topic] - } - - topicOutputs[broadcastee.ch] = struct{}{} - b.outputsPerTopic[*broadcastee.topic] = topicOutputs - } else { - b.outputs[broadcastee.ch] = struct{}{} - } - if broadcastee.done != nil { - broadcastee.done <- struct{}{} - } - } else { - if broadcastee.done != nil { - broadcastee.done <- struct{}{} - } - return - } - case broadcastee := <-b.unreg: - if broadcastee.topic != nil { - topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic] - if !ok { - continue - } - delete(topicOutputs, broadcastee.ch) - b.outputsPerTopic[*broadcastee.topic] = topicOutputs - } else { - delete(b.outputs, broadcastee.ch) - } - - if broadcastee.done != nil { - broadcastee.done <- struct{}{} - } - } - } -} - -// NewBroadcaster creates a Broadcaster with an specified length -// It's used to register subscriptors that will need to receive -// an Envelope containing a WakuMessage -func NewBroadcaster(buflen int) Broadcaster { - return &broadcaster{ - bufLen: buflen, - } -} - -func (b *broadcaster) Start(ctx context.Context) error { - if b.cancel != nil { - return errors.New("already started") - } - - ctx, cancel := context.WithCancel(ctx) - - b.cancel = cancel - b.input = make(chan *protocol.Envelope, b.bufLen) - b.reg = make(chan chOperation) - b.unreg = make(chan chOperation) - b.outputs = make(broadcastOutputs) - b.outputsPerTopic = make(map[string]broadcastOutputs) - - go b.run(ctx) - - return nil -} - -func (b *broadcaster) Stop() { - if b.cancel != nil { - return - } - - b.cancel() - - close(b.input) - close(b.reg) - close(b.unreg) - b.outputs = nil - b.outputsPerTopic = nil - b.cancel = nil -} - -// Register a subscriptor channel and return a channel to wait until this operation is done -func (b *broadcaster) WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh { - d := make(doneCh) - b.reg <- chOperation{ - ch: newch, - topic: topic, - done: d, - } - return d -} - -// Register a subscriptor channel -func (b *broadcaster) Register(topic *string, newch chan<- *protocol.Envelope) { - b.reg <- chOperation{ - ch: newch, - topic: topic, - done: nil, - } -} - -// Unregister a subscriptor channel and return a channel to wait until this operation is done -func (b *broadcaster) WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh { - d := make(doneCh) - b.unreg <- chOperation{ - ch: newch, - topic: topic, - done: d, - } - return d -} - -// Unregister a subscriptor channel -func (b *broadcaster) Unregister(topic *string, newch chan<- *protocol.Envelope) { - b.unreg <- chOperation{ - ch: newch, - topic: topic, - done: nil, - } -} - -// Closes the broadcaster. Used to stop receiving new subscribers -func (b *broadcaster) Close() { - close(b.reg) -} - -// Submits an Envelope to be broadcasted among all registered subscriber channels -func (b *broadcaster) Submit(m *protocol.Envelope) { - if b != nil { - b.input <- m - } -} diff --git a/waku/v2/node/service.go b/waku/v2/node/service.go index dd66e860..12376820 100644 --- a/waku/v2/node/service.go +++ b/waku/v2/node/service.go @@ -5,18 +5,19 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) type Service interface { SetHost(h host.Host) - Start(ctx context.Context) error + Start(context.Context) error Stop() } type ReceptorService interface { - Service - MessageChannel() chan *protocol.Envelope + SetHost(h host.Host) + Stop() + Start(context.Context, relay.Subscription) error } type PeerConnectorService interface { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index fb14f323..c86672ce 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -93,7 +93,7 @@ type WakuNode struct { localNode *enode.LocalNode - bcaster v2.Broadcaster + bcaster relay.Broadcaster connectionNotif ConnectionNotifier protocolEventSub event.Subscription @@ -171,7 +171,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { var err error w := new(WakuNode) - w.bcaster = v2.NewBroadcaster(1024) + w.bcaster = relay.NewBroadcaster(1024) w.opts = params w.log = params.logger.Named("node2") w.wg = &sync.WaitGroup{} @@ -223,7 +223,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log) w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) - w.filterFullnode = filter.NewWakuFilterFullnode(w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.log) @@ -345,21 +345,19 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } - - w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C) + sub.Unsubscribe() } } w.store = w.storeFactory(w) w.store.SetHost(host) if w.opts.enableStore { - err := w.startStore(ctx) + sub := w.bcaster.RegisterForAll() + err := w.startStore(ctx, sub) if err != nil { return err } - w.log.Info("Subscribing store to broadcaster") - w.bcaster.Register(nil, w.store.MessageChannel()) } w.lightPush.SetHost(host) @@ -371,24 +369,23 @@ func (w *WakuNode) Start(ctx context.Context) error { w.legacyFilter.SetHost(host) if w.opts.enableLegacyFilter { - err := w.legacyFilter.Start(ctx) + sub := w.bcaster.RegisterForAll() + err := w.legacyFilter.Start(ctx, sub) if err != nil { return err } - w.log.Info("Subscribing filter to broadcaster") - w.bcaster.Register(nil, w.legacyFilter.MessageChannel()) } w.filterFullnode.SetHost(host) if w.opts.enableFilterFullNode { - err := w.filterFullnode.Start(ctx) + sub := w.bcaster.RegisterForAll() + err := w.filterFullnode.Start(ctx, sub) if err != nil { return err } - w.log.Info("Subscribing filterV2 to broadcaster") - w.bcaster.Register(nil, w.filterFullnode.MessageChannel()) + } w.filterLightnode.SetHost(host) @@ -593,7 +590,7 @@ func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange { // Broadcaster is used to access the message broadcaster that is used to push // messages to different protocols -func (w *WakuNode) Broadcaster() v2.Broadcaster { +func (w *WakuNode) Broadcaster() relay.Broadcaster { return w.bcaster } @@ -642,8 +639,8 @@ func (w *WakuNode) mountDiscV5() error { return err } -func (w *WakuNode) startStore(ctx context.Context) error { - err := w.store.Start(ctx) +func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error { + err := w.store.Start(ctx, sub) if err != nil { w.log.Error("starting store", zap.Error(err)) return err diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index d8a09610..e2204c65 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -116,7 +116,7 @@ func Test500(t *testing.T) { select { case <-ticker.C: require.Fail(t, "Timeout Sub1") - case msg := <-sub1.C: + case msg := <-sub1.Ch: if msg == nil { return } @@ -137,7 +137,7 @@ func Test500(t *testing.T) { select { case <-ticker.C: require.Fail(t, "Timeout Sub2") - case msg := <-sub2.C: + case msg := <-sub2.Ch: if msg == nil { return } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index d49b5b9d..d85d8d1e 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -15,11 +15,11 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "go.opencensus.io/tag" "go.uber.org/zap" @@ -37,7 +37,7 @@ type WakuFilterLightnode struct { cancel context.CancelFunc ctx context.Context h host.Host - broadcaster v2.Broadcaster + broadcaster relay.Broadcaster timesource timesource.Timesource wg *sync.WaitGroup log *zap.Logger @@ -55,7 +55,7 @@ type WakuFilterPushResult struct { } // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterLightnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { +func NewWakuFilterLightnode(broadcaster relay.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { wf := new(WakuFilterLightnode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index a63817ca..4204502e 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -12,13 +12,12 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) -func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { +func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -43,7 +42,7 @@ func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - b := v2.NewBroadcaster(10) + b := relay.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) filterPush := NewWakuFilterLightnode(b, timesource.NewDefaultClock(), utils.Logger()) filterPush.SetHost(host) @@ -73,19 +72,18 @@ func TestWakuFilter(t *testing.T) { node1, host1 := makeWakuFilterLightNode(t) defer node1.Stop() - broadcaster := v2.NewBroadcaster(10) + broadcaster := relay.NewBroadcaster(10) require.NoError(t, broadcaster.Start(context.Background())) node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger()) + node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger()) node2Filter.SetHost(host2) - err := node2Filter.Start(ctx) + sub := broadcaster.Register(testTopic) + err := node2Filter.Start(ctx, sub) require.NoError(t, err) - broadcaster.Register(&testTopic, node2Filter.MessageChannel()) - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1) require.NoError(t, err) @@ -163,15 +161,15 @@ func TestSubscriptionPing(t *testing.T) { node1, host1 := makeWakuFilterLightNode(t) defer node1.Stop() - broadcaster := v2.NewBroadcaster(10) + broadcaster := relay.NewBroadcaster(10) require.NoError(t, broadcaster.Start(context.Background())) node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger()) + node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger()) node2Filter.SetHost(host2) - err := node2Filter.Start(ctx) + err := node2Filter.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) @@ -204,21 +202,20 @@ func TestWakuFilterPeerFailure(t *testing.T) { node1, host1 := makeWakuFilterLightNode(t) - broadcaster := v2.NewBroadcaster(10) + broadcaster := relay.NewBroadcaster(10) require.NoError(t, broadcaster.Start(context.Background())) node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) defer node2.Stop() defer sub2.Unsubscribe() - broadcaster2 := v2.NewBroadcaster(10) + broadcaster2 := relay.NewBroadcaster(10) require.NoError(t, broadcaster2.Start(context.Background())) - node2Filter := NewWakuFilterFullnode(broadcaster2, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) + node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) node2Filter.SetHost(host2) - err := node2Filter.Start(ctx) + sub := broadcaster.Register(testTopic) + err := node2Filter.Start(ctx, sub) require.NoError(t, err) - broadcaster.Register(&testTopic, node2Filter.MessageChannel()) - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err = host1.Peerstore().AddProtocols(host2.ID(), FilterPushID_v20beta1) require.NoError(t, err) diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 126dd159..9181f3b2 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -15,10 +15,10 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -35,7 +35,7 @@ type ( WakuFilterFullNode struct { cancel context.CancelFunc h host.Host - msgC chan *protocol.Envelope + msgSub relay.Subscription wg *sync.WaitGroup log *zap.Logger @@ -46,7 +46,7 @@ type ( ) // NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { +func NewWakuFilterFullnode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { wf := new(WakuFilterFullNode) wf.log = log.Named("filterv2-fullnode") @@ -69,7 +69,7 @@ func (wf *WakuFilterFullNode) SetHost(h host.Host) { wf.h = h } -func (wf *WakuFilterFullNode) Start(ctx context.Context) error { +func (wf *WakuFilterFullNode) Start(ctx context.Context, sub relay.Subscription) error { wf.wg.Wait() // Wait for any goroutines to stop ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) @@ -83,8 +83,7 @@ func (wf *WakuFilterFullNode) Start(ctx context.Context) error { wf.h.SetStreamHandlerMatch(FilterSubscribeID_v20beta1, protocol.PrefixTextMatch(string(FilterSubscribeID_v20beta1)), wf.onRequest(ctx)) wf.cancel = cancel - wf.msgC = make(chan *protocol.Envelope, 1024) - + wf.msgSub = sub wf.wg.Add(1) go wf.filterListener(ctx) @@ -268,7 +267,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { return nil } - for m := range wf.msgC { + for m := range wf.msgSub.Ch { if err := handle(m); err != nil { wf.log.Error("handling message", zap.Error(err)) } @@ -339,11 +338,7 @@ func (wf *WakuFilterFullNode) Stop() { wf.cancel() - close(wf.msgC) + wf.msgSub.Unsubscribe() wf.wg.Wait() } - -func (wf *WakuFilterFullNode) MessageChannel() chan *protocol.Envelope { - return wf.msgC -} diff --git a/waku/v2/protocol/legacy_filter/filter_map.go b/waku/v2/protocol/legacy_filter/filter_map.go index b254ab4a..019be8e2 100644 --- a/waku/v2/protocol/legacy_filter/filter_map.go +++ b/waku/v2/protocol/legacy_filter/filter_map.go @@ -3,9 +3,9 @@ package legacy_filter import ( "sync" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" ) @@ -13,7 +13,7 @@ type FilterMap struct { sync.RWMutex timesource timesource.Timesource items map[string]Filter - broadcaster v2.Broadcaster + broadcaster relay.Broadcaster } type FilterMapItem struct { @@ -21,7 +21,7 @@ type FilterMapItem struct { Value Filter } -func NewFilterMap(broadcaster v2.Broadcaster, timesource timesource.Timesource) *FilterMap { +func NewFilterMap(broadcaster relay.Broadcaster, timesource timesource.Timesource) *FilterMap { return &FilterMap{ timesource: timesource, items: make(map[string]Filter), diff --git a/waku/v2/protocol/legacy_filter/filter_map_test.go b/waku/v2/protocol/legacy_filter/filter_map_test.go index d369ab8a..1a38c8f7 100644 --- a/waku/v2/protocol/legacy_filter/filter_map_test.go +++ b/waku/v2/protocol/legacy_filter/filter_map_test.go @@ -5,13 +5,13 @@ import ( "testing" "github.com/stretchr/testify/require" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" ) func TestFilterMap(t *testing.T) { - b := v2.NewBroadcaster(100) + b := relay.NewBroadcaster(100) require.NoError(t, b.Start(context.Background())) fmap := NewFilterMap(b, timesource.NewDefaultClock()) diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 1d70caa1..07537dbc 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -13,7 +13,6 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" @@ -53,7 +52,7 @@ type ( cancel context.CancelFunc h host.Host isFullNode bool - msgC chan *protocol.Envelope + msgSub relay.Subscription wg *sync.WaitGroup log *zap.Logger @@ -66,7 +65,7 @@ type ( const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilter(broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter { +func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter { wf := new(WakuFilter) wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode)) @@ -90,7 +89,7 @@ func (wf *WakuFilter) SetHost(h host.Host) { wf.h = h } -func (wf *WakuFilter) Start(ctx context.Context) error { +func (wf *WakuFilter) Start(ctx context.Context, sub relay.Subscription) error { wf.wg.Wait() // Wait for any goroutines to stop ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) @@ -104,7 +103,7 @@ func (wf *WakuFilter) Start(ctx context.Context) error { wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(ctx)) wf.cancel = cancel - wf.msgC = make(chan *protocol.Envelope, 1024) + wf.msgSub = sub wf.wg.Add(1) go wf.filterListener(ctx) @@ -239,7 +238,7 @@ func (wf *WakuFilter) filterListener(ctx context.Context) { return g.Wait() } - for m := range wf.msgC { + for m := range wf.msgSub.Ch { if err := handle(m); err != nil { wf.log.Error("handling message", zap.Error(err)) } @@ -362,7 +361,7 @@ func (wf *WakuFilter) Stop() { wf.cancel() - close(wf.msgC) + wf.msgSub.Unsubscribe() wf.h.RemoveStreamHandler(FilterID_v20beta1) wf.filters.RemoveAll() @@ -480,7 +479,3 @@ func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) e return nil } - -func (wf *WakuFilter) MessageChannel() chan *protocol.Envelope { - return wf.msgC -} diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go index 8bc11792..3d488a9b 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_test.go @@ -11,13 +11,12 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) -func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { +func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -42,11 +41,11 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - b := v2.NewBroadcaster(10) + b := relay.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), utils.Logger()) filter.SetHost(host) - err = filter.Start(context.Background()) + err = filter.Start(context.Background(), relay.NoopSubscription()) require.NoError(t, err) return filter, host @@ -72,7 +71,7 @@ func TestWakuFilter(t *testing.T) { node1, host1 := makeWakuFilter(t) defer node1.Stop() - broadcaster := v2.NewBroadcaster(10) + broadcaster := relay.NewBroadcaster(10) require.NoError(t, broadcaster.Start(context.Background())) node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) defer node2.Stop() @@ -80,11 +79,10 @@ func TestWakuFilter(t *testing.T) { node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), utils.Logger()) node2Filter.SetHost(host2) - err := node2Filter.Start(ctx) + sub := broadcaster.Register(testTopic) + err := node2Filter.Start(ctx, sub) require.NoError(t, err) - broadcaster.Register(&testTopic, node2Filter.MessageChannel()) - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1) require.NoError(t, err) @@ -162,21 +160,20 @@ func TestWakuFilterPeerFailure(t *testing.T) { node1, host1 := makeWakuFilter(t) - broadcaster := v2.NewBroadcaster(10) + broadcaster := relay.NewBroadcaster(10) require.NoError(t, broadcaster.Start(context.Background())) node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) defer node2.Stop() defer sub2.Unsubscribe() - broadcaster2 := v2.NewBroadcaster(10) + broadcaster2 := relay.NewBroadcaster(10) require.NoError(t, broadcaster2.Start(context.Background())) node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second)) node2Filter.SetHost(host2) - err := node2Filter.Start(ctx) + sub := broadcaster.Register(testTopic) + err := node2Filter.Start(ctx, sub) require.NoError(t, err) - broadcaster.Register(&testTopic, node2Filter.MessageChannel()) - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1) require.NoError(t, err) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 6ff6e7d2..3b0390a3 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -11,7 +11,6 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -26,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - b := v2.NewBroadcaster(10) + b := relay.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger()) relay.SetHost(host) @@ -101,15 +100,15 @@ func TestWakuLightPush(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - <-sub1.C - <-sub1.C + <-sub1.Ch + <-sub1.Ch }() wg.Add(1) go func() { defer wg.Done() - <-sub2.C - <-sub2.C + <-sub2.Ch + <-sub2.Ch }() // Verifying successful request diff --git a/waku/v2/protocol/noise/pairing_relay_messenger.go b/waku/v2/protocol/noise/pairing_relay_messenger.go index 281ce445..e5554d50 100644 --- a/waku/v2/protocol/noise/pairing_relay_messenger.go +++ b/waku/v2/protocol/noise/pairing_relay_messenger.go @@ -4,8 +4,6 @@ import ( "context" n "github.com/waku-org/go-noise" - v2 "github.com/waku-org/go-waku/waku/v2" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -18,15 +16,15 @@ type NoiseMessenger interface { } type contentTopicSubscription struct { - envChan chan *protocol.Envelope - msgChan chan *pb.WakuMessage + broadcastSub relay.Subscription + msgChan chan *pb.WakuMessage } type NoiseWakuRelay struct { NoiseMessenger relay *relay.WakuRelay relaySub *relay.Subscription - broadcaster v2.Broadcaster + broadcaster relay.Broadcaster cancel context.CancelFunc timesource timesource.Timesource pubsubTopic string @@ -53,7 +51,7 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic relaySub: subs, cancel: cancel, timesource: timesource, - broadcaster: v2.NewBroadcaster(1024), + broadcaster: relay.NewBroadcaster(1024), pubsubTopic: topic, subscriptionChPerContentTopic: make(map[string][]contentTopicSubscription), } @@ -70,7 +68,7 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic subs.Unsubscribe() wr.broadcaster.Stop() return - case envelope := <-subs.C: + case envelope := <-subs.Ch: if envelope != nil { wr.broadcaster.Submit(envelope) } @@ -83,11 +81,11 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-chan *pb.WakuMessage { sub := contentTopicSubscription{ - envChan: make(chan *protocol.Envelope, 1024), msgChan: make(chan *pb.WakuMessage, 1024), } - r.broadcaster.Register(&r.pubsubTopic, sub.envChan) + broadcastSub := r.broadcaster.RegisterForAll(1024) + sub.broadcastSub = broadcastSub subscriptionCh := r.subscriptionChPerContentTopic[contentTopic] subscriptionCh = append(subscriptionCh, sub) @@ -98,7 +96,7 @@ func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-c select { case <-ctx.Done(): return - case env := <-sub.envChan: + case env := <-sub.broadcastSub.Ch: if env == nil { return } @@ -138,7 +136,7 @@ func (r *NoiseWakuRelay) Stop() { r.cancel() for _, contentTopicSubscriptions := range r.subscriptionChPerContentTopic { for _, c := range contentTopicSubscriptions { - close(c.envChan) + c.broadcastSub.Unsubscribe() close(c.msgChan) } } diff --git a/waku/v2/protocol/noise/pairing_test.go b/waku/v2/protocol/noise/pairing_test.go index 506f9f46..d6692aca 100644 --- a/waku/v2/protocol/noise/pairing_test.go +++ b/waku/v2/protocol/noise/pairing_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" n "github.com/waku-org/go-noise" "github.com/waku-org/go-waku/tests" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -26,7 +25,7 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - b := v2.NewBroadcaster(1024) + b := relay.NewBroadcaster(1024) require.NoError(t, b.Start(context.Background())) relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger()) relay.SetHost(host) diff --git a/waku/v2/protocol/relay/broadcast.go b/waku/v2/protocol/relay/broadcast.go new file mode 100644 index 00000000..38fe27e8 --- /dev/null +++ b/waku/v2/protocol/relay/broadcast.go @@ -0,0 +1,160 @@ +package relay + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type chStore struct { + mu sync.RWMutex + topicToChans map[string]map[int]chan *protocol.Envelope + id int +} + +func newChStore() chStore { + return chStore{ + topicToChans: make(map[string]map[int]chan *protocol.Envelope), + } +} +func (s *chStore) getNewCh(topic string, chLen int) Subscription { + ch := make(chan *protocol.Envelope, chLen) + s.mu.Lock() + defer s.mu.Unlock() + s.id++ + // + if s.topicToChans[topic] == nil { + s.topicToChans[topic] = make(map[int]chan *protocol.Envelope) + } + id := s.id + s.topicToChans[topic][id] = ch + return Subscription{ + // read only channel , will not block forever, return once closed. + Ch: ch, + // Unsubscribe function is safe, can be called multiple times + // and even after broadcaster has stopped running. + Unsubscribe: func() { + s.mu.Lock() + defer s.mu.Unlock() + if s.topicToChans[topic] == nil { + return + } + if ch := s.topicToChans[topic][id]; ch != nil { + close(ch) + delete(s.topicToChans[topic], id) + } + }, + } +} + +func (s *chStore) broadcast(m *protocol.Envelope) { + s.mu.RLock() + defer s.mu.RUnlock() + for _, ch := range s.topicToChans[m.PubsubTopic()] { + ch <- m + } + // send to all registered subscribers + for _, ch := range s.topicToChans[""] { + ch <- m + } +} + +func (b *chStore) close() { + b.mu.Lock() + defer b.mu.Unlock() + for _, chans := range b.topicToChans { + for _, ch := range chans { + close(ch) + } + } + b.topicToChans = nil +} + +type Broadcaster interface { + Start(ctx context.Context) error + Stop() + Register(topic string, chLen ...int) Subscription + RegisterForAll(chLen ...int) Subscription + Submit(*protocol.Envelope) +} + +// //// +// thread safe +// panic safe, input can't be submitted to `input` channel after stop +// lock safe, only read channels are returned and later closed, calling code has guarantee Register channel will not block forever. +// no opened channel leaked, all created only read channels are closed when stop +type broadcaster struct { + bufLen int + cancel context.CancelFunc + input chan *protocol.Envelope + // + chStore chStore + running atomic.Bool +} + +func NewBroadcaster(bufLen int) *broadcaster { + return &broadcaster{ + bufLen: bufLen, + } +} + +func (b *broadcaster) Start(ctx context.Context) error { + if !b.running.CompareAndSwap(false, true) { // if not running then start + return errors.New("already started") + } + ctx, cancel := context.WithCancel(ctx) + b.cancel = cancel + b.chStore = newChStore() + b.input = make(chan *protocol.Envelope, b.bufLen) + go b.run(ctx) + return nil +} + +func (b *broadcaster) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case msg := <-b.input: + b.chStore.broadcast(msg) + } + } +} + +func (b *broadcaster) Stop() { + if !b.running.CompareAndSwap(true, false) { // if running then stop + return + } + b.chStore.close() // close all channels that we send to + close(b.input) // close input channel + b.cancel() // exit the run loop +} + +// returned subscription is all speicfied topic +func (b *broadcaster) Register(topic string, chLen ...int) Subscription { + return b.chStore.getNewCh(topic, getChLen(chLen)) +} + +// return subscription is for all topic +func (b *broadcaster) RegisterForAll(chLen ...int) Subscription { + + return b.chStore.getNewCh("", getChLen(chLen)) +} + +func getChLen(chLen []int) int { + l := 0 + if len(chLen) > 0 { + l = chLen[0] + } + return l +} + +// only accepts value when running. +func (b *broadcaster) Submit(m *protocol.Envelope) { + if b.running.Load() { + b.input <- m + } +} diff --git a/waku/v2/broadcast_test.go b/waku/v2/protocol/relay/broadcast_test.go similarity index 53% rename from waku/v2/broadcast_test.go rename to waku/v2/protocol/relay/broadcast_test.go index 367a6624..d662c473 100644 --- a/waku/v2/broadcast_test.go +++ b/waku/v2/protocol/relay/broadcast_test.go @@ -1,4 +1,4 @@ -package v2 +package relay import ( "context" @@ -11,8 +11,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) -// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 -// by Dustin Sallings (c) 2013, which was released under MIT license func TestBroadcast(t *testing.T) { wg := sync.WaitGroup{} @@ -23,12 +21,11 @@ func TestBroadcast(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) - cch := make(chan *protocol.Envelope) - b.Register(nil, cch) + sub := b.RegisterForAll() go func() { defer wg.Done() - defer b.Unregister(nil, cch) - <-cch + defer sub.Unsubscribe() + <-sub.Ch }() } @@ -39,7 +36,7 @@ func TestBroadcast(t *testing.T) { wg.Wait() } -func TestBroadcastWait(t *testing.T) { +func TestBroadcastSpecificTopic(t *testing.T) { wg := sync.WaitGroup{} b := NewBroadcaster(100) @@ -49,14 +46,12 @@ func TestBroadcastWait(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) - cch := make(chan *protocol.Envelope) - <-b.WaitRegister(nil, cch) + sub := b.Register("abc") go func() { defer wg.Done() - - <-cch - <-b.WaitUnregister(nil, cch) + <-sub.Ch + sub.Unsubscribe() }() } @@ -67,10 +62,31 @@ func TestBroadcastWait(t *testing.T) { wg.Wait() } +// check return from channel after Stop and multiple unregister func TestBroadcastCleanup(t *testing.T) { b := NewBroadcaster(100) require.NoError(t, b.Start(context.Background())) - topic := "test" - b.Register(&topic, make(chan *protocol.Envelope)) + sub := b.Register("test") b.Stop() + <-sub.Ch + sub.Unsubscribe() + sub.Unsubscribe() +} + +func TestBroadcastUnregisterSub(t *testing.T) { + b := NewBroadcaster(100) + require.NoError(t, b.Start(context.Background())) + subForAll := b.RegisterForAll() + // unregister before submit + specificSub := b.Register("abc") + specificSub.Unsubscribe() + // + env := protocol.NewEnvelope(&pb.WakuMessage{}, utils.GetUnixEpoch(), "abc") + b.Submit(env) + // no message on specific sub + require.Nil(t, <-specificSub.Ch) + // msg on subForAll + require.Equal(t, env, <-subForAll.Ch) + b.Stop() // it automatically unregister/unsubscribe all + require.Equal(t, nil, <-specificSub.Ch) } diff --git a/waku/v2/protocol/relay/subscription.go b/waku/v2/protocol/relay/subscription.go index 79e10379..cac07a0c 100644 --- a/waku/v2/protocol/relay/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -1,34 +1,26 @@ package relay -import ( - "sync" +import "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol" -) - -// Subscription handles the subscrition to a particular pubsub topic type Subscription struct { - sync.RWMutex - - // C is channel used for receiving envelopes - C chan *protocol.Envelope - - closed bool - once sync.Once - quit chan struct{} + Unsubscribe func() + Ch <-chan *protocol.Envelope } -// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel -func (subs *Subscription) Unsubscribe() { - subs.once.Do(func() { - close(subs.quit) - }) - +func NoopSubscription() Subscription { + ch := make(chan *protocol.Envelope) + close(ch) + return Subscription{ + Unsubscribe: func() {}, + Ch: ch, + } } -// IsClosed determine whether a Subscription is still open for receiving messages -func (subs *Subscription) IsClosed() bool { - subs.RLock() - defer subs.RUnlock() - return subs.closed +func ArraySubscription(msgs []*protocol.Envelope) Subscription { + ch := make(chan *protocol.Envelope, len(msgs)) + close(ch) + return Subscription{ + Unsubscribe: func() {}, + Ch: ch, + } } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index bb61e31b..93ba61a1 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -16,7 +16,6 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/waku-org/go-waku/logging" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/hash" "github.com/waku-org/go-waku/waku/v2/metrics" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" @@ -36,7 +35,7 @@ type WakuRelay struct { log *zap.Logger - bcaster v2.Broadcaster + bcaster Broadcaster minPeersToPublish int @@ -59,7 +58,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { } // NewWakuRelay returns a new instance of a WakuRelay struct -func NewWakuRelay(bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { +func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { w := new(WakuRelay) w.timesource = timesource w.wakuRelayTopics = make(map[string]*pubsub.Topic) @@ -191,7 +190,8 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return nil, err } w.relaySubs[topic] = sub - + w.wg.Add(1) + go w.subscribeToTopic(topic, sub) w.log.Info("subscribing to topic", zap.String("topic", sub.Topic())) } @@ -275,30 +275,21 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool { // SubscribeToTopic returns a Subscription to receive messages from a pubsub topic func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) { - sub, err := w.subscribe(topic) + _, err := w.subscribe(topic) if err != nil { return nil, err } // Create client subscription - subscription := new(Subscription) - subscription.closed = false - subscription.C = make(chan *waku_proto.Envelope, 1024) // To avoid blocking - subscription.quit = make(chan struct{}) - - w.subscriptionsMutex.Lock() - defer w.subscriptionsMutex.Unlock() - - w.subscriptions[topic] = append(w.subscriptions[topic], subscription) - + subscription := NoopSubscription() if w.bcaster != nil { - w.bcaster.Register(&topic, subscription.C) + subscription = w.bcaster.Register(topic) } - - w.wg.Add(1) - go w.subscribeToTopic(ctx, topic, subscription, sub) - - return subscription, nil + go func() { + <-ctx.Done() + subscription.Unsubscribe() + }() + return &subscription, nil } // SubscribeToTopic returns a Subscription to receive messages from the default waku pubsub topic @@ -332,34 +323,27 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message { msgChannel := make(chan *pubsub.Message, 1024) - go func(msgChannel chan *pubsub.Message) { - defer func() { - if r := recover(); r != nil { - w.log.Debug("recovered msgChannel") - } - }() - + go func() { + defer close(msgChannel) for { msg, err := sub.Next(ctx) if err != nil { if !errors.Is(err, context.Canceled) { w.log.Error("getting message from subscription", zap.Error(err)) } - sub.Cancel() - close(msgChannel) for _, subscription := range w.subscriptions[sub.Topic()] { subscription.Unsubscribe() } + return } - msgChannel <- msg } - }(msgChannel) + }() return msgChannel } -func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) { +func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscription) { defer w.wg.Done() ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay")) @@ -371,28 +355,11 @@ func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string subChannel := w.nextMessage(w.ctx, sub) for { select { - case <-userCtx.Done(): - return case <-ctx.Done(): return - case <-subscription.quit: - func(topic string) { - subscription.Lock() - defer subscription.Unlock() - - if subscription.closed { - return - } - subscription.closed = true - if w.bcaster != nil { - <-w.bcaster.WaitUnregister(&topic, subscription.C) // Remove from broadcast list - } - - close(subscription.C) - }(pubsubTopic) // TODO: if there are no more relay subscriptions, close the pubsub subscription - case msg := <-subChannel: - if msg == nil { + case msg, ok := <-subChannel: + if !ok { return } wakuMessage := &pb.WakuMessage{} diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 4f0203ed..c7fa7382 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -45,7 +46,7 @@ func TestResume(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) + err = s1.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s1.Stop() @@ -67,7 +68,7 @@ func TestResume(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() @@ -105,7 +106,7 @@ func TestResumeWithListOfPeers(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) + err = s1.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s1.Stop() @@ -119,7 +120,7 @@ func TestResumeWithListOfPeers(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() @@ -146,7 +147,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) + err = s1.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s1.Stop() @@ -160,7 +161,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index e6a863de..ffbe7cf8 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -7,7 +7,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" ) @@ -49,7 +49,7 @@ type WakuStore struct { ctx context.Context cancel context.CancelFunc timesource timesource.Timesource - MsgC chan *protocol.Envelope + MsgC relay.Subscription wg *sync.WaitGroup log *zap.Logger diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index c6db5c39..8e560132 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" ) @@ -85,12 +86,11 @@ type MessageProvider interface { type Store interface { SetHost(h host.Host) - Start(ctx context.Context) error + Start(context.Context, relay.Subscription) error Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error) Next(ctx context.Context, r *Result) (*Result, error) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) - MessageChannel() chan *protocol.Envelope Stop() } @@ -105,7 +105,7 @@ func (store *WakuStore) SetHost(h host.Host) { } // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider -func (store *WakuStore) Start(ctx context.Context) error { +func (store *WakuStore) Start(ctx context.Context, sub relay.Subscription) error { if store.started { return nil } @@ -123,7 +123,7 @@ func (store *WakuStore) Start(ctx context.Context) error { store.started = true store.ctx, store.cancel = context.WithCancel(ctx) - store.MsgC = make(chan *protocol.Envelope, 1024) + store.MsgC = sub store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest) @@ -158,7 +158,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error { func (store *WakuStore) storeIncomingMessages(ctx context.Context) { defer store.wg.Done() - for envelope := range store.MsgC { + for envelope := range store.MsgC.Ch { go func(env *protocol.Envelope) { _ = store.storeMessage(env) }(envelope) @@ -207,10 +207,6 @@ func (store *WakuStore) onRequest(s network.Stream) { } } -func (store *WakuStore) MessageChannel() chan *protocol.Envelope { - return store.MsgC -} - // TODO: queryWithAccounting // Stop closes the store message channel and removes the protocol stream handler @@ -223,9 +219,7 @@ func (store *WakuStore) Stop() { store.started = false - if store.MsgC != nil { - close(store.MsgC) - } + store.MsgC.Unsubscribe() if store.msgProvider != nil { store.msgProvider.Stop() // TODO: StoreProtocol should not stop a message provider diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 2c57cbdc..cc03eca3 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -24,10 +25,6 @@ func TestWakuStoreProtocolQuery(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) - require.NoError(t, err) - - defer s1.Stop() topic1 := "1" pubsubTopic1 := "topic1" @@ -38,15 +35,18 @@ func TestWakuStoreProtocolQuery(t *testing.T) { Version: 0, Timestamp: utils.GetUnixEpoch(), } - host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) // Simulate a message has been received via relay protocol - s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1) + sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) + err = s1.Start(ctx, sub) + require.NoError(t, err) + defer s1.Stop() s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() @@ -75,10 +75,6 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) - require.NoError(t, err) - - defer s1.Stop() topic1 := "1" pubsubTopic1 := "topic1" @@ -91,7 +87,10 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { } // Simulate a message has been received via relay protocol - s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1) + sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) + err = s1.Start(ctx, sub) + require.NoError(t, err) + defer s1.Stop() time.Sleep(100 * time.Millisecond) @@ -116,8 +115,6 @@ func TestWakuStoreProtocolNext(t *testing.T) { db := MemoryDB(t) s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) - require.NoError(t, err) topic1 := "1" pubsubTopic1 := "topic1" @@ -129,11 +126,15 @@ func TestWakuStoreProtocolNext(t *testing.T) { msg4 := tests.CreateWakuMessage(topic1, now+4) msg5 := tests.CreateWakuMessage(topic1, now+5) - s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1) + sub := relay.ArraySubscription([]*protocol.Envelope{ + protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1), + }) + err = s1.Start(ctx, sub) + require.NoError(t, err) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) @@ -144,7 +145,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() @@ -189,8 +190,6 @@ func TestWakuStoreResult(t *testing.T) { db := MemoryDB(t) s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) - require.NoError(t, err) topic1 := "1" pubsubTopic1 := "topic1" @@ -202,11 +201,15 @@ func TestWakuStoreResult(t *testing.T) { msg4 := tests.CreateWakuMessage(topic1, now+4) msg5 := tests.CreateWakuMessage(topic1, now+5) - s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1) + sub := relay.ArraySubscription([]*protocol.Envelope{ + protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1), + }) + err = s1.Start(ctx, sub) + require.NoError(t, err) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) @@ -217,7 +220,7 @@ func TestWakuStoreResult(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() @@ -277,9 +280,6 @@ func TestWakuStoreProtocolFind(t *testing.T) { s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) - err = s1.Start(ctx) - require.NoError(t, err) - defer s1.Stop() topic1 := "1" pubsubTopic1 := "topic1" @@ -295,15 +295,20 @@ func TestWakuStoreProtocolFind(t *testing.T) { msg8 := tests.CreateWakuMessage(topic1, now+8) msg9 := tests.CreateWakuMessage(topic1, now+9) - s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg6, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg7, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg8, utils.GetUnixEpoch(), pubsubTopic1) - s1.MsgC <- protocol.NewEnvelope(msg9, utils.GetUnixEpoch(), pubsubTopic1) + sub := relay.ArraySubscription([]*protocol.Envelope{ + protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg6, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg7, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg8, utils.GetUnixEpoch(), pubsubTopic1), + protocol.NewEnvelope(msg9, utils.GetUnixEpoch(), pubsubTopic1), + }) + err = s1.Start(ctx, sub) + require.NoError(t, err) + defer s1.Stop() host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) @@ -314,7 +319,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) - err = s2.Start(ctx) + err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) defer s2.Stop() diff --git a/waku/v2/rest/relay.go b/waku/v2/rest/relay.go index 12d81be6..4cce1daa 100644 --- a/waku/v2/rest/relay.go +++ b/waku/v2/rest/relay.go @@ -135,7 +135,7 @@ func (d *RelayService) postV1Subscriptions(w http.ResponseWriter, r *http.Reques if err != nil { d.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) } else { - d.node.Broadcaster().Unregister(&topicToSubscribe, sub.C) + sub.Unsubscribe() d.messagesMutex.Lock() d.messages[topic] = []*pb.WakuMessage{} d.messagesMutex.Unlock() diff --git a/waku/v2/rest/runner.go b/waku/v2/rest/runner.go index d3455948..e369f3a6 100644 --- a/waku/v2/rest/runner.go +++ b/waku/v2/rest/runner.go @@ -3,20 +3,20 @@ package rest import ( "context" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) type Adder func(msg *protocol.Envelope) type runnerService struct { - broadcaster v2.Broadcaster - ch chan *protocol.Envelope + broadcaster relay.Broadcaster + sub relay.Subscription cancel context.CancelFunc adder Adder } -func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService { +func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService { return &runnerService{ broadcaster: broadcaster, adder: adder, @@ -25,15 +25,16 @@ func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService { func (r *runnerService) Start(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) - r.ch = make(chan *protocol.Envelope, 1024) r.cancel = cancel - r.broadcaster.Register(nil, r.ch) + r.sub = r.broadcaster.RegisterForAll(1024) for { select { case <-ctx.Done(): return - case envelope := <-r.ch: - r.adder(envelope) + case envelope, ok := <-r.sub.Ch: + if !ok { + r.adder(envelope) + } } } } @@ -42,7 +43,6 @@ func (r *runnerService) Stop() { if r.cancel == nil { return } + r.sub.Unsubscribe() r.cancel() - r.broadcaster.Unregister(nil, r.ch) - close(r.ch) } diff --git a/waku/v2/rest/store_test.go b/waku/v2/rest/store_test.go index 7f9da8b1..ec1b3a85 100644 --- a/waku/v2/rest/store_test.go +++ b/waku/v2/rest/store_test.go @@ -37,9 +37,9 @@ func TestGetMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic1, now+2) msg3 := tests.CreateWakuMessage(topic1, now+3) - node1.Store().MessageChannel() <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) - node1.Store().MessageChannel() <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) - node1.Store().MessageChannel() <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) + node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) + node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)) + node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)) n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty())) n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo) diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 5fbbc897..7da4c526 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -10,7 +10,6 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" @@ -50,7 +49,7 @@ func TestFilterSubscription(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - b := v2.NewBroadcaster(10) + b := relay.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) node := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger()) node.SetHost(host) @@ -60,11 +59,11 @@ func TestFilterSubscription(t *testing.T) { _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - b2 := v2.NewBroadcaster(10) + b2 := relay.NewBroadcaster(10) require.NoError(t, b2.Start(context.Background())) f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), utils.Logger()) f.SetHost(host) - err = f.Start(context.Background()) + err = f.Start(context.Background(), relay.NoopSubscription()) require.NoError(t, err) d := makeFilterService(t, true) diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 180b59d4..a56de54c 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -106,11 +106,11 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r if topic == "" { var sub *relay.Subscription sub, err = r.node.Relay().Subscribe(ctx) - r.node.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C) + sub.Unsubscribe() } else { var sub *relay.Subscription sub, err = r.node.Relay().SubscribeToTopic(ctx, topic) - r.node.Broadcaster().Unregister(&topic, sub.C) + sub.Unsubscribe() } if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) diff --git a/waku/v2/rpc/runner.go b/waku/v2/rpc/runner.go index 3695a5b8..d41340ce 100644 --- a/waku/v2/rpc/runner.go +++ b/waku/v2/rpc/runner.go @@ -1,43 +1,32 @@ package rpc import ( - v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) type Adder func(msg *protocol.Envelope) type runnerService struct { - broadcaster v2.Broadcaster - ch chan *protocol.Envelope - quit chan bool + broadcaster relay.Broadcaster + sub relay.Subscription adder Adder } -func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService { +func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService { return &runnerService{ broadcaster: broadcaster, - quit: make(chan bool), adder: adder, } } func (r *runnerService) Start() { - r.ch = make(chan *protocol.Envelope, 1024) - r.broadcaster.Register(nil, r.ch) - - for { - select { - case <-r.quit: - return - case envelope := <-r.ch: - r.adder(envelope) - } + r.broadcaster.RegisterForAll(1024) + for envelope := range r.sub.Ch { + r.adder(envelope) } } func (r *runnerService) Stop() { - r.quit <- true - r.broadcaster.Unregister(nil, r.ch) - close(r.ch) + r.sub.Unsubscribe() }