From 4b52983fc483ffc2e90b8ac3ab88dc05d44759fe Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sat, 4 Mar 2023 13:26:47 -0400 Subject: [PATCH] fix: limit number of subscribers and criteria --- waku/node.go | 3 +- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 6 ++-- waku/v2/protocol/filterv2/common.go | 4 +++ waku/v2/protocol/filterv2/filter_test.go | 3 +- waku/v2/protocol/filterv2/options.go | 27 +++++++++++++++ waku/v2/protocol/filterv2/server.go | 35 ++++++++++++++++---- waku/v2/protocol/filterv2/subscribers_map.go | 7 ++++ waku/v2/rest/relay.go | 6 ++-- 9 files changed, 76 insertions(+), 17 deletions(-) create mode 100644 waku/v2/protocol/filterv2/common.go diff --git a/waku/node.go b/waku/node.go index a58cbe6b..74cb447c 100644 --- a/waku/node.go +++ b/waku/node.go @@ -41,6 +41,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/filterv2" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -164,7 +165,7 @@ func Execute(options Options) { if !options.Filter.DisableFullNode { nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode()) } else { - nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filter.WithTimeout(options.Filter.Timeout))) + nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filterv2.WithTimeout(options.Filter.Timeout))) } } else { nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout))) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5983c474..3e9671ed 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -214,7 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) - w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...) w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index ae59ec9e..c164056e 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -26,6 +26,7 @@ import ( ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/filterv2" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -68,6 +69,7 @@ type WakuNodeParameters struct { enableFilterV2LightNode bool enableFilterV2FullNode bool filterOpts []filter.Option + filterV2Opts []filterv2.Option wOpts []pubsub.Option minRelayPeersToPublish int @@ -335,10 +337,10 @@ func WithWakuFilterV2LightNode() WakuNodeOption { // WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality. // This WakuNodeOption accepts a list of WakuFilter options to setup the protocol -func WithWakuFilterV2FullNode(filterOpts ...filter.Option) WakuNodeOption { +func WithWakuFilterV2FullNode(filterOpts ...filterv2.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilterV2FullNode = true - params.filterOpts = filterOpts + params.filterV2Opts = filterOpts return nil } } diff --git a/waku/v2/protocol/filterv2/common.go b/waku/v2/protocol/filterv2/common.go new file mode 100644 index 00000000..50046f63 --- /dev/null +++ b/waku/v2/protocol/filterv2/common.go @@ -0,0 +1,4 @@ +package filterv2 + +const DefaultMaxSubscriptions = 1000 +const MaxCriteriaPerSubscription = 1000 diff --git a/waku/v2/protocol/filterv2/filter_test.go b/waku/v2/protocol/filterv2/filter_test.go index 3fd573a9..66acdab9 100644 --- a/waku/v2/protocol/filterv2/filter_test.go +++ b/waku/v2/protocol/filterv2/filter_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" v2 "github.com/waku-org/go-waku/waku/v2" - "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -162,7 +161,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilterFullnode(host2, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger(), filter.WithTimeout(5*time.Second)) + node2Filter := NewWakuFilterFullnode(host2, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) err := node2Filter.Start(ctx) require.NoError(t, err) diff --git a/waku/v2/protocol/filterv2/options.go b/waku/v2/protocol/filterv2/options.go index 56d582ac..104b84a8 100644 --- a/waku/v2/protocol/filterv2/options.go +++ b/waku/v2/protocol/filterv2/options.go @@ -2,6 +2,7 @@ package filterv2 import ( "context" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -25,10 +26,23 @@ type ( log *zap.Logger } + FilterParameters struct { + Timeout time.Duration + MaxSubscribers int + } + + Option func(*FilterParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) FilterUnsubscribeOption func(*FilterUnsubscribeParameters) ) +func WithTimeout(timeout time.Duration) Option { + return func(params *FilterParameters) { + params.Timeout = timeout + } +} + func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.selectedPeer = p @@ -112,3 +126,16 @@ func DefaultUnsubscribeOptions() []FilterUnsubscribeOption { AutomaticRequestId(), } } + +func WithMaxSubscribers(maxSubscribers int) Option { + return func(params *FilterParameters) { + params.MaxSubscribers = maxSubscribers + } +} + +func DefaultOptions() []Option { + return []Option{ + WithTimeout(24 * time.Hour), + WithMaxSubscribers(DefaultMaxSubscriptions), + } +} diff --git a/waku/v2/protocol/filterv2/server.go b/waku/v2/protocol/filterv2/server.go index 761b52e0..23d00f85 100644 --- a/waku/v2/protocol/filterv2/server.go +++ b/waku/v2/protocol/filterv2/server.go @@ -16,7 +16,6 @@ import ( v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.opencensus.io/tag" @@ -27,6 +26,8 @@ import ( // allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1") +const peerHasNoSubscription = "peer has no subscriptions" + type ( WakuFilterFull struct { cancel context.CancelFunc @@ -36,16 +37,18 @@ type ( log *zap.Logger subscriptions *SubscribersMap + + maxSubscriptions int } ) // NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull { +func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFull { wf := new(WakuFilterFull) wf.log = log.Named("filterv2-fullnode") - params := new(filter.FilterParameters) - optList := filter.DefaultOptions() + params := new(FilterParameters) + optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { opt(params) @@ -54,6 +57,7 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc wf.wg = &sync.WaitGroup{} wf.h = host wf.subscriptions = NewSubscribersMap(params.Timeout) + wf.maxSubscriptions = params.MaxSubscribers return wf } @@ -138,7 +142,7 @@ func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb if exists { reply(s, logger, request, http.StatusOK) } else { - reply(s, logger, request, http.StatusNotFound) + reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) } } @@ -153,8 +157,25 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques return } + if wf.subscriptions.Count() >= wf.maxSubscriptions { + reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions") + return + } + peerID := s.Conn().RemotePeer() + if totalSubs, exists := wf.subscriptions.Get(peerID); exists { + ctTotal := 0 + for _, contentTopicSet := range totalSubs { + ctTotal += len(contentTopicSet) + } + + if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription { + reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria") + return + } + } + wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics) reply(s, logger, request, http.StatusOK) @@ -173,7 +194,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) if err != nil { - reply(s, logger, request, http.StatusNotFound) + reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) } else { reply(s, logger, request, http.StatusOK) } @@ -182,7 +203,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer()) if err != nil { - reply(s, logger, request, http.StatusNotFound) + reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) } else { reply(s, logger, request, http.StatusOK) } diff --git a/waku/v2/protocol/filterv2/subscribers_map.go b/waku/v2/protocol/filterv2/subscribers_map.go index 767056f5..04e65cb3 100644 --- a/waku/v2/protocol/filterv2/subscribers_map.go +++ b/waku/v2/protocol/filterv2/subscribers_map.go @@ -161,6 +161,13 @@ func (sub *SubscribersMap) RemoveAll() { sub.items = make(map[peer.ID]PubsubTopics) } +func (sub *SubscribersMap) Count() int { + sub.RLock() + defer sub.RUnlock() + + return len(sub.items) +} + func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { c := make(chan peer.ID) diff --git a/waku/v2/rest/relay.go b/waku/v2/rest/relay.go index 6273621e..12d81be6 100644 --- a/waku/v2/rest/relay.go +++ b/waku/v2/rest/relay.go @@ -164,10 +164,8 @@ func (d *RelayService) getV1Messages(w http.ResponseWriter, r *http.Request) { return } - var response []*pb.WakuMessage - for i := range d.messages[topic] { - response = append(response, d.messages[topic][i]) - } + response := d.messages[topic] + d.messages[topic] = []*pb.WakuMessage{} writeErrOrResponse(w, nil, response) }