Implement rate limit exchange within handshake

This commit is contained in:
Dmitry 2018-10-03 14:30:54 +03:00
parent ffcf1b3cc0
commit e0b2bf1f77
3 changed files with 33 additions and 21 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/whisper/ratelimiter"
)
// Peer represents a whisper protocol peer connection.
@ -82,12 +83,17 @@ func (peer *Peer) handshake() error {
errc := make(chan error, 1)
isLightNode := peer.host.LightClientMode()
isRestrictedLightNodeConnection := peer.host.LightClientModeConnectionRestricted()
var rlCfg *ratelimiter.Config
if peer.host.ratelimiter != nil {
tmp := peer.host.ratelimiter.I().Config()
rlCfg = &tmp
}
go func() {
pow := peer.host.MinPow()
powConverted := math.Float64bits(pow)
bloom := peer.host.BloomFilter()
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode)
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, rlCfg)
}()
// Fetch the remote status packet and verify protocol match
@ -136,6 +142,13 @@ func (peer *Peer) handshake() error {
return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID())
}
egressCfg := ratelimiter.Config{}
if err := s.Decode(&egressCfg); err == nil {
if peer.host.ratelimiter != nil {
peer.host.ratelimiter.E().UpdateConfig(peer.peer, egressCfg)
}
}
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
}

View File

@ -21,7 +21,7 @@ const (
testCode = 42 // any non-defined code will work
)
func setupOneConnection(t *testing.T, rlconf ratelimiter.Config) (*Whisper, *p2p.MsgPipeRW, chan error) {
func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf ratelimiter.Config) (*Whisper, *p2p.MsgPipeRW, chan error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, rlconf, rlconf)
@ -42,14 +42,13 @@ func setupOneConnection(t *testing.T, rlconf ratelimiter.Config) (*Whisper, *p2p
require.NoError(t, err)
require.Equal(t, uint64(0), msg.Code)
require.NoError(t, msg.Discard())
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true))
require.NoError(t, p2p.ExpectMsg(rw1, peerRateLimitCode, nil), "peer must send ingress rate limit after handshake")
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, &egressConf))
return w, rw1, errorc
}
func TestRatePeerDropsConnection(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
_, rw1, errorc := setupOneConnection(t, cfg)
_, rw1, errorc := setupOneConnection(t, cfg, cfg)
require.NoError(t, p2p.Send(rw1, testCode, make([]byte, 11<<10))) // limit is 1024
select {
@ -62,7 +61,8 @@ func TestRatePeerDropsConnection(t *testing.T) {
func TestRateLimitedDelivery(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
w, rw1, _ := setupOneConnection(t, cfg)
ecfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 1 << 10, Quantum: 1 << 10}
w, rw1, _ := setupOneConnection(t, cfg, ecfg)
small1 := Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
@ -72,6 +72,7 @@ func TestRateLimitedDelivery(t *testing.T) {
}
small2 := small1
small2.Nonce = 2
small2.Data = make([]byte, 3<<10)
big := small1
big.Nonce = 3
big.Data = make([]byte, 11<<10)
@ -91,7 +92,7 @@ func TestRateLimitedDelivery(t *testing.T) {
msg, err := rw1.ReadMsg()
if err == p2p.ErrPipeClosed {
require.Contains(t, received, small1.Hash())
require.Contains(t, received, small2.Hash())
require.NotContains(t, received, small2.Hash())
require.NotContains(t, received, big.Hash())
break
}
@ -107,9 +108,9 @@ func TestRateLimitedDelivery(t *testing.T) {
func TestRateRandomizedDelivery(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
w1, rw1, _ := setupOneConnection(t, cfg)
w2, rw2, _ := setupOneConnection(t, cfg)
w3, rw3, _ := setupOneConnection(t, cfg)
w1, rw1, _ := setupOneConnection(t, cfg, cfg)
w2, rw2, _ := setupOneConnection(t, cfg, cfg)
w3, rw3, _ := setupOneConnection(t, cfg, cfg)
var (
mu sync.Mutex
wg sync.WaitGroup

View File

@ -768,13 +768,6 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
delete(whisper.peers, whisperPeer)
whisper.peerMu.Unlock()
}()
// Run the peer handshake and state updates
if err := whisperPeer.handshake(); err != nil {
return err
}
whisperPeer.start()
defer whisperPeer.stop()
if whisper.ratelimiter != nil {
if err := whisper.ratelimiter.I().Create(whisperPeer.peer); err != nil {
return err
@ -783,6 +776,13 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
whisper.ratelimiter.E().Create(whisperPeer.peer)
defer whisper.ratelimiter.E().Remove(whisperPeer.peer, 0)
}
// Run the peer handshake and state updates
if err := whisperPeer.handshake(); err != nil {
return err
}
whisperPeer.start()
defer whisperPeer.stop()
return whisper.runMessageLoop(whisperPeer, rw)
}
@ -798,9 +798,6 @@ func (whisper *Whisper) advertiseEgressLimit(p *Peer, rw p2p.MsgReadWriter) erro
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if err := whisper.advertiseEgressLimit(p, rw); err != nil {
return err
}
for {
// fetch the next packet
packet, err := rw.ReadMsg()
@ -955,7 +952,8 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
packet.Discard()
if packet.Code != p2pMessageCode && whisper.ratelimiter != nil {
if whisper.ratelimiter.I().TakeAvailable(p.peer, int64(packet.Size)) < int64(packet.Size) {
// TODO 300 should be a quantum size
if whisper.ratelimiter.I().TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) {
whisper.ratelimiter.I().Remove(p.peer, 10*time.Minute)
return fmt.Errorf("peer %v reached traffic limit capacity", p.peer.ID())
}