chore: add rate limiter option to lightpush (#1024)

This commit is contained in:
richΛrd 2024-02-05 08:53:15 -04:00 committed by GitHub
parent 2e7a82e130
commit c09bd8383b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 76 additions and 43 deletions

View File

@ -324,7 +324,7 @@ func (c *Chat) publish(ctx context.Context, message string) error {
} }
if c.options.LightPush.Enable { if c.options.LightPush.Enable {
lightOpt := []lightpush.Option{lightpush.WithDefaultPubsubTopic()} lightOpt := []lightpush.RequestOption{lightpush.WithDefaultPubsubTopic()}
var peerID peer.ID var peerID peer.ID
peerID, err = options.LightPush.NodePeerID() peerID, err = options.LightPush.NodePeerID()
if err != nil { if err != nil {

View File

@ -26,7 +26,7 @@ func lightpushPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic s
ctx = instance.ctx ctx = instance.ctx
} }
var lpOptions []lightpush.Option var lpOptions []lightpush.RequestOption
if peerID != "" { if peerID != "" {
p, err := peer.Decode(peerID) p, err := peer.Decode(peerID)
if err != nil { if err != nil {

View File

@ -53,6 +53,7 @@ var (
writeRequestFailure metricsErrCategory = "write_request_failure" writeRequestFailure metricsErrCategory = "write_request_failure"
writeResponseFailure metricsErrCategory = "write_response_failure" writeResponseFailure metricsErrCategory = "write_response_failure"
dialFailure metricsErrCategory = "dial_failure" dialFailure metricsErrCategory = "dial_failure"
rateLimitFailure metricsErrCategory = "ratelimit_failure"
messagePushFailure metricsErrCategory = "message_push_failure" messagePushFailure metricsErrCategory = "message_push_failure"
requestBodyFailure metricsErrCategory = "request_failure" requestBodyFailure metricsErrCategory = "request_failure"
responseBodyFailure metricsErrCategory = "response_body_failure" responseBodyFailure metricsErrCategory = "response_body_failure"

View File

@ -22,6 +22,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/time/rate"
) )
// LightPushID_v20beta1 is the current Waku LightPush protocol identifier // LightPushID_v20beta1 is the current Waku LightPush protocol identifier
@ -37,6 +38,7 @@ var (
type WakuLightPush struct { type WakuLightPush struct {
h host.Host h host.Host
relay *relay.WakuRelay relay *relay.WakuRelay
limiter *rate.Limiter
cancel context.CancelFunc cancel context.CancelFunc
pm *peermanager.PeerManager pm *peermanager.PeerManager
metrics Metrics metrics Metrics
@ -47,13 +49,20 @@ type WakuLightPush struct {
// NewWakuLightPush returns a new instance of Waku Lightpush struct // NewWakuLightPush returns a new instance of Waku Lightpush struct
// Takes an optional peermanager if WakuLightPush is being created along with WakuNode. // Takes an optional peermanager if WakuLightPush is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil // If using libp2p host, then pass peermanager as nil
func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) *WakuLightPush { func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuLightPush {
wakuLP := new(WakuLightPush) wakuLP := new(WakuLightPush)
wakuLP.relay = relay wakuLP.relay = relay
wakuLP.log = log.Named("lightpush") wakuLP.log = log.Named("lightpush")
wakuLP.pm = pm wakuLP.pm = pm
wakuLP.metrics = newMetrics(reg) wakuLP.metrics = newMetrics(reg)
params := &LightpushParameters{}
for _, opt := range opts {
opt(params)
}
wakuLP.limiter = params.limiter
if pm != nil { if pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
} }
@ -91,6 +100,18 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
logger := wakuLP.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) logger := wakuLP.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
requestPushRPC := &pb.PushRpc{} requestPushRPC := &pb.PushRpc{}
responsePushRPC := &pb.PushRpc{
Response: &pb.PushResponse{},
}
if wakuLP.limiter != nil && !wakuLP.limiter.Allow() {
wakuLP.metrics.RecordError(rateLimitFailure)
responseMsg := "exceeds the rate limit"
responsePushRPC.Response.Info = &responseMsg
wakuLP.reply(stream, responsePushRPC, logger)
return
}
reader := pbio.NewDelimitedReader(stream, math.MaxInt32) reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err := reader.ReadMsg(requestPushRPC) err := reader.ReadMsg(requestPushRPC)
@ -103,11 +124,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
return return
} }
responsePushRPC := &pb.PushRpc{ responsePushRPC.RequestId = requestPushRPC.RequestId
RequestId: requestPushRPC.RequestId,
Response: &pb.PushResponse{},
}
if err := requestPushRPC.ValidateRequest(); err != nil { if err := requestPushRPC.ValidateRequest(); err != nil {
responseMsg := err.Error() responseMsg := err.Error()
responsePushRPC.Response.Info = &responseMsg responsePushRPC.Response.Info = &responseMsg
@ -170,7 +187,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
} }
// request sends a message via lightPush protocol to either a specified peer or peer that is selected. // request sends a message via lightPush protocol to either a specified peer or peer that is selected.
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushParameters) (*pb.PushResponse, error) { func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) {
if params == nil { if params == nil {
return nil, errors.New("lightpush params are mandatory") return nil, errors.New("lightpush params are mandatory")
} }
@ -233,8 +250,8 @@ func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
} }
func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) { func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...RequestOption) (*lightPushRequestParameters, error) {
params := new(lightPushParameters) params := new(lightPushRequestParameters)
params.host = wakuLP.h params.host = wakuLP.h
params.log = wakuLP.log params.log = wakuLP.log
params.pm = wakuLP.pm params.pm = wakuLP.pm
@ -294,7 +311,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the // Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the
// contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the // contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the
// `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic // `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...RequestOption) ([]byte, error) {
if message == nil { if message == nil {
return nil, errors.New("message can't be null") return nil, errors.New("message can't be null")
} }

View File

@ -10,9 +10,23 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/time/rate"
) )
type lightPushParameters struct { type LightpushParameters struct {
limiter *rate.Limiter
}
type Option func(*LightpushParameters)
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
func WithRateLimiter(r rate.Limit, b int) Option {
return func(params *LightpushParameters) {
params.limiter = rate.NewLimiter(r, b)
}
}
type lightPushRequestParameters struct {
host host.Host host host.Host
peerAddr multiaddr.Multiaddr peerAddr multiaddr.Multiaddr
selectedPeer peer.ID selectedPeer peer.ID
@ -24,12 +38,12 @@ type lightPushParameters struct {
pubsubTopic string pubsubTopic string
} }
// Option is the type of options accepted when performing LightPush protocol requests // RequestOption is the type of options accepted when performing LightPush protocol requests
type Option func(*lightPushParameters) error type RequestOption func(*lightPushRequestParameters) error
// WithPeer is an option used to specify the peerID to push a waku message to // WithPeer is an option used to specify the peerID to push a waku message to
func WithPeer(p peer.ID) Option { func WithPeer(p peer.ID) RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.selectedPeer = p params.selectedPeer = p
if params.peerAddr != nil { if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive") return errors.New("peerAddr and peerId options are mutually exclusive")
@ -41,8 +55,8 @@ func WithPeer(p peer.ID) Option {
// WithPeerAddr is an option used to specify a peerAddress // WithPeerAddr is an option used to specify a peerAddress
// This new peer will be added to peerStore. // This new peer will be added to peerStore.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeerAddr(pAddr multiaddr.Multiaddr) Option { func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.peerAddr = pAddr params.peerAddr = pAddr
if params.selectedPeer != "" { if params.selectedPeer != "" {
return errors.New("peerAddr and peerId options are mutually exclusive") return errors.New("peerAddr and peerId options are mutually exclusive")
@ -55,8 +69,8 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) Option {
// to push a waku message to. If a list of specific peers is passed, the peer will be chosen // to push a waku message to. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore // from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.peerSelectionType = peermanager.Automatic params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers params.preferredPeers = fromThesePeers
return nil return nil
@ -67,24 +81,24 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen // with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore // from the node peerstore
func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.peerSelectionType = peermanager.LowestRTT params.peerSelectionType = peermanager.LowestRTT
return nil return nil
} }
} }
// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted // WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted
func WithPubSubTopic(pubsubTopic string) Option { func WithPubSubTopic(pubsubTopic string) RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.pubsubTopic = pubsubTopic params.pubsubTopic = pubsubTopic
return nil return nil
} }
} }
// WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic // WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic
func WithDefaultPubsubTopic() Option { func WithDefaultPubsubTopic() RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.pubsubTopic = relay.DefaultWakuTopic params.pubsubTopic = relay.DefaultWakuTopic
return nil return nil
} }
@ -92,8 +106,8 @@ func WithDefaultPubsubTopic() Option {
// WithRequestID is an option to set a specific request ID to be used when // WithRequestID is an option to set a specific request ID to be used when
// publishing a message // publishing a message
func WithRequestID(requestID []byte) Option { func WithRequestID(requestID []byte) RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.requestID = requestID params.requestID = requestID
return nil return nil
} }
@ -101,16 +115,16 @@ func WithRequestID(requestID []byte) Option {
// WithAutomaticRequestID is an option to automatically generate a request ID // WithAutomaticRequestID is an option to automatically generate a request ID
// when publishing a message // when publishing a message
func WithAutomaticRequestID() Option { func WithAutomaticRequestID() RequestOption {
return func(params *lightPushParameters) error { return func(params *lightPushRequestParameters) error {
params.requestID = protocol.GenerateRequestID() params.requestID = protocol.GenerateRequestID()
return nil return nil
} }
} }
// DefaultOptions are the default options to be used when using the lightpush protocol // DefaultOptions are the default options to be used when using the lightpush protocol
func DefaultOptions(host host.Host) []Option { func DefaultOptions(host host.Host) []RequestOption {
return []Option{ return []RequestOption{
WithAutomaticRequestID(), WithAutomaticRequestID(),
WithAutomaticPeerSelection(), WithAutomaticPeerSelection(),
} }

View File

@ -18,7 +18,7 @@ func TestLightPushOption(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader) host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
options := []Option{ options := []RequestOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(), WithAutomaticPeerSelection(),
WithFastestPeerSelection(), WithFastestPeerSelection(),
@ -26,7 +26,7 @@ func TestLightPushOption(t *testing.T) {
WithAutomaticRequestID(), WithAutomaticRequestID(),
} }
params := new(lightPushParameters) params := new(lightPushRequestParameters)
params.host = host params.host = host
params.log = utils.Logger() params.log = utils.Logger()
@ -42,7 +42,7 @@ func TestLightPushOption(t *testing.T) {
maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy")
require.NoError(t, err) require.NoError(t, err)
options = []Option{ options = []RequestOption{
WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"),
WithPeerAddr(maddr), WithPeerAddr(maddr),
} }

View File

@ -3,12 +3,13 @@ package lightpush
import ( import (
"context" "context"
"crypto/rand" "crypto/rand"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -122,7 +123,7 @@ func TestWakuLightPush(t *testing.T) {
<-sub2.Ch <-sub2.Ch
}() }()
var lpOptions []Option var lpOptions []RequestOption
lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
lpOptions = append(lpOptions, WithPeer(host2.ID())) lpOptions = append(lpOptions, WithPeer(host2.ID()))
@ -156,7 +157,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
client.SetHost(clientHost) client.SetHost(clientHost)
var lpOptions []Option var lpOptions []RequestOption
lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
_, err = client.Publish(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) _, err = client.Publish(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...)
@ -234,7 +235,7 @@ func TestWakuLightPushAutoSharding(t *testing.T) {
<-sub2.Ch <-sub2.Ch
}() }()
var lpOptions []Option var lpOptions []RequestOption
lpOptions = append(lpOptions, WithPeer(host2.ID())) lpOptions = append(lpOptions, WithPeer(host2.ID()))
// Verifying successful request // Verifying successful request
hash1, err := client.Publish(ctx, msg1, lpOptions...) hash1, err := client.Publish(ctx, msg1, lpOptions...)
@ -295,7 +296,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
var lpOptions []Option var lpOptions []RequestOption
lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
lpOptions = append(lpOptions, WithPeer(host2.ID())) lpOptions = append(lpOptions, WithPeer(host2.ID()))
@ -314,7 +315,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
host3, err := tests.MakeHost(context.Background(), 12345, rand.Reader) host3, err := tests.MakeHost(context.Background(), 12345, rand.Reader)
require.NoError(t, err) require.NoError(t, err)
var lpOptions2 []Option var lpOptions2 []RequestOption
// Test error case with empty options // Test error case with empty options
_, err = client.Publish(ctx, msg2, lpOptions2...) _, err = client.Publish(ctx, msg2, lpOptions2...)