feat(peer-exchange): rate limit (#1043)

Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
This commit is contained in:
richΛrd 2024-03-11 00:28:14 -04:00 committed by GitHub
parent d4bda1255c
commit 4d828bdf70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 48 additions and 17 deletions

View File

@ -19,8 +19,8 @@ import (
"go.uber.org/zap"
)
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
params := new(PeerExchangeParameters)
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...RequestOption) error {
params := new(PeerExchangeRequestParameters)
params.host = wakuPX.h
params.log = wakuPX.log
params.pm = wakuPX.pm
@ -103,7 +103,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
return wakuPX.handleResponse(ctx, responseRPC.Response, params)
}
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeParameters) error {
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeRequestParameters) error {
var discoveredPeers []struct {
addrInfo peer.AddrInfo
enr *enode.Node

View File

@ -39,6 +39,7 @@ var (
decodeRPCFailure metricsErrCategory = "decode_rpc_failure"
pxFailure metricsErrCategory = "px_failure"
dialFailure metricsErrCategory = "dial_failure"
rateLimitFailure metricsErrCategory = "ratelimit_failure"
)
// RecordError increases the counter for different error types

View File

@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
@ -47,12 +48,13 @@ type WakuPeerExchange struct {
peerConnector PeerConnector
enrCache *enrCache
limiter *rate.Limiter
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger) (*WakuPeerExchange, error) {
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange)
wakuPX.disc = disc
wakuPX.metrics = newMetrics(reg)
@ -62,6 +64,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.pm = pm
wakuPX.CommonService = service.NewCommonService()
params := &PeerExchangeParameters{}
for _, opt := range opts {
opt(params)
}
wakuPX.limiter = params.limiter
return wakuPX, nil
}
@ -87,6 +95,14 @@ func (wakuPX *WakuPeerExchange) start() error {
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
return func(stream network.Stream) {
logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
wakuPX.metrics.RecordError(rateLimitFailure)
wakuPX.log.Error("exceeds the rate limit")
// TODO: peer exchange protocol should contain an err field
return
}
requestRPC := &pb.PeerExchangeRPC{}
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err := reader.ReadMsg(requestRPC)

View File

@ -8,9 +8,23 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
type PeerExchangeParameters struct {
limiter *rate.Limiter
}
type Option func(*PeerExchangeParameters)
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
func WithRateLimiter(r rate.Limit, b int) Option {
return func(params *PeerExchangeParameters) {
params.limiter = rate.NewLimiter(r, b)
}
}
type PeerExchangeRequestParameters struct {
host host.Host
selectedPeer peer.ID
peerAddr multiaddr.Multiaddr
@ -22,11 +36,11 @@ type PeerExchangeParameters struct {
clusterID int
}
type PeerExchangeOption func(*PeerExchangeParameters) error
type RequestOption func(*PeerExchangeRequestParameters) error
// WithPeer is an option used to specify the peerID to fetch peers from
func WithPeer(p peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithPeer(p peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.selectedPeer = p
if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive")
@ -38,8 +52,8 @@ func WithPeer(p peer.ID) PeerExchangeOption {
// WithPeerAddr is an option used to specify a peerAddress to fetch peers from
// This new peer will be added to peerStore.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerAddr = pAddr
if params.selectedPeer != "" {
return errors.New("peerAddr and peerId options are mutually exclusive")
@ -53,8 +67,8 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption {
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
return nil
@ -65,8 +79,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.peerSelectionType = peermanager.LowestRTT
params.preferredPeers = fromThesePeers
return nil
@ -74,15 +88,15 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
}
// DefaultOptions are the default options to be used when using the lightpush protocol
func DefaultOptions(host host.Host) []PeerExchangeOption {
return []PeerExchangeOption{
func DefaultOptions(host host.Host) []RequestOption {
return []RequestOption{
WithAutomaticPeerSelection(),
}
}
// Use this if you want to filter peers by specific shards
func FilterByShard(clusterID int, shard int) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
func FilterByShard(clusterID int, shard int) RequestOption {
return func(params *PeerExchangeRequestParameters) error {
params.shard = shard
params.clusterID = clusterID
return nil