Implement rate limiting by bytes

Why make this change?

Currently we only rate limit by number of messages.
This works well if we want to limit a large amount of small messages but
breaks down if sending a smaller amount of large messages.

This commit extends the current code to limit by size as well, setting
the default to 1MB per second, which should be enough.

What has changed?

- Rate limiter for waku only
This commit is contained in:
Andrea Maria Piana 2020-06-08 13:15:19 +02:00
parent 2d12ac4fbb
commit c5577418b7
5 changed files with 118 additions and 31 deletions

View File

@ -578,6 +578,8 @@ func wakuRateLimiter(wakuCfg *params.WakuConfig, clusterCfg *params.ClusterConfi
&wakucommon.PeerRateLimiterConfig{
PacketLimitPerSecIP: wakuCfg.PacketRateLimitIP,
PacketLimitPerSecPeerID: wakuCfg.PacketRateLimitPeerID,
BytesLimitPerSecIP: wakuCfg.BytesRateLimitIP,
BytesLimitPerSecPeerID: wakuCfg.BytesRateLimitPeerID,
WhitelistedIPs: ips,
WhitelistedPeerIDs: peerIDs,
},

View File

@ -177,14 +177,22 @@ type WakuConfig struct {
// EnableRateLimiter set to true enables IP and peer ID rate limiting.
EnableRateLimiter bool
// PacketRateLimitIP sets the limit on the number of messages per second
// PacketRateLimitIP sets the limit on the number of packets per second
// from a given IP.
PacketRateLimitIP int64
// PacketRateLimitPeerID sets the limit on the number of messages per second
// PacketRateLimitPeerID sets the limit on the number of packets per second
// from a given peer ID.
PacketRateLimitPeerID int64
// BytesRateLimitIP sets the limit on the number of bytes per second
// from a given IP.
BytesRateLimitIP int64
// BytesRateLimitPeerID sets the limit on the number of bytes per second
// from a given peer ID.
BytesRateLimitPeerID int64
// RateLimitTolerance is a number of how many a limit must be exceeded
// in order to drop a peer.
// If equal to 0, the peers are never dropped.

View File

@ -68,9 +68,9 @@ type RateLimits struct {
PacketPeerIDLimits uint64 // packets per second from a single peer ID (default 0, no limits)
PacketTopicLimits uint64 // packets per second from a single topic (default 0, no limits)
SizeIPLimits uint64 // bytes per second from a single IP (default 0, no limits)
SizePeerIDLimits uint64 // bytes per second from a single peer ID (default 0, no limits)
SizeTopicLimits uint64 // bytes per second from a single topic (default 0, no limits)
BytesIPLimits uint64 // bytes per second from a single IP (default 0, no limits)
BytesPeerIDLimits uint64 // bytes per second from a single peer ID (default 0, no limits)
BytesTopicLimits uint64 // bytes per second from a single topic (default 0, no limits)
}
func (r RateLimits) IsZero() bool {
@ -107,6 +107,8 @@ func (h *DropPeerRateLimiterHandler) ExceedIPLimit() error {
type PeerRateLimiterConfig struct {
PacketLimitPerSecIP int64
PacketLimitPerSecPeerID int64
BytesLimitPerSecIP int64
BytesLimitPerSecPeerID int64
WhitelistedIPs []string
WhitelistedPeerIDs []enode.ID
}
@ -114,6 +116,8 @@ type PeerRateLimiterConfig struct {
var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{
PacketLimitPerSecIP: 10,
PacketLimitPerSecPeerID: 5,
BytesLimitPerSecIP: 1048576, // 2MB
BytesLimitPerSecPeerID: 1048576, // 2MB
WhitelistedIPs: nil,
WhitelistedPeerIDs: nil,
}
@ -123,9 +127,15 @@ type PeerRateLimiter struct {
packetPeerIDThrottler *tb.Throttler
packetIpThrottler *tb.Throttler
bytesPeerIDThrottler *tb.Throttler
bytesIpThrottler *tb.Throttler
PacketLimitPerSecIP int64
PacketLimitPerSecPeerID int64
BytesLimitPerSecIP int64
BytesLimitPerSecPeerID int64
whitelistedPeerIDs []enode.ID
whitelistedIPs []string
@ -141,8 +151,12 @@ func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandl
return &PeerRateLimiter{
packetPeerIDThrottler: tb.NewThrottler(time.Millisecond * 100),
packetIpThrottler: tb.NewThrottler(time.Millisecond * 100),
bytesPeerIDThrottler: tb.NewThrottler(time.Millisecond * 100),
bytesIpThrottler: tb.NewThrottler(time.Millisecond * 100),
PacketLimitPerSecIP: cfg.PacketLimitPerSecIP,
PacketLimitPerSecPeerID: cfg.PacketLimitPerSecPeerID,
BytesLimitPerSecIP: cfg.BytesLimitPerSecIP,
BytesLimitPerSecPeerID: cfg.BytesLimitPerSecPeerID,
whitelistedPeerIDs: cfg.WhitelistedPeerIDs,
whitelistedIPs: cfg.WhitelistedIPs,
handlers: handlers,
@ -181,7 +195,7 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
// as IP() might return a nil value
ip = p.IP().String()
}
if halted := r.throttleIP(ip); halted {
if halted := r.throttleIP(ip, packet.Size); halted {
for _, h := range r.handlers {
if err := h.ExceedIPLimit(); err != nil {
errC <- fmt.Errorf("exceed rate limit by IP: %v", err)
@ -194,7 +208,7 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
if p != nil {
peerID = p.ID()
}
if halted := r.throttlePeer(peerID); halted {
if halted := r.throttlePeer(peerID, packet.Size); halted {
for _, h := range r.handlers {
if err := h.ExceedPeerLimit(); err != nil {
errC <- fmt.Errorf("exceeded rate limit by peer: %v", err)
@ -232,30 +246,45 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
return <-errC
}
// throttleIP throttles a number of packets incoming from a given IP.
// It allows 10 packets per second.
func (r *PeerRateLimiter) throttleIP(ip string) bool {
if r.PacketLimitPerSecIP == 0 {
return false
}
// throttleIP throttles packets incoming from a given IP.
func (r *PeerRateLimiter) throttleIP(ip string, size uint32) bool {
if stringSliceContains(r.whitelistedIPs, ip) {
return false
}
return r.packetIpThrottler.Halt(ip, 1, r.PacketLimitPerSecIP)
var packetLimiterResponse bool
var bytesLimiterResponse bool
if r.PacketLimitPerSecIP != 0 {
packetLimiterResponse = r.packetIpThrottler.Halt(ip, 1, r.PacketLimitPerSecIP)
}
if r.BytesLimitPerSecIP != 0 {
bytesLimiterResponse = r.bytesIpThrottler.Halt(ip, int64(size), r.BytesLimitPerSecIP)
}
// throttlePeer throttles a number of packets incoming from a peer.
// It allows 3 packets per second.
func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool {
if r.PacketLimitPerSecIP == 0 {
return false
return packetLimiterResponse || bytesLimiterResponse
}
// throttlePeer throttles packets incoming from a peer.
func (r *PeerRateLimiter) throttlePeer(peerID []byte, size uint32) bool {
var id enode.ID
copy(id[:], peerID)
if enodeIDSliceContains(r.whitelistedPeerIDs, id) {
return false
}
return r.packetPeerIDThrottler.Halt(id.String(), 1, r.PacketLimitPerSecPeerID)
var packetLimiterResponse bool
var bytesLimiterResponse bool
if r.PacketLimitPerSecPeerID != 0 {
packetLimiterResponse = r.packetPeerIDThrottler.Halt(id.String(), 1, r.PacketLimitPerSecPeerID)
}
if r.BytesLimitPerSecPeerID != 0 {
bytesLimiterResponse = r.bytesPeerIDThrottler.Halt(id.String(), int64(size), r.BytesLimitPerSecPeerID)
}
return packetLimiterResponse || bytesLimiterResponse
}
func stringSliceContains(s []string, searched string) bool {

View File

@ -70,14 +70,14 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
func TestPeerLimiterThrottlingWithZeroLimit(t *testing.T) {
r := NewPeerRateLimiter(&PeerRateLimiterConfig{}, &mockRateLimiterHandler{})
for i := 0; i < 1000; i++ {
throttle := r.throttleIP("<nil>")
throttle := r.throttleIP("<nil>", 0)
require.False(t, throttle)
throttle = r.throttlePeer([]byte{0x01, 0x02, 0x03})
throttle = r.throttlePeer([]byte{0x01, 0x02, 0x03}, 0)
require.False(t, throttle)
}
}
func TestPeerLimiterHandler(t *testing.T) {
func TestPeerPacketLimiterHandler(t *testing.T) {
h := &mockRateLimiterHandler{}
r := NewPeerRateLimiter(nil, h)
p := &TestWakuPeer{
@ -108,15 +108,61 @@ func TestPeerLimiterHandler(t *testing.T) {
<-done
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecIP, h.exceedIPLimit)
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecPeerID, h.exceedPeerLimit)
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.PacketLimitPerSecIP, h.exceedIPLimit)
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.PacketLimitPerSecPeerID, h.exceedPeerLimit)
}
func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
func TestPeerBytesLimiterHandler(t *testing.T) {
h := &mockRateLimiterHandler{}
r := NewPeerRateLimiter(&PeerRateLimiterConfig{
LimitPerSecIP: 1,
LimitPerSecPeerID: 1,
BytesLimitPerSecIP: 30,
BytesLimitPerSecPeerID: 30,
}, h)
p := &TestWakuPeer{
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
}
rw1, rw2 := p2p.MsgPipe()
count := 6
go func() {
err := echoMessages(r, p, rw2)
require.NoError(t, err)
}()
done := make(chan struct{})
go func() {
for i := 0; i < count; i++ {
msg, err := rw1.ReadMsg()
require.NoError(t, err)
require.EqualValues(t, 101, msg.Code)
msg.Discard()
}
close(done)
}()
for i := 0; i < count; i++ {
payload := make([]byte, 10)
msg := p2p.Msg{
Code: 101,
Size: uint32(len(payload)),
Payload: bytes.NewReader(payload),
}
err := rw1.WriteMsg(msg)
require.NoError(t, err)
}
<-done
require.EqualValues(t, 3, h.exceedIPLimit)
require.EqualValues(t, 3, h.exceedPeerLimit)
}
func TestPeerPacketLimiterHandlerWithWhitelisting(t *testing.T) {
h := &mockRateLimiterHandler{}
r := NewPeerRateLimiter(&PeerRateLimiterConfig{
PacketLimitPerSecIP: 1,
PacketLimitPerSecPeerID: 1,
WhitelistedIPs: []string{"<nil>"}, // no IP is represented as <nil> string
WhitelistedPeerIDs: []enode.ID{{0xaa, 0xbb, 0xcc}},
}, h)

View File

@ -402,6 +402,8 @@ func (w *Waku) RateLimits() common.RateLimits {
return common.RateLimits{
PacketIPLimits: uint64(w.rateLimiter.PacketLimitPerSecIP),
PacketPeerIDLimits: uint64(w.rateLimiter.PacketLimitPerSecPeerID),
BytesIPLimits: uint64(w.rateLimiter.BytesLimitPerSecIP),
BytesPeerIDLimits: uint64(w.rateLimiter.BytesLimitPerSecPeerID),
}
}