diff --git a/examples/filter2/main.go b/examples/filter2/main.go index ec31f036..f28cc462 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -18,6 +18,7 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" @@ -64,6 +65,7 @@ func main() { node.WithHostAddress(hostAddr1), node.WithWakuRelay(), node.WithWakuFilterFullNode(), + node.WithLightPush(lightpush.WithRateLimiter(1, 1)), ) if err != nil { panic(err) @@ -118,6 +120,7 @@ func main() { go writeLoop(ctx, fullNode) go readLoop(ctx, fullNode) + go writeLightpushLoop(ctx, lightNode) go func() { // Unsubscribe filter after 5 seconds @@ -144,6 +147,24 @@ func randomHex(n int) (string, error) { } func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { + msg := createMessage(wakuNode, msgContent) + + _, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String())) + if err != nil { + log.Error("Error sending a message: ", err) + } +} + +func writeLightpush(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { + msg := createMessage(wakuNode, msgContent) + + _, err := wakuNode.Lightpush().Publish(ctx, msg, lightpush.WithPubSubTopic(pubSubTopic.String())) + if err != nil { + log.Error("Error sending a LP message: ", err) + } +} + +func createMessage(wakuNode *node.WakuNode, msgContent string) *pb.WakuMessage { var version uint32 = 0 p := new(payload.Payload) @@ -159,10 +180,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { Timestamp: utils.GetUnixEpoch(wakuNode.Timesource()), } - _, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String())) - if err != nil { - log.Error("Error sending a message: ", err) - } + return msg } func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { @@ -172,6 +190,14 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } } +func writeLightpushLoop(ctx context.Context, wakuNode *node.WakuNode) { + for { + //Change this sleep value to lower than 1 second to get Lightpush rate limiting kick in + time.Sleep(1000 * time.Millisecond) + writeLightpush(ctx, wakuNode, "Hello World via Lightpush!") + } +} + func readLoop(ctx context.Context, wakuNode *node.WakuNode) { pubsubTopic := pubSubTopic.String() sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic)) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index e2f95b7c..1427c2b0 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -291,7 +291,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) - w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log) + w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 90f67f06..0c175992 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -28,6 +28,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -77,6 +78,7 @@ type WakuNodeParameters struct { enableFilterFullNode bool filterOpts []filter.Option pubsubOpts []pubsub.Option + lightpushOpts []lightpush.Option minRelayPeersToPublish int maxMsgSizeBytes int @@ -458,9 +460,10 @@ func WithMessageProvider(s legacy_store.MessageProvider) WakuNodeOption { } // WithLightPush is a WakuNodeOption that enables the lightpush protocol -func WithLightPush() WakuNodeOption { +func WithLightPush(lightpushOpts ...lightpush.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableLightPush = true + params.lightpushOpts = lightpushOpts return nil } } diff --git a/waku/v2/protocol/lightpush/pb/validation.go b/waku/v2/protocol/lightpush/pb/validation.go index 6477c355..f915c5b7 100644 --- a/waku/v2/protocol/lightpush/pb/validation.go +++ b/waku/v2/protocol/lightpush/pb/validation.go @@ -2,6 +2,10 @@ package pb import "errors" +// This special value for requestId indicates that the message was rate limited +// and we did not retreive the requestId to avoid a potential attack vector. +const REQUESTID_RATE_LIMITED = "N/A" + var ( errMissingRequestID = errors.New("missing RequestId field") errMissingQuery = errors.New("missing Query field") @@ -32,6 +36,9 @@ func (x *PushRpc) ValidateRequest() error { } func (x *PushRpc) ValidateResponse(requestID string) error { + if x.RequestId == REQUESTID_RATE_LIMITED { + return nil + } if x.RequestId == "" { return errMissingRequestID } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index f2b98bb3..19708d11 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -108,6 +108,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) wakuLP.metrics.RecordError(rateLimitFailure) responseMsg := "exceeds the rate limit" responsePushRPC.Response.Info = &responseMsg + responsePushRPC.RequestId = pb.REQUESTID_RATE_LIMITED wakuLP.reply(stream, responsePushRPC, logger) return }