Improve changes after review

This commit is contained in:
Dmitry 2018-10-09 09:17:02 +03:00
parent 966233ce95
commit e8b417579a
7 changed files with 63 additions and 54 deletions

View File

@ -60,7 +60,7 @@ type BlacklistRecord struct {
Deadline time.Time
}
// Key reurns unique identifier with blacklist delimiter.
// Key returns unique identifier with blacklist delimiter.
func (r BlacklistRecord) Key() []byte {
key := make([]byte, len(r.ID)+1)
key[0] = BlacklistBucket

View File

@ -15,69 +15,72 @@ const (
IPMode
)
func ipModeFunc(peer *p2p.Peer) []byte {
func byIP(peer *p2p.Peer) []byte {
addr := peer.RemoteAddr().Network()
ip := net.ParseIP(strings.Split(addr, ":")[0])
return []byte(ip)
}
func idModeFunc(peer *p2p.Peer) []byte {
func byID(peer *p2p.Peer) []byte {
return peer.ID().Bytes()
}
// modeFunc specifies function to obtain ID value from peer.
type modeFunc func(peer *p2p.Peer) []byte
// selectFunc returns idModeFunc by default.
func selectFunc(mode int) func(*p2p.Peer) []byte {
func selectFunc(mode int) modeFunc {
if mode == IPMode {
return ipModeFunc
return byIP
}
return idModeFunc
return byID
}
// NewP2PRateLimiter returns an instance of P2PRateLimiter.
func NewP2PRateLimiter(mode int, ratelimiter Interface) P2PRateLimiter {
return P2PRateLimiter{
modeFunc: selectFunc(mode),
getID: selectFunc(mode),
ratelimiter: ratelimiter,
}
}
// P2PRateLimiter implements rate limiter that accepts p2p.Peer as identifier.
type P2PRateLimiter struct {
modeFunc func(*p2p.Peer) []byte
getID modeFunc
ratelimiter Interface
}
// Create instantiates rate limiter with for a peer.
func (r P2PRateLimiter) Create(peer *p2p.Peer, cfg Config) error {
return r.ratelimiter.Create(r.modeFunc(peer), cfg)
return r.ratelimiter.Create(r.getID(peer), cfg)
}
// Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted.
func (r P2PRateLimiter) Remove(peer *p2p.Peer, duration time.Duration) error {
return r.ratelimiter.Remove(r.modeFunc(peer), duration)
return r.ratelimiter.Remove(r.getID(peer), duration)
}
// TakeAvailable subtracts given amount up to the available limit.
func (r P2PRateLimiter) TakeAvailable(peer *p2p.Peer, count int64) int64 {
return r.ratelimiter.TakeAvailable(r.modeFunc(peer), count)
return r.ratelimiter.TakeAvailable(r.getID(peer), count)
}
// Available peeks into the current available limit.
func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 {
return r.ratelimiter.Available(r.modeFunc(peer))
return r.ratelimiter.Available(r.getID(peer))
}
// Whisper is a convenience wrapper for whisper.
type Whisper struct {
I, E P2PRateLimiter
Ingress, Egress P2PRateLimiter
Config Config
}
// ForWhisper returns a convenient wrapper to be used in whisper.
func ForWhisper(mode int, db DBInterface, ingress Config) Whisper {
return Whisper{
I: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))),
E: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))),
Ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))),
Egress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))),
Config: ingress,
}
}

View File

