fix: limit number of subscribers and criteria

This commit is contained in:
Richard Ramos 2023-03-04 13:26:47 -04:00 committed by RichΛrd
parent 45cc06a683
commit 4b52983fc4
9 changed files with 76 additions and 17 deletions

View File

@ -41,6 +41,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
@ -164,7 +165,7 @@ func Execute(options Options) {
if !options.Filter.DisableFullNode { if !options.Filter.DisableFullNode {
nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode()) nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode())
} else { } else {
nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filter.WithTimeout(options.Filter.Timeout))) nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filterv2.WithTimeout(options.Filter.Timeout)))
} }
} else { } else {
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout))) nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout)))

View File

@ -214,7 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...)
w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log) w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log) w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)

View File

@ -26,6 +26,7 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
@ -68,6 +69,7 @@ type WakuNodeParameters struct {
enableFilterV2LightNode bool enableFilterV2LightNode bool
enableFilterV2FullNode bool enableFilterV2FullNode bool
filterOpts []filter.Option filterOpts []filter.Option
filterV2Opts []filterv2.Option
wOpts []pubsub.Option wOpts []pubsub.Option
minRelayPeersToPublish int minRelayPeersToPublish int
@ -335,10 +337,10 @@ func WithWakuFilterV2LightNode() WakuNodeOption {
// WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality. // WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality.
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol // This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterV2FullNode(filterOpts ...filter.Option) WakuNodeOption { func WithWakuFilterV2FullNode(filterOpts ...filterv2.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableFilterV2FullNode = true params.enableFilterV2FullNode = true
params.filterOpts = filterOpts params.filterV2Opts = filterOpts
return nil return nil
} }
} }

View File

@ -0,0 +1,4 @@
package filterv2
const DefaultMaxSubscriptions = 1000
const MaxCriteriaPerSubscription = 1000

View File

@ -12,7 +12,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2" v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
@ -162,7 +161,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
defer node2.Stop() defer node2.Stop()
defer sub2.Unsubscribe() defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(host2, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger(), filter.WithTimeout(5*time.Second)) node2Filter := NewWakuFilterFullnode(host2, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second))
err := node2Filter.Start(ctx) err := node2Filter.Start(ctx)
require.NoError(t, err) require.NoError(t, err)

View File

@ -2,6 +2,7 @@ package filterv2
import ( import (
"context" "context"
"time"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
@ -25,10 +26,23 @@ type (
log *zap.Logger log *zap.Logger
} }
FilterParameters struct {
Timeout time.Duration
MaxSubscribers int
}
Option func(*FilterParameters)
FilterSubscribeOption func(*FilterSubscribeParameters) FilterSubscribeOption func(*FilterSubscribeParameters)
FilterUnsubscribeOption func(*FilterUnsubscribeParameters) FilterUnsubscribeOption func(*FilterUnsubscribeParameters)
) )
func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
params.Timeout = timeout
}
}
func WithPeer(p peer.ID) FilterSubscribeOption { func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) { return func(params *FilterSubscribeParameters) {
params.selectedPeer = p params.selectedPeer = p
@ -112,3 +126,16 @@ func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
AutomaticRequestId(), AutomaticRequestId(),
} }
} }
func WithMaxSubscribers(maxSubscribers int) Option {
return func(params *FilterParameters) {
params.MaxSubscribers = maxSubscribers
}
}
func DefaultOptions() []Option {
return []Option{
WithTimeout(24 * time.Hour),
WithMaxSubscribers(DefaultMaxSubscriptions),
}
}

View File

@ -16,7 +16,6 @@ import (
v2 "github.com/waku-org/go-waku/waku/v2" v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb" "github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag" "go.opencensus.io/tag"
@ -27,6 +26,8 @@ import (
// allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria // allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria
const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1") const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1")
const peerHasNoSubscription = "peer has no subscriptions"
type ( type (
WakuFilterFull struct { WakuFilterFull struct {
cancel context.CancelFunc cancel context.CancelFunc
@ -36,16 +37,18 @@ type (
log *zap.Logger log *zap.Logger
subscriptions *SubscribersMap subscriptions *SubscribersMap
maxSubscriptions int
} }
) )
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options // NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull { func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFull {
wf := new(WakuFilterFull) wf := new(WakuFilterFull)
wf.log = log.Named("filterv2-fullnode") wf.log = log.Named("filterv2-fullnode")
params := new(filter.FilterParameters) params := new(FilterParameters)
optList := filter.DefaultOptions() optList := DefaultOptions()
optList = append(optList, opts...) optList = append(optList, opts...)
for _, opt := range optList { for _, opt := range optList {
opt(params) opt(params)
@ -54,6 +57,7 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc
wf.wg = &sync.WaitGroup{} wf.wg = &sync.WaitGroup{}
wf.h = host wf.h = host
wf.subscriptions = NewSubscribersMap(params.Timeout) wf.subscriptions = NewSubscribersMap(params.Timeout)
wf.maxSubscriptions = params.MaxSubscribers
return wf return wf
} }
@ -138,7 +142,7 @@ func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb
if exists { if exists {
reply(s, logger, request, http.StatusOK) reply(s, logger, request, http.StatusOK)
} else { } else {
reply(s, logger, request, http.StatusNotFound) reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} }
} }
@ -153,8 +157,25 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques
return return
} }
if wf.subscriptions.Count() >= wf.maxSubscriptions {
reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
}
peerID := s.Conn().RemotePeer() peerID := s.Conn().RemotePeer()
if totalSubs, exists := wf.subscriptions.Get(peerID); exists {
ctTotal := 0
for _, contentTopicSet := range totalSubs {
ctTotal += len(contentTopicSet)
}
if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription {
reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
return
}
}
wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics) wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics)
reply(s, logger, request, http.StatusOK) reply(s, logger, request, http.StatusOK)
@ -173,7 +194,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ
err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
if err != nil { if err != nil {
reply(s, logger, request, http.StatusNotFound) reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else { } else {
reply(s, logger, request, http.StatusOK) reply(s, logger, request, http.StatusOK)
} }
@ -182,7 +203,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ
func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) { func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer()) err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
if err != nil { if err != nil {
reply(s, logger, request, http.StatusNotFound) reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else { } else {
reply(s, logger, request, http.StatusOK) reply(s, logger, request, http.StatusOK)
} }

View File

@ -161,6 +161,13 @@ func (sub *SubscribersMap) RemoveAll() {
sub.items = make(map[peer.ID]PubsubTopics) sub.items = make(map[peer.ID]PubsubTopics)
} }
func (sub *SubscribersMap) Count() int {
sub.RLock()
defer sub.RUnlock()
return len(sub.items)
}
func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
c := make(chan peer.ID) c := make(chan peer.ID)

View File

@ -164,10 +164,8 @@ func (d *RelayService) getV1Messages(w http.ResponseWriter, r *http.Request) {
return return
} }
var response []*pb.WakuMessage response := d.messages[topic]
for i := range d.messages[topic] {
response = append(response, d.messages[topic][i])
}
d.messages[topic] = []*pb.WakuMessage{} d.messages[topic] = []*pb.WakuMessage{}
writeErrOrResponse(w, nil, response) writeErrOrResponse(w, nil, response)
} }