Implement rate limits exchange (#1729)

This commit is contained in:
Adam Babik 2019-12-11 14:11:29 +01:00 committed by GitHub
parent 203f29b13e
commit 52dd835692
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 100 additions and 11 deletions

View File

@ -429,8 +429,25 @@ func (s *WhisperNodeMockSuite) SetupTest() {
errorc <- err errorc <- err
}() }()
whisperWrapper := gethbridge.NewGethWhisperWrapper(w) whisperWrapper := gethbridge.NewGethWhisperWrapper(w)
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(whisperWrapper.MinPow()), whisperWrapper.BloomFilter(), false, true})) s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{
s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(whisperWrapper.MinPow()), whisperWrapper.BloomFilter(), true, true)) whisper.ProtocolVersion,
math.Float64bits(whisperWrapper.MinPow()),
whisperWrapper.BloomFilter(),
false,
true,
whisper.RateLimits{},
}))
s.Require().NoError(p2p.SendItems(
rw1,
statusCode,
whisper.ProtocolVersion,
whisper.ProtocolVersion,
math.Float64bits(whisperWrapper.MinPow()),
whisperWrapper.BloomFilter(),
true,
true,
whisper.RateLimits{},
))
nodeWrapper := &testNodeWrapper{w: whisperWrapper} nodeWrapper := &testNodeWrapper{w: whisperWrapper}
s.localService = New(nodeWrapper, nil, nil, db, params.ShhextConfig{MailServerConfirmations: true, MaxMessageDeliveryAttempts: 3}) s.localService = New(nodeWrapper, nil, nil, db, params.ShhextConfig{MailServerConfirmations: true, MaxMessageDeliveryAttempts: 3})

View File

@ -55,6 +55,7 @@ const (
bloomFilterExCode = 3 // bloom filter exchange bloomFilterExCode = 3 // bloom filter exchange
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
messageResponseCode = 12 // includes confirmation for delivery and information about errors messageResponseCode = 12 // includes confirmation for delivery and information about errors
rateLimitingCode = 20 // includes peer's rate limiting settings
p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers
p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
@ -259,3 +260,15 @@ func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError {
Description: err.Error(), Description: err.Error(),
} }
} }
// RateLimits contains information about rate limit settings.
// It is exchanged using rateLimitingCode packet or in the handshake.
type RateLimits struct {
IPLimits uint64 // messages per second from a single IP (default 0, no limits)
PeerIDLimits uint64 // messages per second from a single peer ID (default 0, no limits)
TopicLimits uint64 // messages per second from a single topic (default 0, no limits)
}
func (r RateLimits) IsZero() bool {
return r == (RateLimits{})
}

View File

@ -89,6 +89,7 @@ github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521 h1:3hxavr+IHMsQBrYUPQM
github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ= github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ=
github.com/status-im/go-ethereum v1.9.5-status.6 h1:ytuTO1yBIAuTVRtRQoc2mrdyngtP+XOQ9IHIibbz7/I= github.com/status-im/go-ethereum v1.9.5-status.6 h1:ytuTO1yBIAuTVRtRQoc2mrdyngtP+XOQ9IHIibbz7/I=
github.com/status-im/go-ethereum v1.9.5-status.6/go.mod h1:08JvQWE+IOnAFSe4UD4ACLNe2fDd9XmWMCq5Yzy9mk0= github.com/status-im/go-ethereum v1.9.5-status.6/go.mod h1:08JvQWE+IOnAFSe4UD4ACLNe2fDd9XmWMCq5Yzy9mk0=
github.com/status-im/go-ethereum v1.9.5-status.7 h1:DKH1GiF52LwaZaw6YDBliFEgm/JDsbIT+hn7ph6X94Q=
github.com/status-im/go-ethereum v1.9.5-status.7/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g= github.com/status-im/go-ethereum v1.9.5-status.7/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g=
github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -108,6 +109,7 @@ golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf h1:fnPsqIDRbCSgumaMCRpoIoF2s4qxv0xSSS0BVZUE/ss= golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf h1:fnPsqIDRbCSgumaMCRpoIoF2s4qxv0xSSS0BVZUE/ss=
golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba h1:9bFeDpN3gTqNanMVqNcoR/pJQuP5uroC3t1D7eXozTE=
golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=

View File

@ -43,6 +43,8 @@ type Peer struct {
bloomFilter []byte bloomFilter []byte
fullNode bool fullNode bool
confirmationsEnabled bool confirmationsEnabled bool
rateLimitsMu sync.Mutex
rateLimits RateLimits
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
@ -89,8 +91,9 @@ func (peer *Peer) handshake() error {
powConverted := math.Float64bits(pow) powConverted := math.Float64bits(pow)
bloom := peer.host.BloomFilter() bloom := peer.host.BloomFilter()
confirmationsEnabled := !peer.host.disableConfirmations confirmationsEnabled := !peer.host.disableConfirmations
rateLimits := peer.host.RateLimits()
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled) errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled, rateLimits)
}() }()
// Fetch the remote status packet and verify protocol match // Fetch the remote status packet and verify protocol match
@ -145,6 +148,13 @@ func (peer *Peer) handshake() error {
peer.confirmationsEnabled = confirmationsEnabled peer.confirmationsEnabled = confirmationsEnabled
} }
var rateLimits RateLimits
if err := s.Decode(&rateLimits); err != nil {
log.Info("rate limiting disabled", "err", err)
} else {
peer.setRateLimits(rateLimits)
}
if err := <-errc; err != nil { if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
} }
@ -270,6 +280,12 @@ func (peer *Peer) setBloomFilter(bloom []byte) {
} }
} }
func (peer *Peer) setRateLimits(r RateLimits) {
peer.rateLimitsMu.Lock()
peer.rateLimits = r
peer.rateLimitsMu.Unlock()
}
func MakeFullNodeBloom() []byte { func MakeFullNodeBloom() []byte {
bloom := make([]byte, BloomFilterSize) bloom := make([]byte, BloomFilterSize)
for i := 0; i < BloomFilterSize; i++ { for i := 0; i < BloomFilterSize; i++ {

View File

@ -551,6 +551,21 @@ func TestTwoLightPeerHandshakeError(t *testing.T) {
} }
} }
func TestRateLimitsInHandshake(t *testing.T) {
w1 := Whisper{}
rateLimits := RateLimits{IPLimits: 20, PeerIDLimits: 10}
p1 := newPeer(&w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), &rwStub{
payload: []interface{}{ProtocolVersion, uint64(123), make([]byte, BloomFilterSize), true, true, rateLimits},
})
err := p1.handshake()
if err != nil {
t.Fatal(err)
}
if p1.rateLimits != rateLimits {
t.Errorf("rate limits from handshake is not stored properly in Peer object")
}
}
type rwStub struct { type rwStub struct {
payload []interface{} payload []interface{}
} }

