diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 5a465acb..f8cd2bb3 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -324,7 +324,7 @@ func (c *Chat) publish(ctx context.Context, message string) error { } if c.options.LightPush.Enable { - lightOpt := []lightpush.Option{lightpush.WithDefaultPubsubTopic()} + lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()} var peerID peer.ID peerID, err = options.LightPush.NodePeerID() if err != nil { diff --git a/library/lightpush.go b/library/lightpush.go index 39ebd0f4..1714b2d6 100644 --- a/library/lightpush.go +++ b/library/lightpush.go @@ -26,7 +26,7 @@ func lightpushPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic s ctx = instance.ctx } - var lpOptions []lightpush.Option + var lpOptions []lightpush.RequestOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { diff --git a/waku/v2/protocol/lightpush/metrics.go b/waku/v2/protocol/lightpush/metrics.go index 8938bcd7..8769ba72 100644 --- a/waku/v2/protocol/lightpush/metrics.go +++ b/waku/v2/protocol/lightpush/metrics.go @@ -53,6 +53,7 @@ var ( writeRequestFailure metricsErrCategory = "write_request_failure" writeResponseFailure metricsErrCategory = "write_response_failure" dialFailure metricsErrCategory = "dial_failure" + rateLimitFailure metricsErrCategory = "ratelimit_failure" messagePushFailure metricsErrCategory = "message_push_failure" requestBodyFailure metricsErrCategory = "request_failure" responseBodyFailure metricsErrCategory = "response_body_failure" diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index aef39113..7bf2c7e7 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -22,6 +22,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + "golang.org/x/time/rate" ) // LightPushID_v20beta1 is the current Waku LightPush protocol identifier @@ -37,6 +38,7 @@ var ( type WakuLightPush struct { h host.Host relay *relay.WakuRelay + limiter *rate.Limiter cancel context.CancelFunc pm *peermanager.PeerManager metrics Metrics @@ -47,13 +49,20 @@ type WakuLightPush struct { // NewWakuLightPush returns a new instance of Waku Lightpush struct // Takes an optional peermanager if WakuLightPush is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) *WakuLightPush { +func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay wakuLP.log = log.Named("lightpush") wakuLP.pm = pm wakuLP.metrics = newMetrics(reg) + params := &LightpushParameters{} + for _, opt := range opts { + opt(params) + } + + wakuLP.limiter = params.limiter + if pm != nil { wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) } @@ -91,6 +100,18 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) logger := wakuLP.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) requestPushRPC := &pb.PushRpc{} + responsePushRPC := &pb.PushRpc{ + Response: &pb.PushResponse{}, + } + + if wakuLP.limiter != nil && !wakuLP.limiter.Allow() { + wakuLP.metrics.RecordError(rateLimitFailure) + responseMsg := "exceeds the rate limit" + responsePushRPC.Response.Info = &responseMsg + wakuLP.reply(stream, responsePushRPC, logger) + return + } + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) err := reader.ReadMsg(requestPushRPC) @@ -103,11 +124,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) return } - responsePushRPC := &pb.PushRpc{ - RequestId: requestPushRPC.RequestId, - Response: &pb.PushResponse{}, - } - + responsePushRPC.RequestId = requestPushRPC.RequestId if err := requestPushRPC.ValidateRequest(); err != nil { responseMsg := err.Error() responsePushRPC.Response.Info = &responseMsg @@ -170,7 +187,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushParameters) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) { if params == nil { return nil, errors.New("lightpush params are mandatory") } @@ -233,8 +250,8 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) { - params := new(lightPushParameters) +func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...RequestOption) (*lightPushRequestParameters, error) { + params := new(lightPushRequestParameters) params.host = wakuLP.h params.log = wakuLP.log params.pm = wakuLP.pm @@ -294,7 +311,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe // Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the // contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the // `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...RequestOption) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index caf87c0c..6ec25899 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -10,9 +10,23 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" + "golang.org/x/time/rate" ) -type lightPushParameters struct { +type LightpushParameters struct { + limiter *rate.Limiter +} + +type Option func(*LightpushParameters) + +// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol +func WithRateLimiter(r rate.Limit, b int) Option { + return func(params *LightpushParameters) { + params.limiter = rate.NewLimiter(r, b) + } +} + +type lightPushRequestParameters struct { host host.Host peerAddr multiaddr.Multiaddr selectedPeer peer.ID @@ -24,12 +38,12 @@ type lightPushParameters struct { pubsubTopic string } -// Option is the type of options accepted when performing LightPush protocol requests -type Option func(*lightPushParameters) error +// RequestOption is the type of options accepted when performing LightPush protocol requests +type RequestOption func(*lightPushRequestParameters) error // WithPeer is an option used to specify the peerID to push a waku message to -func WithPeer(p peer.ID) Option { - return func(params *lightPushParameters) error { +func WithPeer(p peer.ID) RequestOption { + return func(params *lightPushRequestParameters) error { params.selectedPeer = p if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -41,8 +55,8 @@ func WithPeer(p peer.ID) Option { // WithPeerAddr is an option used to specify a peerAddress // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeerAddr(pAddr multiaddr.Multiaddr) Option { - return func(params *lightPushParameters) error { +func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { + return func(params *lightPushRequestParameters) error { params.peerAddr = pAddr if params.selectedPeer != "" { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -55,8 +69,8 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) Option { // to push a waku message to. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { - return func(params *lightPushParameters) error { +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption { + return func(params *lightPushRequestParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers return nil @@ -67,24 +81,24 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { - return func(params *lightPushParameters) error { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption { + return func(params *lightPushRequestParameters) error { params.peerSelectionType = peermanager.LowestRTT return nil } } // WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted -func WithPubSubTopic(pubsubTopic string) Option { - return func(params *lightPushParameters) error { +func WithPubSubTopic(pubsubTopic string) RequestOption { + return func(params *lightPushRequestParameters) error { params.pubsubTopic = pubsubTopic return nil } } // WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic -func WithDefaultPubsubTopic() Option { - return func(params *lightPushParameters) error { +func WithDefaultPubsubTopic() RequestOption { + return func(params *lightPushRequestParameters) error { params.pubsubTopic = relay.DefaultWakuTopic return nil } @@ -92,8 +106,8 @@ func WithDefaultPubsubTopic() Option { // WithRequestID is an option to set a specific request ID to be used when // publishing a message -func WithRequestID(requestID []byte) Option { - return func(params *lightPushParameters) error { +func WithRequestID(requestID []byte) RequestOption { + return func(params *lightPushRequestParameters) error { params.requestID = requestID return nil } @@ -101,16 +115,16 @@ func WithRequestID(requestID []byte) Option { // WithAutomaticRequestID is an option to automatically generate a request ID // when publishing a message -func WithAutomaticRequestID() Option { - return func(params *lightPushParameters) error { +func WithAutomaticRequestID() RequestOption { + return func(params *lightPushRequestParameters) error { params.requestID = protocol.GenerateRequestID() return nil } } // DefaultOptions are the default options to be used when using the lightpush protocol -func DefaultOptions(host host.Host) []Option { - return []Option{ +func DefaultOptions(host host.Host) []RequestOption { + return []RequestOption{ WithAutomaticRequestID(), WithAutomaticPeerSelection(), } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 7c69dd9c..6998b1fa 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -18,7 +18,7 @@ func TestLightPushOption(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - options := []Option{ + options := []RequestOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), WithFastestPeerSelection(), @@ -26,7 +26,7 @@ func TestLightPushOption(t *testing.T) { WithAutomaticRequestID(), } - params := new(lightPushParameters) + params := new(lightPushRequestParameters) params.host = host params.log = utils.Logger() @@ -42,7 +42,7 @@ func TestLightPushOption(t *testing.T) { maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") require.NoError(t, err) - options = []Option{ + options = []RequestOption{ WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), WithPeerAddr(maddr), } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 81112746..5775bbd2 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -3,12 +3,13 @@ package lightpush import ( "context" "crypto/rand" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "go.uber.org/zap" "sync" "testing" "time" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "go.uber.org/zap" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" @@ -122,7 +123,7 @@ func TestWakuLightPush(t *testing.T) { <-sub2.Ch }() - var lpOptions []Option + var lpOptions []RequestOption lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) lpOptions = append(lpOptions, WithPeer(host2.ID())) @@ -156,7 +157,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { require.NoError(t, err) client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) client.SetHost(clientHost) - var lpOptions []Option + var lpOptions []RequestOption lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) _, err = client.Publish(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) @@ -234,7 +235,7 @@ func TestWakuLightPushAutoSharding(t *testing.T) { <-sub2.Ch }() - var lpOptions []Option + var lpOptions []RequestOption lpOptions = append(lpOptions, WithPeer(host2.ID())) // Verifying successful request hash1, err := client.Publish(ctx, msg1, lpOptions...) @@ -295,7 +296,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { var wg sync.WaitGroup - var lpOptions []Option + var lpOptions []RequestOption lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) lpOptions = append(lpOptions, WithPeer(host2.ID())) @@ -314,7 +315,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { host3, err := tests.MakeHost(context.Background(), 12345, rand.Reader) require.NoError(t, err) - var lpOptions2 []Option + var lpOptions2 []RequestOption // Test error case with empty options _, err = client.Publish(ctx, msg2, lpOptions2...)