From 52dd835692a65d6f5937c505d9c1e3ec8650ca25 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Wed, 11 Dec 2019 14:11:29 +0100 Subject: [PATCH] Implement rate limits exchange (#1729) --- services/shhext/service_test.go | 21 +++++++++++++++++++-- whisper/doc.go | 13 +++++++++++++ whisper/go.sum | 2 ++ whisper/peer.go | 18 +++++++++++++++++- whisper/peer_test.go | 15 +++++++++++++++ whisper/rate_limiter.go | 18 ++++++++++++------ whisper/rate_limiter_test.go | 4 ++-- whisper/whisper.go | 20 ++++++++++++++++++++ 8 files changed, 100 insertions(+), 11 deletions(-) diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index c2a60f5f7..d55d94432 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -429,8 +429,25 @@ func (s *WhisperNodeMockSuite) SetupTest() { errorc <- err }() whisperWrapper := gethbridge.NewGethWhisperWrapper(w) - s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(whisperWrapper.MinPow()), whisperWrapper.BloomFilter(), false, true})) - s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(whisperWrapper.MinPow()), whisperWrapper.BloomFilter(), true, true)) + s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{ + whisper.ProtocolVersion, + math.Float64bits(whisperWrapper.MinPow()), + whisperWrapper.BloomFilter(), + false, + true, + whisper.RateLimits{}, + })) + s.Require().NoError(p2p.SendItems( + rw1, + statusCode, + whisper.ProtocolVersion, + whisper.ProtocolVersion, + math.Float64bits(whisperWrapper.MinPow()), + whisperWrapper.BloomFilter(), + true, + true, + whisper.RateLimits{}, + )) nodeWrapper := &testNodeWrapper{w: whisperWrapper} s.localService = New(nodeWrapper, nil, nil, db, params.ShhextConfig{MailServerConfirmations: true, MaxMessageDeliveryAttempts: 3}) diff --git a/whisper/doc.go b/whisper/doc.go index 15879ffa9..12d39ab0b 100644 --- a/whisper/doc.go +++ b/whisper/doc.go @@ -55,6 +55,7 @@ const ( bloomFilterExCode = 3 // bloom filter exchange batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received messageResponseCode = 12 // includes confirmation for delivery and information about errors + rateLimitingCode = 20 // includes peer's rate limiting settings p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol @@ -259,3 +260,15 @@ func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError { Description: err.Error(), } } + +// RateLimits contains information about rate limit settings. +// It is exchanged using rateLimitingCode packet or in the handshake. +type RateLimits struct { + IPLimits uint64 // messages per second from a single IP (default 0, no limits) + PeerIDLimits uint64 // messages per second from a single peer ID (default 0, no limits) + TopicLimits uint64 // messages per second from a single topic (default 0, no limits) +} + +func (r RateLimits) IsZero() bool { + return r == (RateLimits{}) +} diff --git a/whisper/go.sum b/whisper/go.sum index f880d25a8..ab995d6c7 100644 --- a/whisper/go.sum +++ b/whisper/go.sum @@ -89,6 +89,7 @@ github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521 h1:3hxavr+IHMsQBrYUPQM github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ= github.com/status-im/go-ethereum v1.9.5-status.6 h1:ytuTO1yBIAuTVRtRQoc2mrdyngtP+XOQ9IHIibbz7/I= github.com/status-im/go-ethereum v1.9.5-status.6/go.mod h1:08JvQWE+IOnAFSe4UD4ACLNe2fDd9XmWMCq5Yzy9mk0= +github.com/status-im/go-ethereum v1.9.5-status.7 h1:DKH1GiF52LwaZaw6YDBliFEgm/JDsbIT+hn7ph6X94Q= github.com/status-im/go-ethereum v1.9.5-status.7/go.mod h1:YyH5DKB6+z+Vaya7eIm67pnuPZ1oiUMbbsZW41ktN0g= github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -108,6 +109,7 @@ golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf h1:fnPsqIDRbCSgumaMCRpoIoF2s4qxv0xSSS0BVZUE/ss= golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba h1:9bFeDpN3gTqNanMVqNcoR/pJQuP5uroC3t1D7eXozTE= golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= diff --git a/whisper/peer.go b/whisper/peer.go index d2a2d0600..539af91dc 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -43,6 +43,8 @@ type Peer struct { bloomFilter []byte fullNode bool confirmationsEnabled bool + rateLimitsMu sync.Mutex + rateLimits RateLimits known mapset.Set // Messages already known by the peer to avoid wasting bandwidth @@ -89,8 +91,9 @@ func (peer *Peer) handshake() error { powConverted := math.Float64bits(pow) bloom := peer.host.BloomFilter() confirmationsEnabled := !peer.host.disableConfirmations + rateLimits := peer.host.RateLimits() - errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled) + errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled, rateLimits) }() // Fetch the remote status packet and verify protocol match @@ -145,6 +148,13 @@ func (peer *Peer) handshake() error { peer.confirmationsEnabled = confirmationsEnabled } + var rateLimits RateLimits + if err := s.Decode(&rateLimits); err != nil { + log.Info("rate limiting disabled", "err", err) + } else { + peer.setRateLimits(rateLimits) + } + if err := <-errc; err != nil { return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) } @@ -270,6 +280,12 @@ func (peer *Peer) setBloomFilter(bloom []byte) { } } +func (peer *Peer) setRateLimits(r RateLimits) { + peer.rateLimitsMu.Lock() + peer.rateLimits = r + peer.rateLimitsMu.Unlock() +} + func MakeFullNodeBloom() []byte { bloom := make([]byte, BloomFilterSize) for i := 0; i < BloomFilterSize; i++ { diff --git a/whisper/peer_test.go b/whisper/peer_test.go index 9662cca8a..26cda40b2 100644 --- a/whisper/peer_test.go +++ b/whisper/peer_test.go @@ -551,6 +551,21 @@ func TestTwoLightPeerHandshakeError(t *testing.T) { } } +func TestRateLimitsInHandshake(t *testing.T) { + w1 := Whisper{} + rateLimits := RateLimits{IPLimits: 20, PeerIDLimits: 10} + p1 := newPeer(&w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), &rwStub{ + payload: []interface{}{ProtocolVersion, uint64(123), make([]byte, BloomFilterSize), true, true, rateLimits}, + }) + err := p1.handshake() + if err != nil { + t.Fatal(err) + } + if p1.rateLimits != rateLimits { + t.Errorf("rate limits from handshake is not stored properly in Peer object") + } +} + type rwStub struct { payload []interface{} } diff --git a/whisper/rate_limiter.go b/whisper/rate_limiter.go index 43166210a..eb6707350 100644 --- a/whisper/rate_limiter.go +++ b/whisper/rate_limiter.go @@ -14,14 +14,14 @@ import ( type runLoop func(p *Peer, rw p2p.MsgReadWriter) error type RateLimiterHandler interface { - ExceedPeerLimit() - ExceedIPLimit() + ExceedPeerLimit() error + ExceedIPLimit() error } type MetricsRateLimiterHandler struct{} -func (MetricsRateLimiterHandler) ExceedPeerLimit() { rateLimiterPeerExceeded.Inc(1) } -func (MetricsRateLimiterHandler) ExceedIPLimit() { rateLimiterIPExceeded.Inc(1) } +func (MetricsRateLimiterHandler) ExceedPeerLimit() error { rateLimiterPeerExceeded.Inc(1); return nil } +func (MetricsRateLimiterHandler) ExceedIPLimit() error { rateLimiterIPExceeded.Inc(1); return nil } type PeerRateLimiterConfig struct { LimitPerSecIP int64 @@ -89,7 +89,10 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo ip = p.peer.Node().IP().String() } if halted := r.throttleIP(ip); halted { - r.handler.ExceedIPLimit() + if err := r.handler.ExceedIPLimit(); err != nil { + errC <- fmt.Errorf("exceeded rate limit by IP: %v", err) + return + } } var peerID []byte @@ -97,7 +100,10 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo peerID = p.ID() } if halted := r.throttlePeer(peerID); halted { - r.handler.ExceedPeerLimit() + if err := r.handler.ExceedPeerLimit(); err != nil { + errC <- fmt.Errorf("exceeded rate limit by peer: %v", err) + return + } } if err := in.WriteMsg(packet); err != nil { diff --git a/whisper/rate_limiter_test.go b/whisper/rate_limiter_test.go index 991ff6c34..205f398a8 100644 --- a/whisper/rate_limiter_test.go +++ b/whisper/rate_limiter_test.go @@ -153,5 +153,5 @@ type mockRateLimiterHandler struct { exceedIPLimit int } -func (m *mockRateLimiterHandler) ExceedPeerLimit() { m.exceedPeerLimit += 1 } -func (m *mockRateLimiterHandler) ExceedIPLimit() { m.exceedIPLimit += 1 } +func (m *mockRateLimiterHandler) ExceedPeerLimit() error { m.exceedPeerLimit += 1; return nil } +func (m *mockRateLimiterHandler) ExceedIPLimit() error { m.exceedIPLimit += 1; return nil } diff --git a/whisper/whisper.go b/whisper/whisper.go index d333d7c90..5ed5bdef1 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -337,6 +337,8 @@ func (whisper *Whisper) SetLightClientMode(v bool) { whisper.settings.Store(lightClientModeIdx, v) } +// SetRateLimiter sets an active rate limiter. +// It must be run before Whisper is started. func (whisper *Whisper) SetRateLimiter(r *PeerRateLimiter) { whisper.rateLimiter = r } @@ -361,6 +363,17 @@ func (whisper *Whisper) LightClientModeConnectionRestricted() bool { return v && ok } +// RateLimiting returns RateLimits information. +func (whisper *Whisper) RateLimits() RateLimits { + if whisper.rateLimiter == nil { + return RateLimits{} + } + return RateLimits{ + IPLimits: uint64(whisper.rateLimiter.limitPerSecIP), + PeerIDLimits: uint64(whisper.rateLimiter.limitPerSecPeerID), + } +} + func (whisper *Whisper) notifyPeersAboutPowRequirementChange(pow float64) { arr := whisper.getPeers() for _, p := range arr { @@ -1027,6 +1040,13 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return errors.New("invalid bloom filter exchange message") } p.setBloomFilter(bloom) + case rateLimitingCode: + var rateLimits RateLimits + if err := packet.Decode(&rateLimits); err != nil { + log.Warn("invalid rate limits information", "peer", p.peer.ID(), "err", err) + return errors.New("invalid rate limits exchange message") + } + p.setRateLimits(rateLimits) case p2pMessageCode: // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. // this message is not supposed to be forwarded to other peers, and