From 326803d0a2df245584b74da13390d5dcedee13ab Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 4 Oct 2018 10:08:42 +0300 Subject: [PATCH] Improve doc strings and complete verify handshake on the wire --- ratelimiter/db.go | 14 ++++++++++++++ ratelimiter/peer.go | 10 ++++++++++ ratelimiter/ratelimiter.go | 5 +++++ whisperv6/ratelimit_test.go | 9 +++------ 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/ratelimiter/db.go b/ratelimiter/db.go index 7605a40..bd59172 100644 --- a/ratelimiter/db.go +++ b/ratelimiter/db.go @@ -8,7 +8,9 @@ import ( ) const ( + // BlacklistBucket is a delimiter for blacklist data. BlacklistBucket byte = 1 + iota + // CapacityBucket is a delimiter for capacity data. CapacityBucket ) @@ -19,10 +21,12 @@ type DBInterface interface { Delete(key []byte, wo *opt.WriteOptions) error } +// WithPrefix returns an instance of IsolatedDB. func WithPrefix(db DBInterface, prefix []byte) IsolatedDB { return IsolatedDB{db: db, prefix: prefix} } +// IsolatedDB adds a prefix for every operation on a database. type IsolatedDB struct { db DBInterface prefix []byte @@ -35,14 +39,17 @@ func (db IsolatedDB) withPrefix(key []byte) []byte { return fkey } +// Put writes a value at the key location. func (db IsolatedDB) Put(key, value []byte, wo *opt.WriteOptions) error { return db.db.Put(db.withPrefix(key), value, wo) } +// Get gets a value of key. func (db IsolatedDB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { return db.db.Get(db.withPrefix(key), ro) } +// Delete record at the location of key. func (db IsolatedDB) Delete(key []byte, wo *opt.WriteOptions) error { return db.db.Delete(db.withPrefix(key), wo) } @@ -53,6 +60,7 @@ type BlacklistRecord struct { Deadline time.Time } +// Key reurns unique identifier with blacklist delimiter. func (r BlacklistRecord) Key() []byte { key := make([]byte, len(r.ID)+1) key[0] = BlacklistBucket @@ -60,12 +68,14 @@ func (r BlacklistRecord) Key() []byte { return key } +// Write stores blacklist record to provided db. func (r BlacklistRecord) Write(db DBInterface) error { buf := [8]byte{} binary.BigEndian.PutUint64(buf[:], uint64(r.Deadline.Unix())) return db.Put(r.Key(), buf[:], nil) } +// Read reads blacklist record from db. If error is nil internal data will be assigned. func (r *BlacklistRecord) Read(db DBInterface) error { val, err := db.Get(r.Key(), nil) if err != nil { @@ -76,6 +86,7 @@ func (r *BlacklistRecord) Read(db DBInterface) error { return nil } +// Remove cleans blacklist record from db. func (r BlacklistRecord) Remove(db DBInterface) error { return db.Delete(r.Key(), nil) } @@ -87,6 +98,7 @@ type CapacityRecord struct { Timestamp time.Time } +// Key returns unique ID with capacity bucket. func (r CapacityRecord) Key() []byte { key := make([]byte, len(r.ID)+1) key[0] = CapacityBucket @@ -94,6 +106,7 @@ func (r CapacityRecord) Key() []byte { return key } +// Write stores capacity record in the database. func (r CapacityRecord) Write(db DBInterface) error { buf := [16]byte{} binary.BigEndian.PutUint64(buf[:], uint64(r.Taken)) @@ -101,6 +114,7 @@ func (r CapacityRecord) Write(db DBInterface) error { return db.Put(r.Key(), buf[:], nil) } +// Read capacity record from db. func (r *CapacityRecord) Read(db DBInterface) error { val, err := db.Get(r.Key(), nil) if err != nil { diff --git a/ratelimiter/peer.go b/ratelimiter/peer.go index 12069fd..fe2a86d 100644 --- a/ratelimiter/peer.go +++ b/ratelimiter/peer.go @@ -47,34 +47,42 @@ type P2PRateLimiter struct { ratelimiter Interface } +// Config returns default configuration used for a particular rate limiter. func (r P2PRateLimiter) Config() Config { return r.ratelimiter.Config() } +// Create instantiates rate limiter with for a peer. func (r P2PRateLimiter) Create(peer *p2p.Peer) error { return r.ratelimiter.Create(r.modeFunc(peer)) } +// Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted. func (r P2PRateLimiter) Remove(peer *p2p.Peer, duration time.Duration) error { return r.ratelimiter.Remove(r.modeFunc(peer), duration) } +// TakeAvailable subtracts given amount up to the available limit. func (r P2PRateLimiter) TakeAvailable(peer *p2p.Peer, count int64) int64 { return r.ratelimiter.TakeAvailable(r.modeFunc(peer), count) } +// Available peeks into the current available limit. func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 { return r.ratelimiter.Available(r.modeFunc(peer)) } +// UpdateConfig update capacity and rate for the given peer. func (r P2PRateLimiter) UpdateConfig(peer *p2p.Peer, config Config) error { return r.ratelimiter.UpdateConfig(r.modeFunc(peer), config) } +// Whisper is a convenience wrapper for whisper. type Whisper struct { ingress, egress P2PRateLimiter } +// ForWhisper returns a convenient wrapper to be used in whisper. func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper { return Whisper{ ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")), ingress)), @@ -82,10 +90,12 @@ func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper { } } +// I returns instance of p2p rate limiter for ingress traffic. func (w Whisper) I() P2PRateLimiter { return w.ingress } +// E returns instance of p2p rate limiter egress traffic. func (w Whisper) E() P2PRateLimiter { return w.egress } diff --git a/ratelimiter/ratelimiter.go b/ratelimiter/ratelimiter.go index 7ace2bc..f8298a7 100644 --- a/ratelimiter/ratelimiter.go +++ b/ratelimiter/ratelimiter.go @@ -66,6 +66,7 @@ func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) erro return nil } +// Config returns default config. func (r *PersistedRateLimiter) Config() Config { return r.defaultConfig } @@ -83,6 +84,7 @@ func (r *PersistedRateLimiter) getOrCreate(id []byte, config Config) (bucket *ra return } +// Create creates an instance for a provided ID. If ID was blacklisted error is returned. func (r *PersistedRateLimiter) Create(id []byte) error { bl := BlacklistRecord{ID: id} if err := bl.Read(r.db); err != leveldb.ErrNotFound { @@ -132,6 +134,7 @@ func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error return nil } +// TakeAvailable subtracts requested amount from a rate limiter with ID. func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 { bucket := r.getOrCreate(id, r.defaultConfig) rst := bucket.TakeAvailable(count) @@ -141,10 +144,12 @@ func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 { return rst } +// TakeAvailable peeks into available amount with a given ID. func (r *PersistedRateLimiter) Available(id []byte) int64 { return r.getOrCreate(id, r.defaultConfig).Available() } +// UpdateConfig updates config for a provided ID. func (r *PersistedRateLimiter) UpdateConfig(id []byte, config Config) error { r.mu.Lock() old, _ := r.initialized[string(id)] diff --git a/whisperv6/ratelimit_test.go b/whisperv6/ratelimit_test.go index b60a610..3cdb753 100644 --- a/whisperv6/ratelimit_test.go +++ b/whisperv6/ratelimit_test.go @@ -38,11 +38,8 @@ func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf rate go func() { errorc <- w.HandlePeer(p, rw2) }() - msg, err := rw1.ReadMsg() - require.NoError(t, err) - require.Equal(t, uint64(0), msg.Code) - require.NoError(t, msg.Discard()) - require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, &egressConf)) + require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, rlconf})) + require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, egressConf)) return w, rw1, errorc } @@ -61,7 +58,7 @@ func TestRatePeerDropsConnection(t *testing.T) { func TestRateLimitedDelivery(t *testing.T) { cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10} - ecfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 1 << 10, Quantum: 1 << 10} + ecfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 2 << 10, Quantum: 1 << 10} w, rw1, _ := setupOneConnection(t, cfg, ecfg) small1 := Envelope{ Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),