diff --git a/node/geth_node.go b/node/geth_node.go index a899a67e6..c56e6f248 100644 --- a/node/geth_node.go +++ b/node/geth_node.go @@ -578,6 +578,8 @@ func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfi &wakucommon.PeerRateLimiterConfig{ PacketLimitPerSecIP: wakuCfg.PacketRateLimitIP, PacketLimitPerSecPeerID: wakuCfg.PacketRateLimitPeerID, + BytesLimitPerSecIP: wakuCfg.BytesRateLimitIP, + BytesLimitPerSecPeerID: wakuCfg.BytesRateLimitPeerID, WhitelistedIPs: ips, WhitelistedPeerIDs: peerIDs, }, diff --git a/params/config.go b/params/config.go index e03f46960..54b6ba923 100644 --- a/params/config.go +++ b/params/config.go @@ -177,14 +177,22 @@ type WakuConfig struct { // EnableRateLimiter set to true enables IP and peer ID rate limiting. EnableRateLimiter bool - // PacketRateLimitIP sets the limit on the number of messages per second + // PacketRateLimitIP sets the limit on the number of packets per second // from a given IP. PacketRateLimitIP int64 - // PacketRateLimitPeerID sets the limit on the number of messages per second + // PacketRateLimitPeerID sets the limit on the number of packets per second // from a given peer ID. PacketRateLimitPeerID int64 + // BytesRateLimitIP sets the limit on the number of bytes per second + // from a given IP. + BytesRateLimitIP int64 + + // BytesRateLimitPeerID sets the limit on the number of bytes per second + // from a given peer ID. + BytesRateLimitPeerID int64 + // RateLimitTolerance is a number of how many a limit must be exceeded // in order to drop a peer. // If equal to 0, the peers are never dropped. diff --git a/waku/common/rate_limiter.go b/waku/common/rate_limiter.go index c1605cde5..29df9d649 100644 --- a/waku/common/rate_limiter.go +++ b/waku/common/rate_limiter.go @@ -68,9 +68,9 @@ type RateLimits struct { PacketPeerIDLimits uint64 // packets per second from a single peer ID (default 0, no limits) PacketTopicLimits uint64 // packets per second from a single topic (default 0, no limits) - SizeIPLimits uint64 // bytes per second from a single IP (default 0, no limits) - SizePeerIDLimits uint64 // bytes per second from a single peer ID (default 0, no limits) - SizeTopicLimits uint64 // bytes per second from a single topic (default 0, no limits) + BytesIPLimits uint64 // bytes per second from a single IP (default 0, no limits) + BytesPeerIDLimits uint64 // bytes per second from a single peer ID (default 0, no limits) + BytesTopicLimits uint64 // bytes per second from a single topic (default 0, no limits) } func (r RateLimits) IsZero() bool { @@ -107,6 +107,8 @@ func (h *DropPeerRateLimiterHandler) ExceedIPLimit() error { type PeerRateLimiterConfig struct { PacketLimitPerSecIP int64 PacketLimitPerSecPeerID int64 + BytesLimitPerSecIP int64 + BytesLimitPerSecPeerID int64 WhitelistedIPs []string WhitelistedPeerIDs []enode.ID } @@ -114,6 +116,8 @@ type PeerRateLimiterConfig struct { var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{ PacketLimitPerSecIP: 10, PacketLimitPerSecPeerID: 5, + BytesLimitPerSecIP: 1048576, // 2MB + BytesLimitPerSecPeerID: 1048576, // 2MB WhitelistedIPs: nil, WhitelistedPeerIDs: nil, } @@ -123,9 +127,15 @@ type PeerRateLimiter struct { packetPeerIDThrottler *tb.Throttler packetIpThrottler *tb.Throttler + bytesPeerIDThrottler *tb.Throttler + bytesIpThrottler *tb.Throttler + PacketLimitPerSecIP int64 PacketLimitPerSecPeerID int64 + BytesLimitPerSecIP int64 + BytesLimitPerSecPeerID int64 + whitelistedPeerIDs []enode.ID whitelistedIPs []string @@ -141,8 +151,12 @@ func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandl return &PeerRateLimiter{ packetPeerIDThrottler: tb.NewThrottler(time.Millisecond * 100), packetIpThrottler: tb.NewThrottler(time.Millisecond * 100), + bytesPeerIDThrottler: tb.NewThrottler(time.Millisecond * 100), + bytesIpThrottler: tb.NewThrottler(time.Millisecond * 100), PacketLimitPerSecIP: cfg.PacketLimitPerSecIP, PacketLimitPerSecPeerID: cfg.PacketLimitPerSecPeerID, + BytesLimitPerSecIP: cfg.BytesLimitPerSecIP, + BytesLimitPerSecPeerID: cfg.BytesLimitPerSecPeerID, whitelistedPeerIDs: cfg.WhitelistedPeerIDs, whitelistedIPs: cfg.WhitelistedIPs, handlers: handlers, @@ -181,7 +195,7 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL // as IP() might return a nil value ip = p.IP().String() } - if halted := r.throttleIP(ip); halted { + if halted := r.throttleIP(ip, packet.Size); halted { for _, h := range r.handlers { if err := h.ExceedIPLimit(); err != nil { errC <- fmt.Errorf("exceed rate limit by IP: %v", err) @@ -194,7 +208,7 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL if p != nil { peerID = p.ID() } - if halted := r.throttlePeer(peerID); halted { + if halted := r.throttlePeer(peerID, packet.Size); halted { for _, h := range r.handlers { if err := h.ExceedPeerLimit(); err != nil { errC <- fmt.Errorf("exceeded rate limit by peer: %v", err) @@ -232,30 +246,45 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL return <-errC } -// throttleIP throttles a number of packets incoming from a given IP. -// It allows 10 packets per second. -func (r *PeerRateLimiter) throttleIP(ip string) bool { - if r.PacketLimitPerSecIP == 0 { - return false - } +// throttleIP throttles packets incoming from a given IP. +func (r *PeerRateLimiter) throttleIP(ip string, size uint32) bool { if stringSliceContains(r.whitelistedIPs, ip) { return false } - return r.packetIpThrottler.Halt(ip, 1, r.PacketLimitPerSecIP) + + var packetLimiterResponse bool + var bytesLimiterResponse bool + + if r.PacketLimitPerSecIP != 0 { + packetLimiterResponse = r.packetIpThrottler.Halt(ip, 1, r.PacketLimitPerSecIP) + } + if r.BytesLimitPerSecIP != 0 { + bytesLimiterResponse = r.bytesIpThrottler.Halt(ip, int64(size), r.BytesLimitPerSecIP) + } + + return packetLimiterResponse || bytesLimiterResponse } -// throttlePeer throttles a number of packets incoming from a peer. -// It allows 3 packets per second. -func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool { - if r.PacketLimitPerSecIP == 0 { - return false - } +// throttlePeer throttles packets incoming from a peer. +func (r *PeerRateLimiter) throttlePeer(peerID []byte, size uint32) bool { var id enode.ID copy(id[:], peerID) if enodeIDSliceContains(r.whitelistedPeerIDs, id) { return false } - return r.packetPeerIDThrottler.Halt(id.String(), 1, r.PacketLimitPerSecPeerID) + + var packetLimiterResponse bool + var bytesLimiterResponse bool + + if r.PacketLimitPerSecPeerID != 0 { + packetLimiterResponse = r.packetPeerIDThrottler.Halt(id.String(), 1, r.PacketLimitPerSecPeerID) + } + + if r.BytesLimitPerSecPeerID != 0 { + bytesLimiterResponse = r.bytesPeerIDThrottler.Halt(id.String(), int64(size), r.BytesLimitPerSecPeerID) + } + + return packetLimiterResponse || bytesLimiterResponse } func stringSliceContains(s []string, searched string) bool { diff --git a/waku/common/rate_limiter_test.go b/waku/common/rate_limiter_test.go index 46c5a2830..43f8c90be 100644 --- a/waku/common/rate_limiter_test.go +++ b/waku/common/rate_limiter_test.go @@ -70,14 +70,14 @@ func TestPeerRateLimiterDecorator(t *testing.T) { func TestPeerLimiterThrottlingWithZeroLimit(t *testing.T) { r := NewPeerRateLimiter(&PeerRateLimiterConfig{}, &mockRateLimiterHandler{}) for i := 0; i < 1000; i++ { - throttle := r.throttleIP("") + throttle := r.throttleIP("", 0) require.False(t, throttle) - throttle = r.throttlePeer([]byte{0x01, 0x02, 0x03}) + throttle = r.throttlePeer([]byte{0x01, 0x02, 0x03}, 0) require.False(t, throttle) } } -func TestPeerLimiterHandler(t *testing.T) { +func TestPeerPacketLimiterHandler(t *testing.T) { h := &mockRateLimiterHandler{} r := NewPeerRateLimiter(nil, h) p := &TestWakuPeer{ @@ -108,17 +108,63 @@ func TestPeerLimiterHandler(t *testing.T) { <-done - require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecIP, h.exceedIPLimit) - require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecPeerID, h.exceedPeerLimit) + require.EqualValues(t, 100-defaultPeerRateLimiterConfig.PacketLimitPerSecIP, h.exceedIPLimit) + require.EqualValues(t, 100-defaultPeerRateLimiterConfig.PacketLimitPerSecPeerID, h.exceedPeerLimit) } -func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) { +func TestPeerBytesLimiterHandler(t *testing.T) { h := &mockRateLimiterHandler{} r := NewPeerRateLimiter(&PeerRateLimiterConfig{ - LimitPerSecIP: 1, - LimitPerSecPeerID: 1, - WhitelistedIPs: []string{""}, // no IP is represented as string - WhitelistedPeerIDs: []enode.ID{{0xaa, 0xbb, 0xcc}}, + BytesLimitPerSecIP: 30, + BytesLimitPerSecPeerID: 30, + }, h) + p := &TestWakuPeer{ + peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil), + } + rw1, rw2 := p2p.MsgPipe() + count := 6 + + go func() { + err := echoMessages(r, p, rw2) + require.NoError(t, err) + }() + + done := make(chan struct{}) + go func() { + for i := 0; i < count; i++ { + msg, err := rw1.ReadMsg() + require.NoError(t, err) + require.EqualValues(t, 101, msg.Code) + msg.Discard() + } + close(done) + }() + + for i := 0; i < count; i++ { + payload := make([]byte, 10) + msg := p2p.Msg{ + Code: 101, + Size: uint32(len(payload)), + Payload: bytes.NewReader(payload), + } + + err := rw1.WriteMsg(msg) + require.NoError(t, err) + } + + <-done + + require.EqualValues(t, 3, h.exceedIPLimit) + require.EqualValues(t, 3, h.exceedPeerLimit) +} + +func TestPeerPacketLimiterHandlerWithWhitelisting(t *testing.T) { + h := &mockRateLimiterHandler{} + r := NewPeerRateLimiter(&PeerRateLimiterConfig{ + PacketLimitPerSecIP: 1, + PacketLimitPerSecPeerID: 1, + WhitelistedIPs: []string{""}, // no IP is represented as string + WhitelistedPeerIDs: []enode.ID{{0xaa, 0xbb, 0xcc}}, }, h) p := &TestWakuPeer{ peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil), diff --git a/waku/waku.go b/waku/waku.go index 17711f16a..31d6d4ce3 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -402,6 +402,8 @@ func (w *Waku) RateLimits() common.RateLimits { return common.RateLimits{ PacketIPLimits: uint64(w.rateLimiter.PacketLimitPerSecIP), PacketPeerIDLimits: uint64(w.rateLimiter.PacketLimitPerSecPeerID), + BytesIPLimits: uint64(w.rateLimiter.BytesLimitPerSecIP), + BytesPeerIDLimits: uint64(w.rateLimiter.BytesLimitPerSecPeerID), } }