feat: safe broadcaster

This commit is contained in:
harsh-98 2023-05-05 15:19:15 +05:30 committed by RichΛrd
parent 02fdf916d8
commit 31c8035589
31 changed files with 399 additions and 511 deletions

View File

@ -98,7 +98,7 @@ func relaySubscribe(topic string) error {
relaySubscriptions[topicToSubscribe] = subscription
go func(subscription *relay.Subscription) {
for envelope := range subscription.C {
for envelope := range subscription.Ch {
send("message", toSubscriptionMessage(envelope))
}
}(subscription)

View File

@ -42,7 +42,7 @@ func TestBasicSendingReceiving(t *testing.T) {
sub, err := wakuNode.Relay().Subscribe(ctx)
require.NoError(t, err)
value := <-sub.C
value := <-sub.Ch
payload, err := payload.DecodePayload(value.Message(), &payload.KeyInfo{Kind: payload.None})
require.NoError(t, err)

View File

@ -298,7 +298,7 @@ func Execute(options Options) {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
failOnErr(err, "Error subscring to topic")
wakuNode.Broadcaster().Unregister(&nodeTopic, sub.C)
sub.Unsubscribe()
}
for _, protectedTopic := range options.Relay.ProtectedTopics {

View File

@ -1,213 +0,0 @@
package v2
import (
"context"
"errors"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120
// by Dustin Sallings (c) 2013, which was released under MIT license
type doneCh chan struct{}
type chOperation struct {
ch chan<- *protocol.Envelope
topic *string
done doneCh
}
type broadcastOutputs map[chan<- *protocol.Envelope]struct{}
type broadcaster struct {
bufLen int
cancel context.CancelFunc
input chan *protocol.Envelope
reg chan chOperation
unreg chan chOperation
outputs broadcastOutputs
outputsPerTopic map[string]broadcastOutputs
}
// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {
// Register a new channel to receive broadcasts from a pubsubtopic
Register(topic *string, newch chan<- *protocol.Envelope)
// Register a new channel to receive broadcasts from a pubsub topic and return a channel to wait until this operation is complete
WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh
// Unregister a channel so that it no longer receives broadcasts from a pubsub topic
Unregister(topic *string, newch chan<- *protocol.Envelope)
// Unregister a subscriptor channel and return a channel to wait until this operation is done
WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh
// Start
Start(ctx context.Context) error
// Shut this broadcaster down.
Stop()
// Submit a new object to all subscribers
Submit(*protocol.Envelope)
}
func (b *broadcaster) broadcast(m *protocol.Envelope) {
for ch := range b.outputs {
ch <- m
}
outputs, ok := b.outputsPerTopic[m.PubsubTopic()]
if !ok {
return
}
for ch := range outputs {
ch <- m
}
}
func (b *broadcaster) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case m, ok := <-b.input:
if ok {
b.broadcast(m)
}
case broadcastee, ok := <-b.reg:
if ok {
if broadcastee.topic != nil {
topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic]
if !ok {
b.outputsPerTopic[*broadcastee.topic] = make(broadcastOutputs)
topicOutputs = b.outputsPerTopic[*broadcastee.topic]
}
topicOutputs[broadcastee.ch] = struct{}{}
b.outputsPerTopic[*broadcastee.topic] = topicOutputs
} else {
b.outputs[broadcastee.ch] = struct{}{}
}
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
} else {
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
return
}
case broadcastee := <-b.unreg:
if broadcastee.topic != nil {
topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic]
if !ok {
continue
}
delete(topicOutputs, broadcastee.ch)
b.outputsPerTopic[*broadcastee.topic] = topicOutputs
} else {
delete(b.outputs, broadcastee.ch)
}
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
}
}
}
// NewBroadcaster creates a Broadcaster with an specified length
// It's used to register subscriptors that will need to receive
// an Envelope containing a WakuMessage
func NewBroadcaster(buflen int) Broadcaster {
return &broadcaster{
bufLen: buflen,
}
}
func (b *broadcaster) Start(ctx context.Context) error {
if b.cancel != nil {
return errors.New("already started")
}
ctx, cancel := context.WithCancel(ctx)
b.cancel = cancel
b.input = make(chan *protocol.Envelope, b.bufLen)
b.reg = make(chan chOperation)
b.unreg = make(chan chOperation)
b.outputs = make(broadcastOutputs)
b.outputsPerTopic = make(map[string]broadcastOutputs)
go b.run(ctx)
return nil
}
func (b *broadcaster) Stop() {
if b.cancel != nil {
return
}
b.cancel()
close(b.input)
close(b.reg)
close(b.unreg)
b.outputs = nil
b.outputsPerTopic = nil
b.cancel = nil
}
// Register a subscriptor channel and return a channel to wait until this operation is done
func (b *broadcaster) WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh {
d := make(doneCh)
b.reg <- chOperation{
ch: newch,
topic: topic,
done: d,
}
return d
}
// Register a subscriptor channel
func (b *broadcaster) Register(topic *string, newch chan<- *protocol.Envelope) {
b.reg <- chOperation{
ch: newch,
topic: topic,
done: nil,
}
}
// Unregister a subscriptor channel and return a channel to wait until this operation is done
func (b *broadcaster) WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh {
d := make(doneCh)
b.unreg <- chOperation{
ch: newch,
topic: topic,
done: d,
}
return d
}
// Unregister a subscriptor channel
func (b *broadcaster) Unregister(topic *string, newch chan<- *protocol.Envelope) {
b.unreg <- chOperation{
ch: newch,
topic: topic,
done: nil,
}
}
// Closes the broadcaster. Used to stop receiving new subscribers
func (b *broadcaster) Close() {
close(b.reg)
}
// Submits an Envelope to be broadcasted among all registered subscriber channels
func (b *broadcaster) Submit(m *protocol.Envelope) {
if b != nil {
b.input <- m
}
}

