diff --git a/waku/v2/protocol/filterv2/client.go b/waku/v2/protocol/filterv2/client.go index e3d444c4..d16ee53e 100644 --- a/waku/v2/protocol/filterv2/client.go +++ b/waku/v2/protocol/filterv2/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "errors" + "fmt" "math" "net/http" "sync" @@ -188,6 +189,10 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont return nil, errors.New("at least one content topic is required") } + if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest { + return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) + } + params := new(FilterSubscribeParameters) params.log = wf.log params.host = wf.h @@ -255,6 +260,10 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co return nil, errors.New("at least one content topic is required") } + if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest { + return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) + } + params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err diff --git a/waku/v2/protocol/filterv2/common.go b/waku/v2/protocol/filterv2/common.go index b087fd9a..67a7ca65 100644 --- a/waku/v2/protocol/filterv2/common.go +++ b/waku/v2/protocol/filterv2/common.go @@ -1,9 +1,14 @@ package filterv2 -import "fmt" +import ( + "fmt" + "time" +) const DefaultMaxSubscriptions = 1000 const MaxCriteriaPerSubscription = 1000 +const MaxContentTopicsPerRequest = 30 +const MessagePushTimeout = 20 * time.Second type FilterError struct { Code int diff --git a/waku/v2/protocol/filterv2/server.go b/waku/v2/protocol/filterv2/server.go index 23d00f85..61d102b2 100644 --- a/waku/v2/protocol/filterv2/server.go +++ b/waku/v2/protocol/filterv2/server.go @@ -3,6 +3,7 @@ package filterv2 import ( "context" "errors" + "fmt" "math" "net/http" "sync" @@ -157,6 +158,10 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques return } + if len(request.ContentTopics) > MaxContentTopicsPerRequest { + reply(s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) + } + if wf.subscriptions.Count() >= wf.maxSubscriptions { reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions") return @@ -192,6 +197,10 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ return } + if len(request.ContentTopics) > MaxContentTopicsPerRequest { + reply(s, logger, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)) + } + err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics) if err != nil { reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription) @@ -254,6 +263,9 @@ func (wf *WakuFilterFull) pushMessage(ctx context.Context, peerID peer.ID, env * WakuMessage: env.Message(), } + ctx, cancel := context.WithTimeout(ctx, MessagePushTimeout) + defer cancel() + // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peerID)) if err != nil {