From c5577418b78f904510da5b3b8153b930d8d13ae1 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Mon, 8 Jun 2020 13:15:19 +0200 Subject: [PATCH] Implement rate limiting by bytes Why make this change? Currently we only rate limit by number of messages. This works well if we want to limit a large amount of small messages but breaks down if sending a smaller amount of large messages. This commit extends the current code to limit by size as well, setting the default to 1MB per second, which should be enough. What has changed? - Rate limiter for waku only --- node/geth_node.go | 2 + params/config.go | 12 +++++- waku/common/rate_limiter.go | 67 +++++++++++++++++++++++--------- waku/common/rate_limiter_test.go | 66 ++++++++++++++++++++++++++----- waku/waku.go | 2 + 5 files changed, 118 insertions(+), 31 deletions(-) diff --git a/node/geth_node.go b/node/geth_node.go index a899a67e6..c56e6f248 100644 --- a/node/geth_node.go +++ b/node/geth_node.go @@ -578,6 +578,8 @@ func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfi &wakucommon.PeerRateLimiterConfig{ PacketLimitPerSecIP: wakuCfg.PacketRateLimitIP, PacketLimitPerSecPeerID: wakuCfg.PacketRateLimitPeerID, + BytesLimitPerSecIP: wakuCfg.BytesRateLimitIP, + BytesLimitPerSecPeerID: wakuCfg.BytesRateLimitPeerID, WhitelistedIPs: ips, WhitelistedPeerIDs: peerIDs, }, diff --git a/params/config.go b/params/config.go index e03f46960..54b6ba923 100644 --- a/params/config.go +++ b/params/config.go @@ -177,14 +177,22 @@ type WakuConfig struct { // EnableRateLimiter set to true enables IP and peer ID rate limiting. EnableRateLimiter bool - // PacketRateLimitIP sets the limit on the number of messages per second + // PacketRateLimitIP sets the limit on the number of packets per second // from a given IP. PacketRateLimitIP int64 - // PacketRateLimitPeerID sets the limit on the number of messages per second + // PacketRateLimitPeerID sets the limit on the number of packets per second // from a given peer ID. PacketRateLimitPeerID int64 + // BytesRateLimitIP sets the limit on the number of bytes per second + // from a given IP. + BytesRateLimitIP int64 + + // BytesRateLimitPeerID sets the limit on the number of bytes per second + // from a given peer ID. + BytesRateLimitPeerID int64 + // RateLimitTolerance is a number of how many a limit must be exceeded // in order to drop a peer. // If equal to 0, the peers are never dropped. diff --git a/waku/common/rate_limiter.go b/waku/common/rate_limiter.go index c1605cde5..29df9d649 100644 --- a/waku/common/rate_limiter.go +++ b/waku/common/rate_limiter.go @@ -68,9 +68,9 @@ type RateLimits struct { PacketPeerIDLimits uint64 // packets per second from a single peer ID (default 0, no limits) PacketTopicLimits uint64 // packets per second from a single topic (default 0, no limits) - SizeIPLimits uint64 // bytes per second from a single IP (default 0, no limits) - SizePeerIDLimits uint64 // bytes per second from a single peer ID (default 0, no limits) - SizeTopicLimits uint64 // bytes per second from a single topic (default 0, no limits) + BytesIPLimits uint64 // bytes per second from a single IP (default 0, no limits) + BytesPeerIDLimits uint64 // bytes per second from a single peer ID (default 0, no limits) + BytesTopicLimits uint64 // bytes per second from a single topic (default 0, no limits) } func (r RateLimits) IsZero() bool { @@ -107,6 +107,8 @@ func (h *DropPeerRateLimiterHandler) ExceedIPLimit() error { type PeerRateLimiterConfig struct { PacketLimitPerSecIP int64 PacketLimitPerSecPeerID int64 + BytesLimitPerSecIP int64 + BytesLimitPerSecPeerID int64 WhitelistedIPs []string WhitelistedPeerIDs []enode.ID } @@ -114,6 +116,8 @@ type PeerRateLimiterConfig struct { var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{ PacketLimitPerSecIP: 10, PacketLimitPerSecPeerID: 5, + BytesLimitPerSecIP: 1048576, // 2MB + BytesLimitPerSecPeerID: 1048576, // 2MB WhitelistedIPs: nil, WhitelistedPeerIDs: nil, } @@ -123,9 +127,15 @@ type PeerRateLimiter struct { packetPeerIDThrottler *tb.Throttler packetIpThrottler *tb.Throttler + bytesPeerIDThrottler *tb.Throttler + bytesIpThrottler *tb.Throttler + PacketLimitPerSecIP int64 PacketLimitPerSecPeerID int64 + BytesLimitPerSecIP int64 + BytesLimitPerSecPeerID int64 + whitelistedPeerIDs []enode.ID whitelistedIPs []string @@ -141,8 +151,12 @@ func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandl return &PeerRateLimiter{ packetPeerIDThrottler: tb.NewThrottler(time.Millisecond * 100), packetIpThrottler: tb.NewThrottler(time.Millisecond * 100), + bytesPeerIDThrottler: tb.NewThrottler(time.Millisecond * 100), + bytesIpThrottler: tb.NewThrottler(time.Millisecond * 100), PacketLimitPerSecIP: cfg.PacketLimitPerSecIP, PacketLimitPerSecPeerID: cfg.PacketLimitPerSecPeerID, + BytesLimitPerSecIP: cfg.BytesLimitPerSecIP, + BytesLimitPerSecPeerID: cfg.BytesLimitPerSecPeerID, whitelistedPeerIDs: cfg.WhitelistedPeerIDs, whitelistedIPs: cfg.WhitelistedIPs, handlers: handlers, @@ -181,7 +195,7 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL // as IP() might return a nil value ip = p.IP().String() } - if halted := r.throttleIP(ip); halted { + if halted := r.throttleIP(ip, packet.Size); halted { for _, h := range r.handlers { if err := h.ExceedIPLimit(); err != nil { errC <- fmt.Errorf("exceed rate limit by IP: %v", err) @@ -194,7 +208,7 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL if p != nil { peerID = p.ID() } - if halted := r.throttlePeer(peerID); halted { + if halted := r.throttlePeer(peerID, packet.Size); halted { for _, h := range r.handlers { if err := h.ExceedPeerLimit(); err != nil { errC <- fmt.Errorf("exceeded rate limit by peer: %v", err) @@ -232,30 +246,45 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL return <-errC } -// throttleIP throttles a number of packets incoming from a given IP. -// It allows 10 packets per second. -func (r *PeerRateLimiter) throttleIP(ip string) bool { - if r.PacketLimitPerSecIP == 0 { - return false - } +// throttleIP throttles packets incoming from a given IP. +func (r *PeerRateLimiter) throttleIP(ip string, size uint32) bool { if stringSliceContains(r.whitelistedIPs, ip) { return false } - return r.packetIpThrottler.Halt(ip, 1, r.PacketLimitPerSecIP) + + var packetLimiterResponse bool + var bytesLimiterResponse bool + + if r.PacketLimitPerSecIP != 0 { + packetLimiterResponse = r.packetIpThrottler.Halt(ip, 1, r.PacketLimitPerSecIP) + } + if r.BytesLimitPerSecIP != 0 { + bytesLimiterResponse = r.bytesIpThrottler.Halt(ip, int64(size), r.BytesLimitPerSecIP) + } + + return packetLimiterResponse || bytesLimiterResponse } -// throttlePeer throttles a number of packets incoming from a peer. -// It allows 3 packets per second. -func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool { - if r.PacketLimitPerSecIP == 0 { - return false - } +// throttlePeer throttles packets incoming from a peer. +func (r *PeerRateLimiter) throttlePeer(peerID []byte, size uint32) bool { var id enode.ID copy(id[:], peerID) if enodeIDSliceContains(r.whitelistedPeerIDs, id) { return false } - return r.packetPeerIDThrottler.Halt(id.String(), 1, r.PacketLimitPerSecPeerID) + + var packetLimiterResponse bool + var bytesLimiterResponse bool + + if r.PacketLimitPerSecPeerID != 0 { + packetLimiterResponse = r.packetPeerIDThrottler.Halt(id.String(), 1, r.PacketLimitPerSecPeerID) + } + + if r.BytesLimitPerSecPeerID != 0 { + bytesLimiterResponse = r.bytesPeerIDThrottler.Halt(id.String(), int64(size), r.BytesLimitPerSecPeerID) + } + + return packetLimiterResponse || bytesLimiterResponse } func stringSliceContains(s []string, searched string) bool { diff --git a/waku/common/rate_limiter_test.go b/waku/common/rate_limiter_test.go index 46c5a2830..43f8c90be 100644 --- a/waku/common/rate_limiter_test.go +++ b/waku/common/rate_limiter_test.go @@ -70,14 +70,14 @@ func TestPeerRateLimiterDecorator(t *testing.T) { func TestPeerLimiterThrottlingWithZeroLimit(t *testing.T) { r := NewPeerRateLimiter(&PeerRateLimiterConfig{}, &mockRateLimiterHandler{}) for i := 0; i < 1000; i++ { - throttle := r.throttleIP("") + throttle := r.throttleIP("", 0) require.False(t, throttle) - throttle = r.throttlePeer([]byte{0x01, 0x02, 0x03}) + throttle = r.throttlePeer([]byte{0x01, 0x02, 0x03}, 0) require.False(t, throttle) } } -func TestPeerLimiterHandler(t *testing.T) { +func TestPeerPacketLimiterHandler(t *testing.T) { h := &mockRateLimiterHandler{} r := NewPeerRateLimiter(nil, h) p := &TestWakuPeer{ @@ -108,17 +108,63 @@ func TestPeerLimiterHandler(t *testing.T) { <-done - require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecIP, h.exceedIPLimit) - require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecPeerID, h.exceedPeerLimit) + require.EqualValues(t, 100-defaultPeerRateLimiterConfig.PacketLimitPerSecIP, h.exceedIPLimit) + require.EqualValues(t, 100-defaultPeerRateLimiterConfig.PacketLimitPerSecPeerID, h.exceedPeerLimit) } -func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) { +func TestPeerBytesLimiterHandler(t *testing.T) { h := &mockRateLimiterHandler{} r := NewPeerRateLimiter(&PeerRateLimiterConfig{ - LimitPerSecIP: 1, - LimitPerSecPeerID: 1, - WhitelistedIPs: []string{""}, // no IP is represented as string - WhitelistedPeerIDs: []enode.ID{{0xaa, 0xbb, 0xcc}}, + BytesLimitPerSecIP: 30, + BytesLimitPerSecPeerID: 30, + }, h) + p := &TestWakuPeer{ + peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil), + } + rw1, rw2 := p2p.MsgPipe() + count := 6 + + go func() { + err := echoMessages(r, p, rw2) + require.NoError(t, err) + }() + + done := make(chan struct{}) + go func() { + for i := 0; i < count; i++ { + msg, err := rw1.ReadMsg() + require.NoError(t, err) + require.EqualValues(t, 101, msg.Code) + msg.Discard() + } + close(done) + }() + + for i := 0; i < count; i++ { + payload := make([]byte, 10) + msg := p2p.Msg{ + Code: 101, + Size: uint32(len(payload)), + Payload: bytes.NewReader(payload), + } + + err := rw1.WriteMsg(msg) + require.NoError(t, err) + } + + <-done + + require.EqualValues(t, 3, h.exceedIPLimit) + require.EqualValues(t, 3, h.exceedPeerLimit) +} + +func TestPeerPacketLimiterHandlerWithWhitelisting(t *testing.T) { + h := &mockRateLimiterHandler{} + r := NewPeerRateLimiter(&PeerRateLimiterConfig{ + PacketLimitPerSecIP: 1, + PacketLimitPerSecPeerID: 1, + WhitelistedIPs: []string{""}, // no IP is represented as string + WhitelistedPeerIDs: []enode.ID{{0xaa, 0xbb, 0xcc}}, }, h) p := &TestWakuPeer{ peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil), diff --git a/waku/waku.go b/waku/waku.go index 17711f16a..31d6d4ce3 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -402,6 +402,8 @@ func (w *Waku) RateLimits() common.RateLimits { return common.RateLimits{ PacketIPLimits: uint64(w.rateLimiter.PacketLimitPerSecIP), PacketPeerIDLimits: uint64(w.rateLimiter.PacketLimitPerSecPeerID), + BytesIPLimits: uint64(w.rateLimiter.BytesLimitPerSecIP), + BytesPeerIDLimits: uint64(w.rateLimiter.BytesLimitPerSecPeerID), } }