From 2ae370ca41ac4241183c97d972e8011eede62e87 Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Thu, 18 Nov 2021 10:45:40 +0100 Subject: [PATCH] test: Add test/refactor filter option --- waku/v2/protocol/filter/waku_filter.go | 81 +++++-------------- waku/v2/protocol/filter/waku_filter_option.go | 52 ++++++++++++ .../filter/waku_filter_option_test.go | 34 ++++++++ 3 files changed, 105 insertions(+), 62 deletions(-) create mode 100644 waku/v2/protocol/filter/waku_filter_option.go create mode 100644 waku/v2/protocol/filter/waku_filter_option_test.go diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 161a9782..efafb5ed 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -15,7 +15,6 @@ import ( "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/utils" "go.opencensus.io/stats" "go.opencensus.io/tag" ) @@ -27,13 +26,6 @@ var ( ) type ( - FilterSubscribeParameters struct { - host host.Host - selectedPeer peer.ID - } - - FilterSubscribeOption func(*FilterSubscribeParameters) - Filter struct { PeerID peer.ID Topic string @@ -65,41 +57,32 @@ type ( // NOTE This is just a start, the design of this protocol isn't done yet. It // should be direct payload exchange (a la req-resp), not be coupled with the // relay protocol. - const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") -func WithPeer(p peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - params.selectedPeer = p +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { + ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) + if err != nil { + log.Error(err) } -} -func WithAutomaticPeerSelection() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1)) - if err == nil { - params.selectedPeer = *p - } else { - log.Info("Error selecting peer: ", err) - } - } -} + wf := new(WakuFilter) + wf.ctx = ctx + wf.MsgC = make(chan *protocol.Envelope) + wf.h = host + wf.isFullNode = isFullNode + wf.filters = NewFilterMap() + wf.subscribers = NewSubscribers() -func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) - if err == nil { - params.selectedPeer = *p - } else { - log.Info("Error selecting peer: ", err) - } - } -} + wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) + go wf.FilterListener() -func DefaultOptions() []FilterSubscribeOption { - return []FilterSubscribeOption{ - WithAutomaticPeerSelection(), + if wf.isFullNode { + log.Info("Filter protocol started") + } else { + log.Info("Filter protocol started (only client mode)") } + + return wf } func (wf *WakuFilter) onRequest(s network.Stream) { @@ -148,32 +131,6 @@ func (wf *WakuFilter) onRequest(s network.Stream) { } } -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { - ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) - if err != nil { - log.Error(err) - } - - wf := new(WakuFilter) - wf.ctx = ctx - wf.MsgC = make(chan *protocol.Envelope) - wf.h = host - wf.isFullNode = isFullNode - wf.filters = NewFilterMap() - wf.subscribers = NewSubscribers() - - wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) - go wf.FilterListener() - - if wf.isFullNode { - log.Info("Filter protocol started") - } else { - log.Info("Filter protocol started (only client mode)") - } - - return wf -} - func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go new file mode 100644 index 00000000..11c35953 --- /dev/null +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -0,0 +1,52 @@ +package filter + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/status-im/go-waku/waku/v2/utils" +) + +type ( + FilterSubscribeParameters struct { + host host.Host + selectedPeer peer.ID + } + + FilterSubscribeOption func(*FilterSubscribeParameters) +) + +func WithPeer(p peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + params.selectedPeer = p + } +} + +func WithAutomaticPeerSelection() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + +func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + +func DefaultOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ + WithAutomaticPeerSelection(), + } +} diff --git a/waku/v2/protocol/filter/waku_filter_option_test.go b/waku/v2/protocol/filter/waku_filter_option_test.go new file mode 100644 index 00000000..c324f749 --- /dev/null +++ b/waku/v2/protocol/filter/waku_filter_option_test.go @@ -0,0 +1,34 @@ +package filter + +import ( + "context" + "crypto/rand" + "testing" + + "github.com/status-im/go-waku/tests" + "github.com/stretchr/testify/require" +) + +func TestFilterOption(t *testing.T) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + options := []FilterSubscribeOption{ + WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), + WithAutomaticPeerSelection(), + WithFastestPeerSelection(context.Background()), + } + + params := new(FilterSubscribeParameters) + params.host = host + + for _, opt := range options { + opt(params) + } + + require.Equal(t, host, params.host) + require.NotNil(t, params.selectedPeer) +}