Add whitelisting and setting rate limiter from outside (#36)

This commit is contained in:
Adam Babik 2019-11-15 13:32:20 +01:00 committed by GitHub
parent a527df1568
commit c11e8d8fb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 45 deletions

1
go.mod
View File

@ -8,7 +8,6 @@ require (
github.com/btcsuite/btcd v0.0.0-20181013004428-67e573d211ac // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.7.1
github.com/dvyukov/go-fuzz v0.0.0-20191022152526-8cb203812681 // indirect
github.com/ethereum/go-ethereum v1.9.5
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect

2
go.sum
View File

@ -22,8 +22,6 @@ github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9r
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/dgrijalva/jwt-go v0.0.0-20170201225849-2268707a8f08/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/docker/docker v0.0.0-20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/dvyukov/go-fuzz v0.0.0-20191022152526-8cb203812681 h1:3WV5aRRj1ELP3RcLlBp/v0WJTuy47OQMkL9GIQq8QEE=
github.com/dvyukov/go-fuzz v0.0.0-20191022152526-8cb203812681/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
github.com/edsrzf/mmap-go v0.0.0-20160512033002-935e0e8a636c/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/gosigar v0.0.0-20180330100440-37f05ff46ffa h1:o8OuEkracbk3qH6GvlI6XpEN1HTSxkzOG42xZpfDv/s=
github.com/elastic/gosigar v0.0.0-20180330100440-37f05ff46ffa/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs=

View File

@ -15,6 +15,7 @@ var (
envelopeSizeMeter = metrics.NewRegisteredMeter("whisper/envelopeSize", nil)
// rate limiter metrics
rateLimiterProcessed = metrics.NewRegisteredCounter("whisper/rateLimiterProcessed", nil)
rateLimiterIPExceeded = metrics.NewRegisteredCounter("whisper/rateLimiterIPExceeded", nil)
rateLimiterPeerExceeded = metrics.NewRegisteredCounter("whisper/rateLimiterPeerExceeded", nil)
)

View File

@ -1,6 +1,7 @@
package whisperv6
import (
"bytes"
"fmt"
"time"
@ -10,39 +11,63 @@ import (
"github.com/tsenart/tb"
)
const (
rateLimitPerSecIP = 10
rateLimitPerSecPeerID = 3
)
type runLoop func(p *Peer, rw p2p.MsgReadWriter) error
type rateLimiterHandler interface {
type RateLimiterHandler interface {
ExceedPeerLimit()
ExceedIPLimit()
}
type metricsRateLimiterHandler struct{}
type MetricsRateLimiterHandler struct{}
func (metricsRateLimiterHandler) ExceedPeerLimit() { rateLimiterPeerExceeded.Inc(1) }
func (metricsRateLimiterHandler) ExceedIPLimit() { rateLimiterIPExceeded.Inc(1) }
func (MetricsRateLimiterHandler) ExceedPeerLimit() { rateLimiterPeerExceeded.Inc(1) }
func (MetricsRateLimiterHandler) ExceedIPLimit() { rateLimiterIPExceeded.Inc(1) }
type peerRateLimiter struct {
type PeerRateLimiterConfig struct {
LimitPerSecIP int64
LimitPerSecPeerID int64
WhitelistedIPs []string
WhitelistedPeerIDs []enode.ID
}
var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{
LimitPerSecIP: 10,
LimitPerSecPeerID: 5,
WhitelistedIPs: nil,
WhitelistedPeerIDs: nil,
}
type PeerRateLimiter struct {
peerIDThrottler *tb.Throttler
ipThrottler *tb.Throttler
handler rateLimiterHandler
limitPerSecIP int64
limitPerSecPeerID int64
whitelistedPeerIDs []enode.ID
whitelistedIPs []string
handler RateLimiterHandler
}
func newPeerRateLimiter(handler rateLimiterHandler) *peerRateLimiter {
return &peerRateLimiter{
peerIDThrottler: tb.NewThrottler(time.Millisecond * 100),
ipThrottler: tb.NewThrottler(time.Millisecond * 100),
handler: handler,
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
if cfg == nil {
copy := defaultPeerRateLimiterConfig
cfg = &copy
}
return &PeerRateLimiter{
peerIDThrottler: tb.NewThrottler(time.Millisecond * 100),
ipThrottler: tb.NewThrottler(time.Millisecond * 100),
limitPerSecIP: cfg.LimitPerSecIP,
limitPerSecPeerID: cfg.LimitPerSecPeerID,
whitelistedPeerIDs: cfg.WhitelistedPeerIDs,
whitelistedIPs: cfg.WhitelistedIPs,
handler: handler,
}
}
func (r *peerRateLimiter) Decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoop) error {
func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoop) error {
in, out := p2p.MsgPipe()
defer in.Close()
defer out.Close()
@ -57,6 +82,8 @@ func (r *peerRateLimiter) Decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
return
}
rateLimiterProcessed.Inc(1)
var ip string
if p != nil && p.peer != nil {
ip = p.peer.Node().IP().String()
@ -73,8 +100,6 @@ func (r *peerRateLimiter) Decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
r.handler.ExceedPeerLimit()
}
// TODO: use whitelisting for cluster peers.
if err := in.WriteMsg(packet); err != nil {
errC <- fmt.Errorf("failed to write packet to pipe: %v", err)
return
@ -106,14 +131,38 @@ func (r *peerRateLimiter) Decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
// throttleIP throttles a number of messages incoming from a given IP.
// It allows 10 packets per second.
func (r *peerRateLimiter) throttleIP(ip string) bool {
return r.ipThrottler.Halt(ip, 1, rateLimitPerSecIP)
func (r *PeerRateLimiter) throttleIP(ip string) bool {
if stringSliceContains(r.whitelistedIPs, ip) {
return false
}
return r.ipThrottler.Halt(ip, 1, r.limitPerSecIP)
}
// throttlePeer throttles a number of messages incoming from a peer.
// It allows 3 packets per second.
func (r *peerRateLimiter) throttlePeer(peerID []byte) bool {
func (r *PeerRateLimiter) throttlePeer(peerID []byte) bool {
var id enode.ID
copy(id[:], peerID)
return r.peerIDThrottler.Halt(id.String(), 1, rateLimitPerSecPeerID)
if enodeIDSliceContains(r.whitelistedPeerIDs, id) {
return false
}
return r.peerIDThrottler.Halt(id.String(), 1, r.limitPerSecPeerID)
}
func stringSliceContains(s []string, searched string) bool {
for _, item := range s {
if item == searched {
return true
}
}
return false
}
func enodeIDSliceContains(s []enode.ID, searched enode.ID) bool {
for _, item := range s {
if bytes.Equal(item.Bytes(), searched.Bytes()) {
return true
}
}
return false
}

View File

@ -36,8 +36,8 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
return nil
}
r := newPeerRateLimiter(&mockRateLimiterHandler{})
err := r.Decorate(nil, out, runLoop)
r := NewPeerRateLimiter(&mockRateLimiterHandler{}, nil)
err := r.decorate(nil, out, runLoop)
require.NoError(t, err)
receivedMsg := <-messages
@ -50,7 +50,7 @@ func TestPeerRateLimiterDecorator(t *testing.T) {
func TestPeerLimiterHandler(t *testing.T) {
h := &mockRateLimiterHandler{}
r := newPeerRateLimiter(h)
r := NewPeerRateLimiter(h, nil)
p := &Peer{
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
}
@ -58,18 +58,7 @@ func TestPeerLimiterHandler(t *testing.T) {
count := 100
go func() {
err := r.Decorate(p, rw2, func(p *Peer, rw p2p.MsgReadWriter) error {
for {
msg, err := rw.ReadMsg()
if err != nil {
return err
}
err = rw.WriteMsg(msg)
if err != nil {
return err
}
}
})
err := echoMessages(r, p, rw2)
require.NoError(t, err)
}()
@ -90,8 +79,63 @@ func TestPeerLimiterHandler(t *testing.T) {
<-done
require.Equal(t, 100-rateLimitPerSecIP, h.exceedIPLimit)
require.Equal(t, 100-rateLimitPerSecPeerID, h.exceedPeerLimit)
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecIP, h.exceedIPLimit)
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecPeerID, h.exceedPeerLimit)
}
func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
h := &mockRateLimiterHandler{}
r := NewPeerRateLimiter(h, &PeerRateLimiterConfig{
LimitPerSecIP: 1,
LimitPerSecPeerID: 1,
WhitelistedIPs: []string{"<nil>"}, // no IP is represented as <nil> string
WhitelistedPeerIDs: []enode.ID{enode.ID{0xaa, 0xbb, 0xcc}},
})
p := &Peer{
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
}
rw1, rw2 := p2p.MsgPipe()
count := 100
go func() {
err := echoMessages(r, p, rw2)
require.NoError(t, err)
}()
done := make(chan struct{})
go func() {
for i := 0; i < count; i++ {
msg, err := rw1.ReadMsg()
require.NoError(t, err)
require.EqualValues(t, 101, msg.Code)
}
close(done)
}()
for i := 0; i < count; i += 1 {
err := rw1.WriteMsg(p2p.Msg{Code: 101})
require.NoError(t, err)
}
<-done
require.Equal(t, 0, h.exceedIPLimit)
require.Equal(t, 0, h.exceedPeerLimit)
}
func echoMessages(r *PeerRateLimiter, p *Peer, rw p2p.MsgReadWriter) error {
return r.decorate(p, rw, func(p *Peer, rw p2p.MsgReadWriter) error {
for {
msg, err := rw.ReadMsg()
if err != nil {
return err
}
err = rw.WriteMsg(msg)
if err != nil {
return err
}
}
})
}
type mockRateLimiterHandler struct {

View File

@ -105,6 +105,8 @@ type Whisper struct {
mailServer MailServer // MailServer interface
rateLimiter *PeerRateLimiter
messageStoreFabric func() MessageStore
envelopeFeed event.Feed
@ -335,6 +337,10 @@ func (whisper *Whisper) SetLightClientMode(v bool) {
whisper.settings.Store(lightClientModeIdx, v)
}
func (whisper *Whisper) SetRateLimiter(r *PeerRateLimiter) {
whisper.rateLimiter = r
}
//LightClientMode indicates is this node is light client (does not forward any messages)
func (whisper *Whisper) LightClientMode() bool {
val, exist := whisper.settings.Load(lightClientModeIdx)
@ -887,8 +893,10 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
whisperPeer.start()
defer whisperPeer.stop()
r := newPeerRateLimiter(&metricsRateLimiterHandler{})
return r.Decorate(whisperPeer, rw, whisper.runMessageLoop)
if whisper.rateLimiter != nil {
return whisper.rateLimiter.decorate(whisperPeer, rw, whisper.runMessageLoop)
}
return whisper.runMessageLoop(whisperPeer, rw)
}
func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte,

View File

@ -1540,6 +1540,42 @@ func TestHandleP2PSyncResponseCode(t *testing.T) {
mailMock.AssertNumberOfCalls(t, "Archive", envelopesCount)
}
func TestRateLimiterIntegration(t *testing.T) {
conf := &Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 10 << 20,
}
w := New(conf)
w.SetRateLimiter(NewPeerRateLimiter(&MetricsRateLimiterHandler{}, nil))
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw1, rw2 := p2p.MsgPipe()
defer func() {
rw1.Close()
rw2.Close()
}()
errorc := make(chan error, 1)
go func() {
err := w.HandlePeer(p, rw2)
errorc <- err
}()
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true))
envelope := Envelope{
Expiry: uint32(time.Now().Unix()),
TTL: 10,
Topic: TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
data, err := rlp.EncodeToBytes([]*Envelope{&envelope})
require.NoError(t, err)
hash := crypto.Keccak256Hash(data)
require.NoError(t, p2p.SendItems(rw1, messagesCode, &envelope))
require.NoError(t, p2p.ExpectMsg(rw1, messageResponseCode, NewMessagesResponse(hash, nil)))
}
type stubMailServer struct{}
func (stubMailServer) Archive(*Envelope) {}