@ -24,12 +24,6 @@ type Config struct {
Interval, Capacity, Quantum uint64
}
// compare config with existing ratelimited bucket.
func compare(c Config, bucket *ratelimit.Bucket) bool {
return int64(c.Capacity) == bucket.Capacity() &&
1e9*float64(c.Quantum)/float64(c.Interval) == bucket.Rate()
}
func newBucket(c Config) *ratelimit.Bucket {
return ratelimit.NewBucketWithQuantum(time.Duration(c.Interval), int64(c.Capacity), int64(c.Quantum))
}
@ -39,7 +33,7 @@ func NewPersisted(db DBInterface) *PersistedRateLimiter {
return &PersistedRateLimiter{
db: db,
initialized: map[string]*ratelimit.Bucket{},
timeFunc: time.Now,
now: time.Now,
}
}
@ -50,16 +44,16 @@ type PersistedRateLimiter struct {
mu sync.Mutex
initialized map[string]*ratelimit.Bucket
timeFunc func() time.Time
now func() time.Time
}
func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) error {
if duration == 0 {
return nil
}
record := BlacklistRecord{ID: id, Deadline: r.timeFunc().Add(duration)}
record := BlacklistRecord{ID: id, Deadline: r.now().Add(duration)}
if err := record.Write(r.db); err != nil {
return fmt.Errorf("error blacklisting %x: %v", id, err)
return fmt.Errorf("error blacklisting %#x: %v", id, err)
}
return nil
}
@ -71,24 +65,35 @@ func (r *PersistedRateLimiter) get(id []byte) (bucket *ratelimit.Bucket, exist b
return
}
// Create creates an instance for a provided ID. If ID was blacklisted error is returned.
func (r *PersistedRateLimiter) Create(id []byte, cfg Config) error {
func (r *PersistedRateLimiter) isBlacklisted(id []byte) bool {
bl := BlacklistRecord{ID: id}
if err := bl.Read(r.db); err != leveldb.ErrNotFound {
if bl.Deadline.After(r.timeFunc()) {
return fmt.Errorf("identity %x is blacklisted", id)
if bl.Deadline.After(r.now()) {
return true
}
bl.Remove(r.db)
}
return false
}
func (r *PersistedRateLimiter) adjustBucket(bucket *ratelimit.Bucket, id []byte) {
capacity := CapacityRecord{ID: id}
if err := capacity.Read(r.db); err != nil {
return
}
bucket.TakeAvailable(capacity.Taken)
}
// Create creates an instance for a provided ID. If ID was blacklisted error is returned.
func (r *PersistedRateLimiter) Create(id []byte, cfg Config) error {
if r.isBlacklisted(id) {
return fmt.Errorf("identity %#x is blacklisted", id)
}
bucket := newBucket(cfg)
r.mu.Lock()
r.initialized[string(id)] = bucket
r.mu.Unlock()
capacity := CapacityRecord{ID: id}
if err := capacity.Read(r.db); err != nil {
return nil
}
bucket.TakeAvailable(capacity.Taken)
r.adjustBucket(bucket, id)
// TODO refill rate limiter due to time difference. e.g. if record was stored at T and C seconds passed since T.
// we need to add RATE_PER_SECOND*C to a bucket
return nil
@ -115,7 +120,7 @@ func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error
capacity := CapacityRecord{
ID: id,
Taken: bucket.Capacity() - bucket.Available(),
Timestamp: r.timeFunc(),
Timestamp: r.now(),
}
if err := capacity.Write(r.db); err != nil {
return fmt.Errorf("failed to write current capacicity %d for id %x: %v",

View File

@ -37,8 +37,8 @@ func TestBlacklistedEntityReturnsError(t *testing.T) {
)
require.NoError(t, rl.Create(tid, cfg))
require.NoError(t, rl.Remove(tid, 10*time.Minute))
require.EqualError(t, fmt.Errorf("identity %x is blacklisted", tid), rl.Create(tid, cfg).Error())
rl.timeFunc = func() time.Time {
require.EqualError(t, fmt.Errorf("identity %#x is blacklisted", tid), rl.Create(tid, cfg).Error())
rl.now = func() time.Time {
return time.Now().Add(11 * time.Minute)
}
require.NoError(t, rl.Create(tid, cfg))

View File

@ -142,7 +142,7 @@ func (peer *Peer) handshake() error {
egressCfg := ratelimiter.Config{}
if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil {
peer.host.ratelimiter.E.Create(peer.peer, egressCfg)
peer.host.ratelimiter.Egress.Create(peer.peer, egressCfg)
}
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
@ -207,10 +207,10 @@ func (peer *Peer) reduceBundle(bundle []*Envelope) []*Envelope {
}
for i := range bundle {
size := int64(bundle[i].size())
if peer.host.ratelimiter.E.Available(peer.peer) < size {
if peer.host.ratelimiter.Egress.Available(peer.peer) < size {
return bundle[:i]
}
peer.host.ratelimiter.E.TakeAvailable(peer.peer, size)
peer.host.ratelimiter.Egress.TakeAvailable(peer.peer, size)
}
return bundle
}

View File

@ -93,15 +93,12 @@ func TestRateLimitedDelivery(t *testing.T) {
Data: make([]byte, 1<<10),
Nonce: 1,
}
rand.Read(small1.Data)
small2 := small1
small2.Nonce = 2
small2.Data = make([]byte, 3<<10)
rand.Read(small2.Data)
big := small1
big.Nonce = 3
big.Data = make([]byte, 11<<10)
rand.Read(big.Data)
w, rw1, _ := setupOneConnection(t, cfg, tc.cfg)

View File

@ -769,11 +769,11 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
whisper.peerMu.Unlock()
}()
if whisper.ratelimiter != nil {
if err := whisper.ratelimiter.I.Create(whisperPeer.peer, whisper.ratelimiter.Config); err != nil {
if err := whisper.ratelimiter.Ingress.Create(whisperPeer.peer, whisper.ratelimiter.Config); err != nil {
return err
}
defer whisper.ratelimiter.I.Remove(whisperPeer.peer, 0)
defer whisper.ratelimiter.E.Remove(whisperPeer.peer, 0)
defer whisper.ratelimiter.Ingress.Remove(whisperPeer.peer, 0)
defer whisper.ratelimiter.Egress.Remove(whisperPeer.peer, 0)
}
// Run the peer handshake and state updates
@ -797,6 +797,7 @@ func (whisper *Whisper) advertiseEgressLimit(p *Peer, rw p2p.MsgReadWriter) erro
// runMessageLoop reads and processes inbound messages directly to merge into client-global state.
func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
blacklist := false
for {
// fetch the next packet
packet, err := rw.ReadMsg()
@ -809,6 +810,12 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return errors.New("oversized message received")
}
if packet.Code != p2pMessageCode && whisper.ratelimiter != nil {
if whisper.ratelimiter.Ingress.TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) {
blacklist = true
}
}
switch packet.Code {
case statusCode:
// this should not happen, but no need to panic; just ignore this message.
@ -939,14 +946,11 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
packet.Discard()
if packet.Code != p2pMessageCode && whisper.ratelimiter != nil {
// TODO 300 should be a quantum size
if whisper.ratelimiter.I.TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) {
whisper.ratelimiter.I.Remove(p.peer, 10*time.Minute)
if blacklist {
whisper.ratelimiter.Ingress.Remove(p.peer, 10*time.Minute)
return fmt.Errorf("peer %v reached traffic limit capacity", p.peer.ID())
}
}
}
}
// add inserts a new envelope into the message pool to be distributed within the