refactor: validate protobuffer for filter (#833)

This commit is contained in:
richΛrd 2023-10-25 08:49:25 -04:00 committed by GitHub
parent 18c16de94e
commit 2701a38b2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 56 deletions

View File

@ -134,6 +134,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
stream.Close()
if err = messagePush.Validate(); err != nil {
logger.Warn("received invalid messagepush")
return
}
pubSubTopic := ""
//For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil {
@ -178,6 +183,18 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,
func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}
err := request.Validate()
if err != nil {
return err
}
stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
@ -187,13 +204,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}
wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
@ -218,6 +228,12 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
stream.Close()
if err = filterSubscribeResponse.Validate(); err != nil {
wf.metrics.RecordError(decodeRPCFailure)
return err
}
if filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
@ -245,17 +261,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
return nil, err
}
if len(contentFilter.ContentTopics) == 0 {
return nil, errors.New("at least one content topic is required")
}
if slices.Contains[string](contentFilter.ContentTopicsList(), "") {
return nil, errors.New("one or more content topics specified is empty")
}
if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest {
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
}
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h

View File

@ -0,0 +1,60 @@
package pb
import (
"errors"
"fmt"
"golang.org/x/exp/slices"
)
const MaxContentTopicsPerRequest = 30
var (
errMissingRequestID = errors.New("missing RequestId field")
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
errNoContentTopics = errors.New("at least one contenttopic should be specified")
errMaxContentTopics = fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
errEmptyContentTopics = errors.New("one or more content topics specified is empty")
errMissingMessage = errors.New("missing WakuMessage field")
)
func (x *FilterSubscribeRequest) Validate() error {
if x.RequestId == "" {
return errMissingRequestID
}
if x.FilterSubscribeType == FilterSubscribeRequest_SUBSCRIBE || x.FilterSubscribeType == FilterSubscribeRequest_UNSUBSCRIBE {
if x.PubsubTopic == nil || *x.PubsubTopic == "" {
return errMissingPubsubTopic
}
if len(x.ContentTopics) == 0 {
return errNoContentTopics
}
if slices.Contains[string](x.ContentTopics, "") {
return errEmptyContentTopics
}
if len(x.ContentTopics) > MaxContentTopicsPerRequest {
return errMaxContentTopics
}
}
return nil
}
func (x *FilterSubscribeResponse) Validate() error {
if x.RequestId == "" {
return errMissingRequestID
}
return nil
}
func (x *MessagePushV2) Validate() error {
if x.WakuMessage == nil {
return errMissingMessage
}
return x.WakuMessage.Validate()
}

View File

@ -0,0 +1,41 @@
package pb
import (
"testing"
"github.com/stretchr/testify/require"
pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
func TestValidateRequest(t *testing.T) {
request := &FilterSubscribeRequest{}
require.ErrorIs(t, request.Validate(), errMissingRequestID)
request.RequestId = "test"
request.FilterSubscribeType = FilterSubscribeRequest_SUBSCRIBE
require.ErrorIs(t, request.Validate(), errMissingPubsubTopic)
pubsubTopic := "test"
request.PubsubTopic = &pubsubTopic
require.ErrorIs(t, request.Validate(), errNoContentTopics)
request.ContentTopics = []string{""}
require.ErrorIs(t, request.Validate(), errEmptyContentTopics)
request.ContentTopics[0] = "test"
require.NoError(t, request.Validate())
}
func TestValidateResponse(t *testing.T) {
response := FilterSubscribeResponse{}
require.ErrorIs(t, response.Validate(), errMissingRequestID)
response.RequestId = "test"
require.NoError(t, response.Validate())
}
func TestValidateMessagePush(t *testing.T) {
msgPush := &MessagePushV2{}
require.ErrorIs(t, msgPush.Validate(), errMissingMessage)
msgPush.WakuMessage = &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "test",
}
require.NoError(t, msgPush.Validate())
}

View File

@ -3,7 +3,6 @@ package filter
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"time"
@ -104,15 +103,19 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream
start := time.Now()
switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(ctx, stream, subscribeRequest)
if err := subscribeRequest.Validate(); err != nil {
wf.reply(ctx, stream, subscribeRequest, http.StatusBadRequest, err.Error())
} else {
switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(ctx, stream, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(ctx, stream, subscribeRequest)
}
}
stream.Close()
@ -157,20 +160,6 @@ func (wf *WakuFilterFullNode) ping(ctx context.Context, stream network.Stream, r
}
func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == nil {
wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}
if len(request.ContentTopics) == 0 {
wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}
if len(request.ContentTopics) > MaxContentTopicsPerRequest {
wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}
if wf.subscriptions.Count() >= wf.maxSubscriptions {
wf.reply(ctx, stream, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
@ -197,20 +186,6 @@ func (wf *WakuFilterFullNode) subscribe(ctx context.Context, stream network.Stre
}
func (wf *WakuFilterFullNode) unsubscribe(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == nil {
wf.reply(ctx, stream, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
}
if len(request.ContentTopics) == 0 {
wf.reply(ctx, stream, request, http.StatusBadRequest, "at least one contenttopic should be specified")
return
}
if len(request.ContentTopics) > MaxContentTopicsPerRequest {
wf.reply(ctx, stream, request, http.StatusBadRequest, fmt.Sprintf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest))
}
err := wf.subscriptions.Delete(stream.Conn().RemotePeer(), *request.PubsubTopic, request.ContentTopics)
if err != nil {
wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)