From e8b417579af5b24fbf0956305b07dd7857f6dbac Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 9 Oct 2018 09:17:02 +0300 Subject: [PATCH] Improve changes after review --- ratelimiter/db.go | 2 +- ratelimiter/peer.go | 35 +++++++++++++------------ ratelimiter/ratelimiter.go | 45 ++++++++++++++++++--------------- ratelimiter/ratelimiter_test.go | 4 +-- whisperv6/peer.go | 6 ++--- whisperv6/ratelimit_test.go | 3 --- whisperv6/whisper.go | 22 +++++++++------- 7 files changed, 63 insertions(+), 54 deletions(-) diff --git a/ratelimiter/db.go b/ratelimiter/db.go index bd59172..0b92284 100644 --- a/ratelimiter/db.go +++ b/ratelimiter/db.go @@ -60,7 +60,7 @@ type BlacklistRecord struct { Deadline time.Time } -// Key reurns unique identifier with blacklist delimiter. +// Key returns unique identifier with blacklist delimiter. func (r BlacklistRecord) Key() []byte { key := make([]byte, len(r.ID)+1) key[0] = BlacklistBucket diff --git a/ratelimiter/peer.go b/ratelimiter/peer.go index 7bacb48..dfba2c2 100644 --- a/ratelimiter/peer.go +++ b/ratelimiter/peer.go @@ -15,69 +15,72 @@ const ( IPMode ) -func ipModeFunc(peer *p2p.Peer) []byte { +func byIP(peer *p2p.Peer) []byte { addr := peer.RemoteAddr().Network() ip := net.ParseIP(strings.Split(addr, ":")[0]) return []byte(ip) } -func idModeFunc(peer *p2p.Peer) []byte { +func byID(peer *p2p.Peer) []byte { return peer.ID().Bytes() } +// modeFunc specifies function to obtain ID value from peer. +type modeFunc func(peer *p2p.Peer) []byte + // selectFunc returns idModeFunc by default. -func selectFunc(mode int) func(*p2p.Peer) []byte { +func selectFunc(mode int) modeFunc { if mode == IPMode { - return ipModeFunc + return byIP } - return idModeFunc + return byID } // NewP2PRateLimiter returns an instance of P2PRateLimiter. func NewP2PRateLimiter(mode int, ratelimiter Interface) P2PRateLimiter { return P2PRateLimiter{ - modeFunc: selectFunc(mode), + getID: selectFunc(mode), ratelimiter: ratelimiter, } } // P2PRateLimiter implements rate limiter that accepts p2p.Peer as identifier. type P2PRateLimiter struct { - modeFunc func(*p2p.Peer) []byte + getID modeFunc ratelimiter Interface } // Create instantiates rate limiter with for a peer. func (r P2PRateLimiter) Create(peer *p2p.Peer, cfg Config) error { - return r.ratelimiter.Create(r.modeFunc(peer), cfg) + return r.ratelimiter.Create(r.getID(peer), cfg) } // Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted. func (r P2PRateLimiter) Remove(peer *p2p.Peer, duration time.Duration) error { - return r.ratelimiter.Remove(r.modeFunc(peer), duration) + return r.ratelimiter.Remove(r.getID(peer), duration) } // TakeAvailable subtracts given amount up to the available limit. func (r P2PRateLimiter) TakeAvailable(peer *p2p.Peer, count int64) int64 { - return r.ratelimiter.TakeAvailable(r.modeFunc(peer), count) + return r.ratelimiter.TakeAvailable(r.getID(peer), count) } // Available peeks into the current available limit. func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 { - return r.ratelimiter.Available(r.modeFunc(peer)) + return r.ratelimiter.Available(r.getID(peer)) } // Whisper is a convenience wrapper for whisper. type Whisper struct { - I, E P2PRateLimiter - Config Config + Ingress, Egress P2PRateLimiter + Config Config } // ForWhisper returns a convenient wrapper to be used in whisper. func ForWhisper(mode int, db DBInterface, ingress Config) Whisper { return Whisper{ - I: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))), - E: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))), - Config: ingress, + Ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))), + Egress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))), + Config: ingress, } } diff --git a/ratelimiter/ratelimiter.go b/ratelimiter/ratelimiter.go index 52fc0d5..6a1bd83 100644 --- a/ratelimiter/ratelimiter.go +++ b/ratelimiter/ratelimiter.go @@ -24,12 +24,6 @@ type Config struct { Interval, Capacity, Quantum uint64 } -// compare config with existing ratelimited bucket. -func compare(c Config, bucket *ratelimit.Bucket) bool { - return int64(c.Capacity) == bucket.Capacity() && - 1e9*float64(c.Quantum)/float64(c.Interval) == bucket.Rate() -} - func newBucket(c Config) *ratelimit.Bucket { return ratelimit.NewBucketWithQuantum(time.Duration(c.Interval), int64(c.Capacity), int64(c.Quantum)) } @@ -39,7 +33,7 @@ func NewPersisted(db DBInterface) *PersistedRateLimiter { return &PersistedRateLimiter{ db: db, initialized: map[string]*ratelimit.Bucket{}, - timeFunc: time.Now, + now: time.Now, } } @@ -50,16 +44,16 @@ type PersistedRateLimiter struct { mu sync.Mutex initialized map[string]*ratelimit.Bucket - timeFunc func() time.Time + now func() time.Time } func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) error { if duration == 0 { return nil } - record := BlacklistRecord{ID: id, Deadline: r.timeFunc().Add(duration)} + record := BlacklistRecord{ID: id, Deadline: r.now().Add(duration)} if err := record.Write(r.db); err != nil { - return fmt.Errorf("error blacklisting %x: %v", id, err) + return fmt.Errorf("error blacklisting %#x: %v", id, err) } return nil } @@ -71,24 +65,35 @@ func (r *PersistedRateLimiter) get(id []byte) (bucket *ratelimit.Bucket, exist b return } -// Create creates an instance for a provided ID. If ID was blacklisted error is returned. -func (r *PersistedRateLimiter) Create(id []byte, cfg Config) error { +func (r *PersistedRateLimiter) isBlacklisted(id []byte) bool { bl := BlacklistRecord{ID: id} if err := bl.Read(r.db); err != leveldb.ErrNotFound { - if bl.Deadline.After(r.timeFunc()) { - return fmt.Errorf("identity %x is blacklisted", id) + if bl.Deadline.After(r.now()) { + return true } bl.Remove(r.db) } + return false +} + +func (r *PersistedRateLimiter) adjustBucket(bucket *ratelimit.Bucket, id []byte) { + capacity := CapacityRecord{ID: id} + if err := capacity.Read(r.db); err != nil { + return + } + bucket.TakeAvailable(capacity.Taken) +} + +// Create creates an instance for a provided ID. If ID was blacklisted error is returned. +func (r *PersistedRateLimiter) Create(id []byte, cfg Config) error { + if r.isBlacklisted(id) { + return fmt.Errorf("identity %#x is blacklisted", id) + } bucket := newBucket(cfg) r.mu.Lock() r.initialized[string(id)] = bucket r.mu.Unlock() - capacity := CapacityRecord{ID: id} - if err := capacity.Read(r.db); err != nil { - return nil - } - bucket.TakeAvailable(capacity.Taken) + r.adjustBucket(bucket, id) // TODO refill rate limiter due to time difference. e.g. if record was stored at T and C seconds passed since T. // we need to add RATE_PER_SECOND*C to a bucket return nil @@ -115,7 +120,7 @@ func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error capacity := CapacityRecord{ ID: id, Taken: bucket.Capacity() - bucket.Available(), - Timestamp: r.timeFunc(), + Timestamp: r.now(), } if err := capacity.Write(r.db); err != nil { return fmt.Errorf("failed to write current capacicity %d for id %x: %v", diff --git a/ratelimiter/ratelimiter_test.go b/ratelimiter/ratelimiter_test.go index be4a2cf..2748712 100644 --- a/ratelimiter/ratelimiter_test.go +++ b/ratelimiter/ratelimiter_test.go @@ -37,8 +37,8 @@ func TestBlacklistedEntityReturnsError(t *testing.T) { ) require.NoError(t, rl.Create(tid, cfg)) require.NoError(t, rl.Remove(tid, 10*time.Minute)) - require.EqualError(t, fmt.Errorf("identity %x is blacklisted", tid), rl.Create(tid, cfg).Error()) - rl.timeFunc = func() time.Time { + require.EqualError(t, fmt.Errorf("identity %#x is blacklisted", tid), rl.Create(tid, cfg).Error()) + rl.now = func() time.Time { return time.Now().Add(11 * time.Minute) } require.NoError(t, rl.Create(tid, cfg)) diff --git a/whisperv6/peer.go b/whisperv6/peer.go index 6050851..2d5221e 100644 --- a/whisperv6/peer.go +++ b/whisperv6/peer.go @@ -142,7 +142,7 @@ func (peer *Peer) handshake() error { egressCfg := ratelimiter.Config{} if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil { - peer.host.ratelimiter.E.Create(peer.peer, egressCfg) + peer.host.ratelimiter.Egress.Create(peer.peer, egressCfg) } if err := <-errc; err != nil { return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) @@ -207,10 +207,10 @@ func (peer *Peer) reduceBundle(bundle []*Envelope) []*Envelope { } for i := range bundle { size := int64(bundle[i].size()) - if peer.host.ratelimiter.E.Available(peer.peer) < size { + if peer.host.ratelimiter.Egress.Available(peer.peer) < size { return bundle[:i] } - peer.host.ratelimiter.E.TakeAvailable(peer.peer, size) + peer.host.ratelimiter.Egress.TakeAvailable(peer.peer, size) } return bundle } diff --git a/whisperv6/ratelimit_test.go b/whisperv6/ratelimit_test.go index 41c2dce..f325b7c 100644 --- a/whisperv6/ratelimit_test.go +++ b/whisperv6/ratelimit_test.go @@ -93,15 +93,12 @@ func TestRateLimitedDelivery(t *testing.T) { Data: make([]byte, 1<<10), Nonce: 1, } - rand.Read(small1.Data) small2 := small1 small2.Nonce = 2 small2.Data = make([]byte, 3<<10) - rand.Read(small2.Data) big := small1 big.Nonce = 3 big.Data = make([]byte, 11<<10) - rand.Read(big.Data) w, rw1, _ := setupOneConnection(t, cfg, tc.cfg) diff --git a/whisperv6/whisper.go b/whisperv6/whisper.go index ca1ede4..7c06141 100644 --- a/whisperv6/whisper.go +++ b/whisperv6/whisper.go @@ -769,11 +769,11 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { whisper.peerMu.Unlock() }() if whisper.ratelimiter != nil { - if err := whisper.ratelimiter.I.Create(whisperPeer.peer, whisper.ratelimiter.Config); err != nil { + if err := whisper.ratelimiter.Ingress.Create(whisperPeer.peer, whisper.ratelimiter.Config); err != nil { return err } - defer whisper.ratelimiter.I.Remove(whisperPeer.peer, 0) - defer whisper.ratelimiter.E.Remove(whisperPeer.peer, 0) + defer whisper.ratelimiter.Ingress.Remove(whisperPeer.peer, 0) + defer whisper.ratelimiter.Egress.Remove(whisperPeer.peer, 0) } // Run the peer handshake and state updates @@ -797,6 +797,7 @@ func (whisper *Whisper) advertiseEgressLimit(p *Peer, rw p2p.MsgReadWriter) erro // runMessageLoop reads and processes inbound messages directly to merge into client-global state. func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { + blacklist := false for { // fetch the next packet packet, err := rw.ReadMsg() @@ -809,6 +810,12 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return errors.New("oversized message received") } + if packet.Code != p2pMessageCode && whisper.ratelimiter != nil { + if whisper.ratelimiter.Ingress.TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) { + blacklist = true + } + } + switch packet.Code { case statusCode: // this should not happen, but no need to panic; just ignore this message. @@ -939,12 +946,9 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { packet.Discard() - if packet.Code != p2pMessageCode && whisper.ratelimiter != nil { - // TODO 300 should be a quantum size - if whisper.ratelimiter.I.TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) { - whisper.ratelimiter.I.Remove(p.peer, 10*time.Minute) - return fmt.Errorf("peer %v reached traffic limit capacity", p.peer.ID()) - } + if blacklist { + whisper.ratelimiter.Ingress.Remove(p.peer, 10*time.Minute) + return fmt.Errorf("peer %v reached traffic limit capacity", p.peer.ID()) } } }