Drop peers when rate limit is exceeded (#1732)
This commit is contained in:
parent
f855228010
commit
4f45bceb9a
4
go.sum
4
go.sum
|
@ -206,7 +206,6 @@ github.com/gorilla/mux v1.7.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
|
|||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
|
||||
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/goware/modvendor v0.1.0/go.mod h1:iTv+KoD55Rr5NW/P2WlDEJQ2Wa7+LRblD6/u/xJl/i0=
|
||||
github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6/go.mod h1:Au3iQ8DvDis8hZ4q2OzRcaKYlAsPt+fYvib5q4nIqu4=
|
||||
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
|
||||
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
|
||||
|
@ -296,7 +295,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
|||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kshvakov/clickhouse v1.3.5/go.mod h1:DMzX7FxRymoNkVgizH0DWAL8Cur7wHLgx3MUnGwJqpE=
|
||||
github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw=
|
||||
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
|
||||
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
|
||||
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
|
@ -425,8 +423,6 @@ github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/
|
|||
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
|
||||
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-zglob v0.0.0-20180803001819-2ea3427bfa53/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo=
|
||||
github.com/mattn/go-zglob v0.0.1/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
||||
|
|
|
@ -499,12 +499,15 @@ func whisperRateLimiter(whisperConfig *params.WhisperConfig, clusterConfig *para
|
|||
peerIDs = append(peerIDs, item.ID())
|
||||
}
|
||||
return whisper.NewPeerRateLimiter(
|
||||
&whisper.MetricsRateLimiterHandler{},
|
||||
&whisper.PeerRateLimiterConfig{
|
||||
LimitPerSecIP: whisperConfig.RateLimitIP,
|
||||
LimitPerSecPeerID: whisperConfig.RateLimitPeerID,
|
||||
WhitelistedIPs: ips,
|
||||
WhitelistedPeerIDs: peerIDs,
|
||||
},
|
||||
&whisper.MetricsRateLimiterHandler{},
|
||||
&whisper.DropPeerRateLimiterHandler{
|
||||
Tolerance: whisperConfig.RateLimitTolerance,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
|
|
@ -99,6 +99,11 @@ type WhisperConfig struct {
|
|||
// RateLimitPeerID sets the limit on the number of messages per second
|
||||
// from a given peer ID.
|
||||
RateLimitPeerID int64
|
||||
|
||||
// RateLimitTolerance is a number of how many a limit must be exceeded
|
||||
// in order to drop a peer.
|
||||
// If equal to 0, the peers are never dropped.
|
||||
RateLimitTolerance int64
|
||||
}
|
||||
|
||||
type DatabaseConfig struct {
|
||||
|
|
|
@ -8,6 +8,8 @@ require (
|
|||
github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea
|
||||
github.com/ethereum/go-ethereum v1.9.5
|
||||
github.com/gorilla/websocket v1.4.1 // indirect
|
||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect
|
||||
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
|
||||
github.com/stretchr/objx v0.2.0 // indirect
|
||||
github.com/stretchr/testify v1.3.0
|
||||
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2
|
||||
|
|
|
@ -4,6 +4,7 @@ github.com/Azure/azure-storage-blob-go v0.0.0-20180712005634-eaae161d9d5e/go.mod
|
|||
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
|
||||
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
|
||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||
github.com/allegro/bigcache v0.0.0-20190218064605-e24eb225f156 h1:hh7BAWFHv41r0gce0KRYtDJpL4erKfmB1/mpgoSADeI=
|
||||
github.com/allegro/bigcache v0.0.0-20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
|
||||
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847 h1:rtI0fD4oG/8eVokGVPYJEW1F88p1ZNgXiEIs9thEE4A=
|
||||
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ=
|
||||
|
@ -91,7 +92,13 @@ github.com/status-im/go-ethereum v1.9.5-status.6 h1:ytuTO1yBIAuTVRtRQoc2mrdyngtP
|
|||
github.com/status-im/go-ethereum v1.9.5-status.6/go.mod h1:08JvQWE+IOnAFSe4UD4ACLNe2fDd9XmWMCq5Yzy9mk0=
|
||||
github.com/status-im/go-ethereum v1.9.5-status.7 h1:DKH1GiF52LwaZaw6YDBliFEgm/JDsbIT+hn7ph6X94Q=
|
||||
github.com/status-im/go-ethereum v1.9.5-status.7/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g=
|
||||
github.com/status-im/status-go v0.37.3 h1:94/bOA8qrEIgWd23mSLN39SwUJwCu2TPQFV2HzSI2ZE=
|
||||
github.com/status-im/status-go v0.38.0 h1:3toC1ToY48wbRBVt7CMWSSG5FZAcPPMlnt0+G6iCbcE=
|
||||
github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU=
|
||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
|
||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
|
||||
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
|
||||
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
|
||||
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
|
||||
|
|
|
@ -2,12 +2,12 @@ package whisper
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/tsenart/tb"
|
||||
)
|
||||
|
||||
|
@ -23,6 +23,33 @@ type MetricsRateLimiterHandler struct{}
|
|||
func (MetricsRateLimiterHandler) ExceedPeerLimit() error { rateLimiterPeerExceeded.Inc(1); return nil }
|
||||
func (MetricsRateLimiterHandler) ExceedIPLimit() error { rateLimiterIPExceeded.Inc(1); return nil }
|
||||
|
||||
var ErrRateLimitExceeded = errors.New("rate limit has been exceeded")
|
||||
|
||||
type DropPeerRateLimiterHandler struct {
|
||||
// Tolerance is a number of how many a limit must be exceeded
|
||||
// in order to drop a peer.
|
||||
Tolerance int64
|
||||
|
||||
peerLimitExceeds int64
|
||||
ipLimitExceeds int64
|
||||
}
|
||||
|
||||
func (h *DropPeerRateLimiterHandler) ExceedPeerLimit() error {
|
||||
h.peerLimitExceeds++
|
||||
if h.Tolerance > 0 && h.peerLimitExceeds >= h.Tolerance {
|
||||
return ErrRateLimitExceeded
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *DropPeerRateLimiterHandler) ExceedIPLimit() error {
|
||||
h.ipLimitExceeds++
|
||||
if h.Tolerance > 0 && h.ipLimitExceeds >= h.Tolerance {
|
||||
return ErrRateLimitExceeded
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type PeerRateLimiterConfig struct {
|
||||
LimitPerSecIP int64
|
||||
LimitPerSecPeerID int64
|
||||
|
@ -47,10 +74,10 @@ type PeerRateLimiter struct {
|
|||
whitelistedPeerIDs []enode.ID
|
||||
whitelistedIPs []string
|
||||
|
||||
handler RateLimiterHandler
|
||||
handlers []RateLimiterHandler
|
||||
}
|
||||
|
||||
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
|
||||
func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandler) *PeerRateLimiter {
|
||||
if cfg == nil {
|
||||
copy := defaultPeerRateLimiterConfig
|
||||
cfg = ©
|
||||
|
@ -63,7 +90,7 @@ func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig)
|
|||
limitPerSecPeerID: cfg.LimitPerSecPeerID,
|
||||
whitelistedPeerIDs: cfg.WhitelistedPeerIDs,
|
||||
whitelistedIPs: cfg.WhitelistedIPs,
|
||||
handler: handler,
|
||||
handlers: handlers,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,9 +116,11 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
|
|||
ip = p.peer.Node().IP().String()
|
||||
}
|
||||
if halted := r.throttleIP(ip); halted {
|
||||
if err := r.handler.ExceedIPLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceeded rate limit by IP: %v", err)
|
||||
return
|
||||
for _, h := range r.handlers {
|
||||
if err := h.ExceedIPLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceed rate limit by IP: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,9 +129,11 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
|
|||
peerID = p.ID()
|
||||
}
|
||||
if halted := r.throttlePeer(peerID); halted {
|
||||
if err := r.handler.ExceedPeerLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceeded rate limit by peer: %v", err)
|
||||
return
|
||||
for _, h := range r.handlers {
|
||||
if err := h.ExceedPeerLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceeded rate limit by peer: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,12 +2,12 @@ package whisper
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/tsenart/tb"
|
||||
)
|
||||
|
||||
|
@ -23,6 +23,33 @@ type MetricsRateLimiterHandler struct{}
|
|||
func (MetricsRateLimiterHandler) ExceedPeerLimit() error { rateLimiterPeerExceeded.Inc(1); return nil }
|
||||
func (MetricsRateLimiterHandler) ExceedIPLimit() error { rateLimiterIPExceeded.Inc(1); return nil }
|
||||
|
||||
var ErrRateLimitExceeded = errors.New("rate limit has been exceeded")
|
||||
|
||||
type DropPeerRateLimiterHandler struct {
|
||||
// Tolerance is a number of how many a limit must be exceeded
|
||||
// in order to drop a peer.
|
||||
Tolerance int64
|
||||
|
||||
peerLimitExceeds int64
|
||||
ipLimitExceeds int64
|
||||
}
|
||||
|
||||
func (h *DropPeerRateLimiterHandler) ExceedPeerLimit() error {
|
||||
h.peerLimitExceeds++
|
||||
if h.Tolerance > 0 && h.peerLimitExceeds >= h.Tolerance {
|
||||
return ErrRateLimitExceeded
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *DropPeerRateLimiterHandler) ExceedIPLimit() error {
|
||||
h.ipLimitExceeds++
|
||||
if h.Tolerance > 0 && h.ipLimitExceeds >= h.Tolerance {
|
||||
return ErrRateLimitExceeded
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type PeerRateLimiterConfig struct {
|
||||
LimitPerSecIP int64
|
||||
LimitPerSecPeerID int64
|
||||
|
@ -47,10 +74,10 @@ type PeerRateLimiter struct {
|
|||
whitelistedPeerIDs []enode.ID
|
||||
whitelistedIPs []string
|
||||
|
||||
handler RateLimiterHandler
|
||||
handlers []RateLimiterHandler
|
||||
}
|
||||
|
||||
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
|
||||
func NewPeerRateLimiter(cfg *PeerRateLimiterConfig, handlers ...RateLimiterHandler) *PeerRateLimiter {
|
||||
if cfg == nil {
|
||||
copy := defaultPeerRateLimiterConfig
|
||||
cfg = ©
|
||||
|
@ -63,7 +90,7 @@ func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig)
|
|||
limitPerSecPeerID: cfg.LimitPerSecPeerID,
|
||||
whitelistedPeerIDs: cfg.WhitelistedPeerIDs,
|
||||
whitelistedIPs: cfg.WhitelistedIPs,
|
||||
handler: handler,
|
||||
handlers: handlers,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,9 +116,11 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
|
|||
ip = p.peer.Node().IP().String()
|
||||
}
|
||||
if halted := r.throttleIP(ip); halted {
|
||||
if err := r.handler.ExceedIPLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceeded rate limit by IP: %v", err)
|
||||
return
|
||||
for _, h := range r.handlers {
|
||||
if err := h.ExceedIPLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceed rate limit by IP: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,9 +129,11 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
|
|||
peerID = p.ID()
|
||||
}
|
||||
if halted := r.throttlePeer(peerID); halted {
|
||||
if err := r.handler.ExceedPeerLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceeded rate limit by peer: %v", err)
|
||||
return
|
||||
for _, h := range r.handlers {
|
||||
if err := h.ExceedPeerLimit(); err != nil {
|
||||
errC <- fmt.Errorf("exceeded rate limit by peer: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
r := NewPeerRateLimiter(&mockRateLimiterHandler{}, nil)
|
||||
r := NewPeerRateLimiter(nil, &mockRateLimiterHandler{})
|
||||
err := r.decorate(nil, out, runLoop)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -49,7 +49,7 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestPeerLimiterThrottlingWithZeroLimit(t *testing.T) {
|
||||
r := NewPeerRateLimiter(&mockRateLimiterHandler{}, &PeerRateLimiterConfig{})
|
||||
r := NewPeerRateLimiter(&PeerRateLimiterConfig{}, &mockRateLimiterHandler{})
|
||||
for i := 0; i < 1000; i++ {
|
||||
throttle := r.throttleIP("<nil>")
|
||||
require.False(t, throttle)
|
||||
|
@ -60,7 +60,7 @@ func TestPeerLimiterThrottlingWithZeroLimit(t *testing.T) {
|
|||
|
||||
func TestPeerLimiterHandler(t *testing.T) {
|
||||
h := &mockRateLimiterHandler{}
|
||||
r := NewPeerRateLimiter(h, nil)
|
||||
r := NewPeerRateLimiter(nil, h)
|
||||
p := &Peer{
|
||||
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
|
||||
}
|
||||
|
@ -95,12 +95,12 @@ func TestPeerLimiterHandler(t *testing.T) {
|
|||
|
||||
func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
|
||||
h := &mockRateLimiterHandler{}
|
||||
r := NewPeerRateLimiter(h, &PeerRateLimiterConfig{
|
||||
r := NewPeerRateLimiter(&PeerRateLimiterConfig{
|
||||
LimitPerSecIP: 1,
|
||||
LimitPerSecPeerID: 1,
|
||||
WhitelistedIPs: []string{"<nil>"}, // no IP is represented as <nil> string
|
||||
WhitelistedPeerIDs: []enode.ID{enode.ID{0xaa, 0xbb, 0xcc}},
|
||||
})
|
||||
}, h)
|
||||
p := &Peer{
|
||||
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
|
||||
}
|
||||
|
|
|
@ -1546,7 +1546,7 @@ func TestRateLimiterIntegration(t *testing.T) {
|
|||
MaxMessageSize: 10 << 20,
|
||||
}
|
||||
w := New(conf)
|
||||
w.SetRateLimiter(NewPeerRateLimiter(&MetricsRateLimiterHandler{}, nil))
|
||||
w.SetRateLimiter(NewPeerRateLimiter(nil, &MetricsRateLimiterHandler{}))
|
||||
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
|
||||
rw1, rw2 := p2p.MsgPipe()
|
||||
defer func() {
|
||||
|
|
Loading…
Reference in New Issue