View File

@ -14,14 +14,14 @@ import (
type runLoop func(p *Peer, rw p2p.MsgReadWriter) error type runLoop func(p *Peer, rw p2p.MsgReadWriter) error
type RateLimiterHandler interface { type RateLimiterHandler interface {
ExceedPeerLimit() ExceedPeerLimit() error
ExceedIPLimit() ExceedIPLimit() error
} }
type MetricsRateLimiterHandler struct{} type MetricsRateLimiterHandler struct{}
func (MetricsRateLimiterHandler) ExceedPeerLimit() { rateLimiterPeerExceeded.Inc(1) } func (MetricsRateLimiterHandler) ExceedPeerLimit() error { rateLimiterPeerExceeded.Inc(1); return nil }
func (MetricsRateLimiterHandler) ExceedIPLimit() { rateLimiterIPExceeded.Inc(1) } func (MetricsRateLimiterHandler) ExceedIPLimit() error { rateLimiterIPExceeded.Inc(1); return nil }
type PeerRateLimiterConfig struct { type PeerRateLimiterConfig struct {
LimitPerSecIP int64 LimitPerSecIP int64
@ -89,7 +89,10 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
ip = p.peer.Node().IP().String() ip = p.peer.Node().IP().String()
} }
if halted := r.throttleIP(ip); halted { if halted := r.throttleIP(ip); halted {
r.handler.ExceedIPLimit() if err := r.handler.ExceedIPLimit(); err != nil {
errC <- fmt.Errorf("exceeded rate limit by IP: %v", err)
return
}
} }
var peerID []byte var peerID []byte
@ -97,7 +100,10 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
peerID = p.ID() peerID = p.ID()
} }
if halted := r.throttlePeer(peerID); halted { if halted := r.throttlePeer(peerID); halted {
r.handler.ExceedPeerLimit() if err := r.handler.ExceedPeerLimit(); err != nil {
errC <- fmt.Errorf("exceeded rate limit by peer: %v", err)
return
}
} }
if err := in.WriteMsg(packet); err != nil { if err := in.WriteMsg(packet); err != nil {

View File

@ -153,5 +153,5 @@ type mockRateLimiterHandler struct {
exceedIPLimit int exceedIPLimit int
} }
func (m *mockRateLimiterHandler) ExceedPeerLimit() { m.exceedPeerLimit += 1 } func (m *mockRateLimiterHandler) ExceedPeerLimit() error { m.exceedPeerLimit += 1; return nil }
func (m *mockRateLimiterHandler) ExceedIPLimit() { m.exceedIPLimit += 1 } func (m *mockRateLimiterHandler) ExceedIPLimit() error { m.exceedIPLimit += 1; return nil }

View File

@ -337,6 +337,8 @@ func (whisper *Whisper) SetLightClientMode(v bool) {
whisper.settings.Store(lightClientModeIdx, v) whisper.settings.Store(lightClientModeIdx, v)
} }
// SetRateLimiter sets an active rate limiter.
// It must be run before Whisper is started.
func (whisper *Whisper) SetRateLimiter(r *PeerRateLimiter) { func (whisper *Whisper) SetRateLimiter(r *PeerRateLimiter) {
whisper.rateLimiter = r whisper.rateLimiter = r
} }
@ -361,6 +363,17 @@ func (whisper *Whisper) LightClientModeConnectionRestricted() bool {
return v && ok return v && ok
} }
// RateLimiting returns RateLimits information.
func (whisper *Whisper) RateLimits() RateLimits {
if whisper.rateLimiter == nil {
return RateLimits{}
}
return RateLimits{
IPLimits: uint64(whisper.rateLimiter.limitPerSecIP),
PeerIDLimits: uint64(whisper.rateLimiter.limitPerSecPeerID),
}
}
func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) {
arr := whisper.getPeers() arr := whisper.getPeers()
for _, p := range arr { for _, p := range arr {
@ -1027,6 +1040,13 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return errors.New("invalid bloom filter exchange message") return errors.New("invalid bloom filter exchange message")
} }
p.setBloomFilter(bloom) p.setBloomFilter(bloom)
case rateLimitingCode:
var rateLimits RateLimits
if err := packet.Decode(&rateLimits); err != nil {
log.Warn("invalid rate limits information", "peer", p.peer.ID(), "err", err)
return errors.New("invalid rate limits exchange message")
}
p.setRateLimits(rateLimits)
case p2pMessageCode: case p2pMessageCode:
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
// this message is not supposed to be forwarded to other peers, and // this message is not supposed to be forwarded to other peers, and