From a66bf8a8938293ab4c12c0352dd4208b7d9f6d1d Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 8 Dec 2022 23:09:15 -0400 Subject: [PATCH] fix: filter can submit broadcaster envelopes --- waku/v2/protocol/filter/filter_map.go | 46 +++++++++++++-------- waku/v2/protocol/filter/filter_map_test.go | 4 +- waku/v2/protocol/filter/waku_filter.go | 13 ++++-- waku/v2/protocol/filter/waku_filter_test.go | 9 ++-- waku/v2/rpc/filter_test.go | 28 +++++++++---- 5 files changed, 65 insertions(+), 35 deletions(-) diff --git a/waku/v2/protocol/filter/filter_map.go b/waku/v2/protocol/filter/filter_map.go index d15823a7..dff3cea3 100644 --- a/waku/v2/protocol/filter/filter_map.go +++ b/waku/v2/protocol/filter/filter_map.go @@ -3,14 +3,17 @@ package filter import ( "sync" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/timesource" ) type FilterMap struct { sync.RWMutex - items map[string]Filter + timesource timesource.Timesource + items map[string]Filter + broadcaster v2.Broadcaster } type FilterMapItem struct { @@ -18,9 +21,11 @@ type FilterMapItem struct { Value Filter } -func NewFilterMap() *FilterMap { +func NewFilterMap(broadcaster v2.Broadcaster, timesource timesource.Timesource) *FilterMap { return &FilterMap{ - items: make(map[string]Filter), + timesource: timesource, + items: make(map[string]Filter), + broadcaster: broadcaster, } } @@ -79,24 +84,29 @@ func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) { fm.RLock() defer fm.RUnlock() - for key, filter := range fm.items { - envelope := protocol.NewEnvelope(msg, utils.GetUnixEpoch(), filter.Topic) - + filter, ok := fm.items[requestId] + if !ok { // We do this because the key for the filter is set to the requestId received from the filter protocol. // This means we do not need to check the content filter explicitly as all MessagePushs already contain // the requestId of the coresponding filter. - if requestId != "" && requestId == key { - filter.Chan <- envelope - continue - } + return + } - // TODO: In case of no topics we should either trigger here for all messages, - // or we should not allow such filter to exist in the first place. - for _, contentTopic := range filter.ContentFilters { - if msg.ContentTopic == contentTopic { - filter.Chan <- envelope - break - } + envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic) + + // Broadcasting message so it's stored + fm.broadcaster.Submit(envelope) + + if msg.ContentTopic == "" { + filter.Chan <- envelope + } + + // TODO: In case of no topics we should either trigger here for all messages, + // or we should not allow such filter to exist in the first place. + for _, contentTopic := range filter.ContentFilters { + if msg.ContentTopic == contentTopic { + filter.Chan <- envelope + break } } } diff --git a/waku/v2/protocol/filter/filter_map_test.go b/waku/v2/protocol/filter/filter_map_test.go index daf8be73..60ba1da8 100644 --- a/waku/v2/protocol/filter/filter_map_test.go +++ b/waku/v2/protocol/filter/filter_map_test.go @@ -4,11 +4,13 @@ import ( "testing" "github.com/stretchr/testify/require" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/timesource" ) func TestFilterMap(t *testing.T) { - fmap := NewFilterMap() + fmap := NewFilterMap(v2.NewBroadcaster(100), timesource.NewDefaultClock()) filter := Filter{ PeerID: "id", diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index a606368d..cef1c403 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -13,9 +13,12 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-msgio/protoio" "github.com/waku-org/go-waku/logging" + v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/zap" @@ -62,7 +65,7 @@ type ( const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) { +func NewWakuFilter(ctx context.Context, host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) (*WakuFilter, error) { wf := new(WakuFilter) wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode)) @@ -84,7 +87,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za wf.MsgC = make(chan *protocol.Envelope, 1024) wf.h = host wf.isFullNode = isFullNode - wf.filters = NewFilterMap() + wf.filters = NewFilterMap(broadcaster, timesource) wf.subscribers = NewSubscribers(params.timeout) wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) @@ -126,6 +129,10 @@ func (wf *WakuFilter) onRequest(s network.Stream) { // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} + if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible + subscriber.filter.Topic = relay.DefaultWakuTopic + } + len := wf.subscribers.Append(subscriber) logger.Info("adding subscriber") @@ -192,7 +199,7 @@ func (wf *WakuFilter) filterListener() { for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) { logger := logger.With(logging.HostID("subscriber", subscriber.peer)) subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines - if subscriber.filter.Topic != "" && subscriber.filter.Topic != pubsubTopic { + if subscriber.filter.Topic != pubsubTopic { logger.Info("pubsub topic mismatch", zap.String("subscriberTopic", subscriber.filter.Topic), zap.String("messageTopic", pubsubTopic)) diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 65ef6b79..a504c5cb 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -13,6 +13,7 @@ import ( "github.com/waku-org/go-waku/tests" v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -23,7 +24,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -39,7 +40,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger()) + filter, _ := NewWakuFilter(context.Background(), host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger()) return filter, host } @@ -69,7 +70,7 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger()) + node2Filter, _ := NewWakuFilter(ctx, host2, broadcaster, true, timesource.NewDefaultClock(), utils.Logger()) broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) @@ -154,7 +155,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger(), WithTimeout(3*time.Second)) + node2Filter, _ := NewWakuFilter(ctx, host2, v2.NewBroadcaster(10), true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second)) broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 0e4faac7..37cabc81 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -15,19 +15,29 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) var testTopic = "test" -func makeFilterService(t *testing.T) *FilterService { - n, err := node.New(context.Background(), node.WithWakuFilter(true), node.WithWakuRelay()) +func makeFilterService(t *testing.T, isFullNode bool) *FilterService { + var nodeOpts []node.WakuNodeOption + + nodeOpts = append(nodeOpts, node.WithWakuFilter(isFullNode)) + if isFullNode { + nodeOpts = append(nodeOpts, node.WithWakuRelay()) + } + + n, err := node.New(context.Background(), nodeOpts...) require.NoError(t, err) err = n.Start() require.NoError(t, err) - _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) - require.NoError(t, err) + if isFullNode { + _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) + require.NoError(t, err) + } return NewFilterService(n, 30, utils.Logger()) } @@ -39,15 +49,15 @@ func TestFilterSubscription(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger()) + node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger()) require.NoError(t, err) _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - _, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger()) + _, _ = filter.NewWakuFilter(context.Background(), host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger()) - d := makeFilterService(t) + d := makeFilterService(t, true) defer d.node.Stop() hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().Pretty())) @@ -83,10 +93,10 @@ func TestFilterSubscription(t *testing.T) { } func TestFilterGetV1Messages(t *testing.T) { - serviceA := makeFilterService(t) + serviceA := makeFilterService(t, true) var reply SuccessReply - serviceB := makeFilterService(t) + serviceB := makeFilterService(t, false) go serviceB.Start() defer serviceB.Stop()