View File

@ -5,18 +5,19 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type Service interface {
SetHost(h host.Host)
Start(ctx context.Context) error
Start(context.Context) error
Stop()
}
type ReceptorService interface {
Service
MessageChannel() chan *protocol.Envelope
SetHost(h host.Host)
Stop()
Start(context.Context, relay.Subscription) error
}
type PeerConnectorService interface {

View File

@ -93,7 +93,7 @@ type WakuNode struct {
localNode *enode.LocalNode
bcaster v2.Broadcaster
bcaster relay.Broadcaster
connectionNotif ConnectionNotifier
protocolEventSub event.Subscription
@ -171,7 +171,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
var err error
w := new(WakuNode)
w.bcaster = v2.NewBroadcaster(1024)
w.bcaster = relay.NewBroadcaster(1024)
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
@ -223,7 +223,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.log)
@ -345,21 +345,19 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil {
return err
}
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
sub.Unsubscribe()
}
}
w.store = w.storeFactory(w)
w.store.SetHost(host)
if w.opts.enableStore {
err := w.startStore(ctx)
sub := w.bcaster.RegisterForAll()
err := w.startStore(ctx, sub)
if err != nil {
return err
}
w.log.Info("Subscribing store to broadcaster")
w.bcaster.Register(nil, w.store.MessageChannel())
}
w.lightPush.SetHost(host)
@ -371,24 +369,23 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.legacyFilter.SetHost(host)
if w.opts.enableLegacyFilter {
err := w.legacyFilter.Start(ctx)
sub := w.bcaster.RegisterForAll()
err := w.legacyFilter.Start(ctx, sub)
if err != nil {
return err
}
w.log.Info("Subscribing filter to broadcaster")
w.bcaster.Register(nil, w.legacyFilter.MessageChannel())
}
w.filterFullnode.SetHost(host)
if w.opts.enableFilterFullNode {
err := w.filterFullnode.Start(ctx)
sub := w.bcaster.RegisterForAll()
err := w.filterFullnode.Start(ctx, sub)
if err != nil {
return err
}
w.log.Info("Subscribing filterV2 to broadcaster")
w.bcaster.Register(nil, w.filterFullnode.MessageChannel())
}
w.filterLightnode.SetHost(host)
@ -593,7 +590,7 @@ func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange {
// Broadcaster is used to access the message broadcaster that is used to push
// messages to different protocols
func (w *WakuNode) Broadcaster() v2.Broadcaster {
func (w *WakuNode) Broadcaster() relay.Broadcaster {
return w.bcaster
}
@ -642,8 +639,8 @@ func (w *WakuNode) mountDiscV5() error {
return err
}
func (w *WakuNode) startStore(ctx context.Context) error {
err := w.store.Start(ctx)
func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error {
err := w.store.Start(ctx, sub)
if err != nil {
w.log.Error("starting store", zap.Error(err))
return err

View File

@ -116,7 +116,7 @@ func Test500(t *testing.T) {
select {
case <-ticker.C:
require.Fail(t, "Timeout Sub1")
case msg := <-sub1.C:
case msg := <-sub1.Ch:
if msg == nil {
return
}
@ -137,7 +137,7 @@ func Test500(t *testing.T) {
select {
case <-ticker.C:
require.Fail(t, "Timeout Sub2")
case msg := <-sub2.C:
case msg := <-sub2.Ch:
if msg == nil {
return
}

View File

@ -15,11 +15,11 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag"
"go.uber.org/zap"
@ -37,7 +37,7 @@ type WakuFilterLightnode struct {
cancel context.CancelFunc
ctx context.Context
h host.Host
broadcaster v2.Broadcaster
broadcaster relay.Broadcaster
timesource timesource.Timesource
wg *sync.WaitGroup
log *zap.Logger
@ -55,7 +55,7 @@ type WakuFilterPushResult struct {
}
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterLightnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
func NewWakuFilterLightnode(broadcaster relay.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
wf := new(WakuFilterLightnode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster

View File

@ -12,13 +12,12 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
@ -43,7 +42,7 @@ func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
b := v2.NewBroadcaster(10)
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filterPush := NewWakuFilterLightnode(b, timesource.NewDefaultClock(), utils.Logger())
filterPush.SetHost(host)
@ -73,19 +72,18 @@ func TestWakuFilter(t *testing.T) {
node1, host1 := makeWakuFilterLightNode(t)
defer node1.Stop()
broadcaster := v2.NewBroadcaster(10)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger())
node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
sub := broadcaster.Register(testTopic)
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1)
require.NoError(t, err)
@ -163,15 +161,15 @@ func TestSubscriptionPing(t *testing.T) {
node1, host1 := makeWakuFilterLightNode(t)
defer node1.Stop()
broadcaster := v2.NewBroadcaster(10)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger())
node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
err := node2Filter.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
@ -204,21 +202,20 @@ func TestWakuFilterPeerFailure(t *testing.T) {
node1, host1 := makeWakuFilterLightNode(t)
broadcaster := v2.NewBroadcaster(10)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
broadcaster2 := v2.NewBroadcaster(10)
broadcaster2 := relay.NewBroadcaster(10)
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilterFullnode(broadcaster2, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second))
node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second))
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
sub := broadcaster.Register(testTopic)
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterPushID_v20beta1)
require.NoError(t, err)

View File

@ -15,10 +15,10 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
@ -35,7 +35,7 @@ type (
WakuFilterFullNode struct {
cancel context.CancelFunc
h host.Host
msgC chan *protocol.Envelope
msgSub relay.Subscription
wg *sync.WaitGroup
log *zap.Logger
@ -46,7 +46,7 @@ type (
)
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
func NewWakuFilterFullnode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
wf := new(WakuFilterFullNode)
wf.log = log.Named("filterv2-fullnode")
@ -69,7 +69,7 @@ func (wf *WakuFilterFullNode) SetHost(h host.Host) {
wf.h = h
}
func (wf *WakuFilterFullNode) Start(ctx context.Context) error {
func (wf *WakuFilterFullNode) Start(ctx context.Context, sub relay.Subscription) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
@ -83,8 +83,7 @@ func (wf *WakuFilterFullNode) Start(ctx context.Context) error {
wf.h.SetStreamHandlerMatch(FilterSubscribeID_v20beta1, protocol.PrefixTextMatch(string(FilterSubscribeID_v20beta1)), wf.onRequest(ctx))
wf.cancel = cancel
wf.msgC = make(chan *protocol.Envelope, 1024)
wf.msgSub = sub
wf.wg.Add(1)
go wf.filterListener(ctx)
@ -268,7 +267,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
return nil
}
for m := range wf.msgC {
for m := range wf.msgSub.Ch {
if err := handle(m); err != nil {
wf.log.Error("handling message", zap.Error(err))
}
@ -339,11 +338,7 @@ func (wf *WakuFilterFullNode) Stop() {
wf.cancel()
close(wf.msgC)
wf.msgSub.Unsubscribe()
wf.wg.Wait()
}
func (wf *WakuFilterFullNode) MessageChannel() chan *protocol.Envelope {
return wf.msgC
}

View File

@ -3,9 +3,9 @@ package legacy_filter
import (
"sync"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
@ -13,7 +13,7 @@ type FilterMap struct {
sync.RWMutex
timesource timesource.Timesource
items map[string]Filter
broadcaster v2.Broadcaster
broadcaster relay.Broadcaster
}
type FilterMapItem struct {
@ -21,7 +21,7 @@ type FilterMapItem struct {
Value Filter
}
func NewFilterMap(broadcaster v2.Broadcaster, timesource timesource.Timesource) *FilterMap {
func NewFilterMap(broadcaster relay.Broadcaster, timesource timesource.Timesource) *FilterMap {
return &FilterMap{
timesource: timesource,
items: make(map[string]Filter),

View File

@ -5,13 +5,13 @@ import (
"testing"
"github.com/stretchr/testify/require"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
func TestFilterMap(t *testing.T) {
b := v2.NewBroadcaster(100)
b := relay.NewBroadcaster(100)
require.NoError(t, b.Start(context.Background()))
fmap := NewFilterMap(b, timesource.NewDefaultClock())

View File

@ -13,7 +13,6 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
@ -53,7 +52,7 @@ type (
cancel context.CancelFunc
h host.Host
isFullNode bool
msgC chan *protocol.Envelope
msgSub relay.Subscription
wg *sync.WaitGroup
log *zap.Logger
@ -66,7 +65,7 @@ type (
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilter(broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter {
func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter {
wf := new(WakuFilter)
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
@ -90,7 +89,7 @@ func (wf *WakuFilter) SetHost(h host.Host) {
wf.h = h
}
func (wf *WakuFilter) Start(ctx context.Context) error {
func (wf *WakuFilter) Start(ctx context.Context, sub relay.Subscription) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
@ -104,7 +103,7 @@ func (wf *WakuFilter) Start(ctx context.Context) error {
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(ctx))
wf.cancel = cancel
wf.msgC = make(chan *protocol.Envelope, 1024)
wf.msgSub = sub
wf.wg.Add(1)
go wf.filterListener(ctx)
@ -239,7 +238,7 @@ func (wf *WakuFilter) filterListener(ctx context.Context) {
return g.Wait()
}
for m := range wf.msgC {
for m := range wf.msgSub.Ch {
if err := handle(m); err != nil {
wf.log.Error("handling message", zap.Error(err))
}
@ -362,7 +361,7 @@ func (wf *WakuFilter) Stop() {
wf.cancel()
close(wf.msgC)
wf.msgSub.Unsubscribe()
wf.h.RemoveStreamHandler(FilterID_v20beta1)
wf.filters.RemoveAll()
@ -480,7 +479,3 @@ func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) e
return nil
}
func (wf *WakuFilter) MessageChannel() chan *protocol.Envelope {
return wf.msgC
}

View File

@ -11,13 +11,12 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
@ -42,11 +41,11 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
b := v2.NewBroadcaster(10)
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), utils.Logger())
filter.SetHost(host)
err = filter.Start(context.Background())
err = filter.Start(context.Background(), relay.NoopSubscription())
require.NoError(t, err)
return filter, host
@ -72,7 +71,7 @@ func TestWakuFilter(t *testing.T) {
node1, host1 := makeWakuFilter(t)
defer node1.Stop()
broadcaster := v2.NewBroadcaster(10)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
@ -80,11 +79,10 @@ func TestWakuFilter(t *testing.T) {
node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
sub := broadcaster.Register(testTopic)
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1)
require.NoError(t, err)
@ -162,21 +160,20 @@ func TestWakuFilterPeerFailure(t *testing.T) {
node1, host1 := makeWakuFilter(t)
broadcaster := v2.NewBroadcaster(10)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
broadcaster2 := v2.NewBroadcaster(10)
broadcaster2 := relay.NewBroadcaster(10)
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second))
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
sub := broadcaster.Register(testTopic)
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1)
require.NoError(t, err)

View File

@ -11,7 +11,6 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@ -26,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
b := v2.NewBroadcaster(10)
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
@ -101,15 +100,15 @@ func TestWakuLightPush(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
<-sub1.C
<-sub1.C
<-sub1.Ch
<-sub1.Ch
}()
wg.Add(1)
go func() {
defer wg.Done()
<-sub2.C
<-sub2.C
<-sub2.Ch
<-sub2.Ch
}()
// Verifying successful request

View File

@ -4,8 +4,6 @@ import (
"context"
n "github.com/waku-org/go-noise"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
@ -18,15 +16,15 @@ type NoiseMessenger interface {
}
type contentTopicSubscription struct {
envChan chan *protocol.Envelope
msgChan chan *pb.WakuMessage
broadcastSub relay.Subscription
msgChan chan *pb.WakuMessage
}
type NoiseWakuRelay struct {
NoiseMessenger
relay *relay.WakuRelay
relaySub *relay.Subscription
broadcaster v2.Broadcaster
broadcaster relay.Broadcaster
cancel context.CancelFunc
timesource timesource.Timesource
pubsubTopic string
@ -53,7 +51,7 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic
relaySub: subs,
cancel: cancel,
timesource: timesource,
broadcaster: v2.NewBroadcaster(1024),
broadcaster: relay.NewBroadcaster(1024),
pubsubTopic: topic,
subscriptionChPerContentTopic: make(map[string][]contentTopicSubscription),
}
@ -70,7 +68,7 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic
subs.Unsubscribe()
wr.broadcaster.Stop()
return
case envelope := <-subs.C:
case envelope := <-subs.Ch:
if envelope != nil {
wr.broadcaster.Submit(envelope)
}
@ -83,11 +81,11 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic
func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-chan *pb.WakuMessage {
sub := contentTopicSubscription{
envChan: make(chan *protocol.Envelope, 1024),
msgChan: make(chan *pb.WakuMessage, 1024),
}
r.broadcaster.Register(&r.pubsubTopic, sub.envChan)
broadcastSub := r.broadcaster.RegisterForAll(1024)
sub.broadcastSub = broadcastSub
subscriptionCh := r.subscriptionChPerContentTopic[contentTopic]
subscriptionCh = append(subscriptionCh, sub)
@ -98,7 +96,7 @@ func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-c
select {
case <-ctx.Done():
return
case env := <-sub.envChan:
case env := <-sub.broadcastSub.Ch:
if env == nil {
return
}
@ -138,7 +136,7 @@ func (r *NoiseWakuRelay) Stop() {
r.cancel()
for _, contentTopicSubscriptions := range r.subscriptionChPerContentTopic {
for _, c := range contentTopicSubscriptions {
close(c.envChan)
c.broadcastSub.Unsubscribe()
close(c.msgChan)
}
}

View File

@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
n "github.com/waku-org/go-noise"
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
@ -26,7 +25,7 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
b := v2.NewBroadcaster(1024)
b := relay.NewBroadcaster(1024)
require.NoError(t, b.Start(context.Background()))
relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)

View File

@ -0,0 +1,160 @@
package relay
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
type chStore struct {
mu sync.RWMutex
topicToChans map[string]map[int]chan *protocol.Envelope
id int
}
func newChStore() chStore {
return chStore{
topicToChans: make(map[string]map[int]chan *protocol.Envelope),
}
}
func (s *chStore) getNewCh(topic string, chLen int) Subscription {
ch := make(chan *protocol.Envelope, chLen)
s.mu.Lock()
defer s.mu.Unlock()
s.id++
//
if s.topicToChans[topic] == nil {
s.topicToChans[topic] = make(map[int]chan *protocol.Envelope)
}
id := s.id
s.topicToChans[topic][id] = ch
return Subscription{
// read only channel , will not block forever, return once closed.
Ch: ch,
// Unsubscribe function is safe, can be called multiple times
// and even after broadcaster has stopped running.
Unsubscribe: func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.topicToChans[topic] == nil {
return
}
if ch := s.topicToChans[topic][id]; ch != nil {
close(ch)
delete(s.topicToChans[topic], id)
}
},
}
}
func (s *chStore) broadcast(m *protocol.Envelope) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, ch := range s.topicToChans[m.PubsubTopic()] {
ch <- m
}
// send to all registered subscribers
for _, ch := range s.topicToChans[""] {
ch <- m
}
}
func (b *chStore) close() {
b.mu.Lock()
defer b.mu.Unlock()
for _, chans := range b.topicToChans {
for _, ch := range chans {
close(ch)
}
}
b.topicToChans = nil
}
type Broadcaster interface {
Start(ctx context.Context) error
Stop()
Register(topic string, chLen ...int) Subscription
RegisterForAll(chLen ...int) Subscription
Submit(*protocol.Envelope)
}
// ////
// thread safe
// panic safe, input can't be submitted to `input` channel after stop
// lock safe, only read channels are returned and later closed, calling code has guarantee Register channel will not block forever.
// no opened channel leaked, all created only read channels are closed when stop
type broadcaster struct {
bufLen int
cancel context.CancelFunc
input chan *protocol.Envelope
//
chStore chStore
running atomic.Bool
}
func NewBroadcaster(bufLen int) *broadcaster {
return &broadcaster{
bufLen: bufLen,
}
}
func (b *broadcaster) Start(ctx context.Context) error {
if !b.running.CompareAndSwap(false, true) { // if not running then start
return errors.New("already started")
}
ctx, cancel := context.WithCancel(ctx)
b.cancel = cancel
b.chStore = newChStore()
b.input = make(chan *protocol.Envelope, b.bufLen)
go b.run(ctx)
return nil
}
func (b *broadcaster) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg := <-b.input:
b.chStore.broadcast(msg)
}
}
}
func (b *broadcaster) Stop() {
if !b.running.CompareAndSwap(true, false) { // if running then stop
return
}
b.chStore.close() // close all channels that we send to
close(b.input) // close input channel
b.cancel() // exit the run loop
}
// returned subscription is all speicfied topic
func (b *broadcaster) Register(topic string, chLen ...int) Subscription {
return b.chStore.getNewCh(topic, getChLen(chLen))
}
// return subscription is for all topic
func (b *broadcaster) RegisterForAll(chLen ...int) Subscription {
return b.chStore.getNewCh("", getChLen(chLen))
}
func getChLen(chLen []int) int {
l := 0
if len(chLen) > 0 {
l = chLen[0]
}
return l
}
// only accepts value when running.
func (b *broadcaster) Submit(m *protocol.Envelope) {
if b.running.Load() {
b.input <- m
}
}

View File

@ -1,4 +1,4 @@
package v2
package relay
import (
"context"
@ -11,8 +11,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
)
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120
// by Dustin Sallings (c) 2013, which was released under MIT license
func TestBroadcast(t *testing.T) {
wg := sync.WaitGroup{}
@ -23,12 +21,11 @@ func TestBroadcast(t *testing.T) {
for i := 0; i < 5; i++ {
wg.Add(1)
cch := make(chan *protocol.Envelope)
b.Register(nil, cch)
sub := b.RegisterForAll()
go func() {
defer wg.Done()
defer b.Unregister(nil, cch)
<-cch
defer sub.Unsubscribe()
<-sub.Ch
}()
}
@ -39,7 +36,7 @@ func TestBroadcast(t *testing.T) {
wg.Wait()
}
func TestBroadcastWait(t *testing.T) {
func TestBroadcastSpecificTopic(t *testing.T) {
wg := sync.WaitGroup{}
b := NewBroadcaster(100)
@ -49,14 +46,12 @@ func TestBroadcastWait(t *testing.T) {
for i := 0; i < 5; i++ {
wg.Add(1)
cch := make(chan *protocol.Envelope)
<-b.WaitRegister(nil, cch)
sub := b.Register("abc")
go func() {
defer wg.Done()
<-cch
<-b.WaitUnregister(nil, cch)
<-sub.Ch
sub.Unsubscribe()
}()
}
@ -67,10 +62,31 @@ func TestBroadcastWait(t *testing.T) {
wg.Wait()
}
// check return from channel after Stop and multiple unregister
func TestBroadcastCleanup(t *testing.T) {
b := NewBroadcaster(100)
require.NoError(t, b.Start(context.Background()))
topic := "test"
b.Register(&topic, make(chan *protocol.Envelope))
sub := b.Register("test")
b.Stop()
<-sub.Ch
sub.Unsubscribe()
sub.Unsubscribe()
}
func TestBroadcastUnregisterSub(t *testing.T) {
b := NewBroadcaster(100)
require.NoError(t, b.Start(context.Background()))
subForAll := b.RegisterForAll()
// unregister before submit
specificSub := b.Register("abc")
specificSub.Unsubscribe()
//
env := protocol.NewEnvelope(&pb.WakuMessage{}, utils.GetUnixEpoch(), "abc")
b.Submit(env)
// no message on specific sub
require.Nil(t, <-specificSub.Ch)
// msg on subForAll
require.Equal(t, env, <-subForAll.Ch)
b.Stop() // it automatically unregister/unsubscribe all
require.Equal(t, nil, <-specificSub.Ch)
}

View File

@ -1,34 +1,26 @@
package relay
import (
"sync"
import "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Subscription handles the subscrition to a particular pubsub topic
type Subscription struct {
sync.RWMutex
// C is channel used for receiving envelopes
C chan *protocol.Envelope
closed bool
once sync.Once
quit chan struct{}
Unsubscribe func()
Ch <-chan *protocol.Envelope
}
// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() {
subs.once.Do(func() {
close(subs.quit)
})
func NoopSubscription() Subscription {
ch := make(chan *protocol.Envelope)
close(ch)
return Subscription{
Unsubscribe: func() {},
Ch: ch,
}
}
// IsClosed determine whether a Subscription is still open for receiving messages
func (subs *Subscription) IsClosed() bool {
subs.RLock()
defer subs.RUnlock()
return subs.closed
func ArraySubscription(msgs []*protocol.Envelope) Subscription {
ch := make(chan *protocol.Envelope, len(msgs))
close(ch)
return Subscription{
Unsubscribe: func() {},
Ch: ch,
}
}

View File

@ -16,7 +16,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/hash"
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
@ -36,7 +35,7 @@ type WakuRelay struct {
log *zap.Logger
bcaster v2.Broadcaster
bcaster Broadcaster
minPeersToPublish int
@ -59,7 +58,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
w := new(WakuRelay)
w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
@ -191,7 +190,8 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
return nil, err
}
w.relaySubs[topic] = sub
w.wg.Add(1)
go w.subscribeToTopic(topic, sub)
w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))
}
@ -275,30 +275,21 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic
func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) {
sub, err := w.subscribe(topic)
_, err := w.subscribe(topic)
if err != nil {
return nil, err
}
// Create client subscription
subscription := new(Subscription)
subscription.closed = false
subscription.C = make(chan *waku_proto.Envelope, 1024) // To avoid blocking
subscription.quit = make(chan struct{})
w.subscriptionsMutex.Lock()
defer w.subscriptionsMutex.Unlock()
w.subscriptions[topic] = append(w.subscriptions[topic], subscription)
subscription := NoopSubscription()
if w.bcaster != nil {
w.bcaster.Register(&topic, subscription.C)
subscription = w.bcaster.Register(topic)
}
w.wg.Add(1)
go w.subscribeToTopic(ctx, topic, subscription, sub)
return subscription, nil
go func() {
<-ctx.Done()
subscription.Unsubscribe()
}()
return &subscription, nil
}
// SubscribeToTopic returns a Subscription to receive messages from the default waku pubsub topic
@ -332,34 +323,27 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
msgChannel := make(chan *pubsub.Message, 1024)
go func(msgChannel chan *pubsub.Message) {
defer func() {
if r := recover(); r != nil {
w.log.Debug("recovered msgChannel")
}
}()
go func() {
defer close(msgChannel)
for {
msg, err := sub.Next(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
w.log.Error("getting message from subscription", zap.Error(err))
}
sub.Cancel()
close(msgChannel)
for _, subscription := range w.subscriptions[sub.Topic()] {
subscription.Unsubscribe()
}
return
}
msgChannel <- msg
}
}(msgChannel)
}()
return msgChannel
}
func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) {
func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscription) {
defer w.wg.Done()
ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay"))
@ -371,28 +355,11 @@ func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string
subChannel := w.nextMessage(w.ctx, sub)
for {
select {
case <-userCtx.Done():
return
case <-ctx.Done():
return
case <-subscription.quit:
func(topic string) {
subscription.Lock()
defer subscription.Unlock()
if subscription.closed {
return
}
subscription.closed = true
if w.bcaster != nil {
<-w.bcaster.WaitUnregister(&topic, subscription.C) // Remove from broadcast list
}
close(subscription.C)
}(pubsubTopic)
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg := <-subChannel:
if msg == nil {
case msg, ok := <-subChannel:
if !ok {
return
}
wakuMessage := &pb.WakuMessage{}

View File

@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -45,7 +46,7 @@ func TestResume(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
err = s1.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s1.Stop()
@ -67,7 +68,7 @@ func TestResume(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()
@ -105,7 +106,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
err = s1.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s1.Stop()
@ -119,7 +120,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()
@ -146,7 +147,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
err = s1.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s1.Stop()
@ -160,7 +161,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()

View File

@ -7,7 +7,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
)
@ -49,7 +49,7 @@ type WakuStore struct {
ctx context.Context
cancel context.CancelFunc
timesource timesource.Timesource
MsgC chan *protocol.Envelope
MsgC relay.Subscription
wg *sync.WaitGroup
log *zap.Logger

View File

@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
@ -85,12 +86,11 @@ type MessageProvider interface {
type Store interface {
SetHost(h host.Host)
Start(ctx context.Context) error
Start(context.Context, relay.Subscription) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)
MessageChannel() chan *protocol.Envelope
Stop()
}
@ -105,7 +105,7 @@ func (store *WakuStore) SetHost(h host.Host) {
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context) error {
func (store *WakuStore) Start(ctx context.Context, sub relay.Subscription) error {
if store.started {
return nil
}
@ -123,7 +123,7 @@ func (store *WakuStore) Start(ctx context.Context) error {
store.started = true
store.ctx, store.cancel = context.WithCancel(ctx)
store.MsgC = make(chan *protocol.Envelope, 1024)
store.MsgC = sub
store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)
@ -158,7 +158,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC {
for envelope := range store.MsgC.Ch {
go func(env *protocol.Envelope) {
_ = store.storeMessage(env)
}(envelope)
@ -207,10 +207,6 @@ func (store *WakuStore) onRequest(s network.Stream) {
}
}
func (store *WakuStore) MessageChannel() chan *protocol.Envelope {
return store.MsgC
}
// TODO: queryWithAccounting
// Stop closes the store message channel and removes the protocol stream handler
@ -223,9 +219,7 @@ func (store *WakuStore) Stop() {
store.started = false
if store.MsgC != nil {
close(store.MsgC)
}
store.MsgC.Unsubscribe()
if store.msgProvider != nil {
store.msgProvider.Stop() // TODO: StoreProtocol should not stop a message provider

View File

@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -24,10 +25,6 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
@ -38,15 +35,18 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)
sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)})
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()
@ -75,10 +75,6 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
@ -91,7 +87,10 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) {
}
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)
sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)})
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
time.Sleep(100 * time.Millisecond)
@ -116,8 +115,6 @@ func TestWakuStoreProtocolNext(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
topic1 := "1"
pubsubTopic1 := "topic1"
@ -129,11 +126,15 @@ func TestWakuStoreProtocolNext(t *testing.T) {
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)
s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1)
sub := relay.ArraySubscription([]*protocol.Envelope{
protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1),
})
err = s1.Start(ctx, sub)
require.NoError(t, err)
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
@ -144,7 +145,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()
@ -189,8 +190,6 @@ func TestWakuStoreResult(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
topic1 := "1"
pubsubTopic1 := "topic1"
@ -202,11 +201,15 @@ func TestWakuStoreResult(t *testing.T) {
msg4 := tests.CreateWakuMessage(topic1, now+4)
msg5 := tests.CreateWakuMessage(topic1, now+5)
s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1)
sub := relay.ArraySubscription([]*protocol.Envelope{
protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1),
})
err = s1.Start(ctx, sub)
require.NoError(t, err)
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
@ -217,7 +220,7 @@ func TestWakuStoreResult(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()
@ -277,9 +280,6 @@ func TestWakuStoreProtocolFind(t *testing.T) {
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
@ -295,15 +295,20 @@ func TestWakuStoreProtocolFind(t *testing.T) {
msg8 := tests.CreateWakuMessage(topic1, now+8)
msg9 := tests.CreateWakuMessage(topic1, now+9)
s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg6, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg7, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg8, utils.GetUnixEpoch(), pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg9, utils.GetUnixEpoch(), pubsubTopic1)
sub := relay.ArraySubscription([]*protocol.Envelope{
protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg6, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg7, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg8, utils.GetUnixEpoch(), pubsubTopic1),
protocol.NewEnvelope(msg9, utils.GetUnixEpoch(), pubsubTopic1),
})
err = s1.Start(ctx, sub)
require.NoError(t, err)
defer s1.Stop()
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
@ -314,7 +319,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
defer s2.Stop()

View File

@ -135,7 +135,7 @@ func (d *RelayService) postV1Subscriptions(w http.ResponseWriter, r *http.Reques
if err != nil {
d.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
d.node.Broadcaster().Unregister(&topicToSubscribe, sub.C)
sub.Unsubscribe()
d.messagesMutex.Lock()
d.messages[topic] = []*pb.WakuMessage{}
d.messagesMutex.Unlock()

View File

@ -3,20 +3,20 @@ package rest
import (
"context"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type Adder func(msg *protocol.Envelope)
type runnerService struct {
broadcaster v2.Broadcaster
ch chan *protocol.Envelope
broadcaster relay.Broadcaster
sub relay.Subscription
cancel context.CancelFunc
adder Adder
}
func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService {
func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService {
return &runnerService{
broadcaster: broadcaster,
adder: adder,
@ -25,15 +25,16 @@ func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService {
func (r *runnerService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.ch = make(chan *protocol.Envelope, 1024)
r.cancel = cancel
r.broadcaster.Register(nil, r.ch)
r.sub = r.broadcaster.RegisterForAll(1024)
for {
select {
case <-ctx.Done():
return
case envelope := <-r.ch:
r.adder(envelope)
case envelope, ok := <-r.sub.Ch:
if !ok {
r.adder(envelope)
}
}
}
}
@ -42,7 +43,6 @@ func (r *runnerService) Stop() {
if r.cancel == nil {
return
}
r.sub.Unsubscribe()
r.cancel()
r.broadcaster.Unregister(nil, r.ch)
close(r.ch)
}

View File

@ -37,9 +37,9 @@ func TestGetMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)
node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty()))
n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo)

View File

@ -10,7 +10,6 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
@ -50,7 +49,7 @@ func TestFilterSubscription(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
b := v2.NewBroadcaster(10)
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
node := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger())
node.SetHost(host)
@ -60,11 +59,11 @@ func TestFilterSubscription(t *testing.T) {
_, err = node.SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err)
b2 := v2.NewBroadcaster(10)
b2 := relay.NewBroadcaster(10)
require.NoError(t, b2.Start(context.Background()))
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), utils.Logger())
f.SetHost(host)
err = f.Start(context.Background())
err = f.Start(context.Background(), relay.NoopSubscription())
require.NoError(t, err)
d := makeFilterService(t, true)

View File

@ -106,11 +106,11 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
if topic == "" {
var sub *relay.Subscription
sub, err = r.node.Relay().Subscribe(ctx)
r.node.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
sub.Unsubscribe()
} else {
var sub *relay.Subscription
sub, err = r.node.Relay().SubscribeToTopic(ctx, topic)
r.node.Broadcaster().Unregister(&topic, sub.C)
sub.Unsubscribe()
}
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))

View File

@ -1,43 +1,32 @@
package rpc
import (
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type Adder func(msg *protocol.Envelope)
type runnerService struct {
broadcaster v2.Broadcaster
ch chan *protocol.Envelope
quit chan bool
broadcaster relay.Broadcaster
sub relay.Subscription
adder Adder
}
func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService {
func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService {
return &runnerService{
broadcaster: broadcaster,
quit: make(chan bool),
adder: adder,
}
}
func (r *runnerService) Start() {
r.ch = make(chan *protocol.Envelope, 1024)
r.broadcaster.Register(nil, r.ch)
for {
select {
case <-r.quit:
return
case envelope := <-r.ch:
r.adder(envelope)
}
r.broadcaster.RegisterForAll(1024)
for envelope := range r.sub.Ch {
r.adder(envelope)
}
}
func (r *runnerService) Stop() {
r.quit <- true
r.broadcaster.Unregister(nil, r.ch)
close(r.ch)
r.sub.Unsubscribe()
}