Improve doc strings and complete verify handshake on the wire

This commit is contained in:
Dmitry 2018-10-04 10:08:42 +03:00
parent e0b2bf1f77
commit 326803d0a2
4 changed files with 32 additions and 6 deletions

View File

@ -8,7 +8,9 @@ import (
)
const (
// BlacklistBucket is a delimiter for blacklist data.
BlacklistBucket byte = 1 + iota
// CapacityBucket is a delimiter for capacity data.
CapacityBucket
)
@ -19,10 +21,12 @@ type DBInterface interface {
Delete(key []byte, wo *opt.WriteOptions) error
}
// WithPrefix returns an instance of IsolatedDB.
func WithPrefix(db DBInterface, prefix []byte) IsolatedDB {
return IsolatedDB{db: db, prefix: prefix}
}
// IsolatedDB adds a prefix for every operation on a database.
type IsolatedDB struct {
db DBInterface
prefix []byte
@ -35,14 +39,17 @@ func (db IsolatedDB) withPrefix(key []byte) []byte {
return fkey
}
// Put writes a value at the key location.
func (db IsolatedDB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.db.Put(db.withPrefix(key), value, wo)
}
// Get gets a value of key.
func (db IsolatedDB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
return db.db.Get(db.withPrefix(key), ro)
}
// Delete record at the location of key.
func (db IsolatedDB) Delete(key []byte, wo *opt.WriteOptions) error {
return db.db.Delete(db.withPrefix(key), wo)
}
@ -53,6 +60,7 @@ type BlacklistRecord struct {
Deadline time.Time
}
// Key reurns unique identifier with blacklist delimiter.
func (r BlacklistRecord) Key() []byte {
key := make([]byte, len(r.ID)+1)
key[0] = BlacklistBucket
@ -60,12 +68,14 @@ func (r BlacklistRecord) Key() []byte {
return key
}
// Write stores blacklist record to provided db.
func (r BlacklistRecord) Write(db DBInterface) error {
buf := [8]byte{}
binary.BigEndian.PutUint64(buf[:], uint64(r.Deadline.Unix()))
return db.Put(r.Key(), buf[:], nil)
}
// Read reads blacklist record from db. If error is nil internal data will be assigned.
func (r *BlacklistRecord) Read(db DBInterface) error {
val, err := db.Get(r.Key(), nil)
if err != nil {
@ -76,6 +86,7 @@ func (r *BlacklistRecord) Read(db DBInterface) error {
return nil
}
// Remove cleans blacklist record from db.
func (r BlacklistRecord) Remove(db DBInterface) error {
return db.Delete(r.Key(), nil)
}
@ -87,6 +98,7 @@ type CapacityRecord struct {
Timestamp time.Time
}
// Key returns unique ID with capacity bucket.
func (r CapacityRecord) Key() []byte {
key := make([]byte, len(r.ID)+1)
key[0] = CapacityBucket
@ -94,6 +106,7 @@ func (r CapacityRecord) Key() []byte {
return key
}
// Write stores capacity record in the database.
func (r CapacityRecord) Write(db DBInterface) error {
buf := [16]byte{}
binary.BigEndian.PutUint64(buf[:], uint64(r.Taken))
@ -101,6 +114,7 @@ func (r CapacityRecord) Write(db DBInterface) error {
return db.Put(r.Key(), buf[:], nil)
}
// Read capacity record from db.
func (r *CapacityRecord) Read(db DBInterface) error {
val, err := db.Get(r.Key(), nil)
if err != nil {

View File

@ -47,34 +47,42 @@ type P2PRateLimiter struct {
ratelimiter Interface
}
// Config returns default configuration used for a particular rate limiter.
func (r P2PRateLimiter) Config() Config {
return r.ratelimiter.Config()
}
// Create instantiates rate limiter with for a peer.
func (r P2PRateLimiter) Create(peer *p2p.Peer) error {
return r.ratelimiter.Create(r.modeFunc(peer))
}
// Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted.
func (r P2PRateLimiter) Remove(peer *p2p.Peer, duration time.Duration) error {
return r.ratelimiter.Remove(r.modeFunc(peer), duration)
}
// TakeAvailable subtracts given amount up to the available limit.
func (r P2PRateLimiter) TakeAvailable(peer *p2p.Peer, count int64) int64 {
return r.ratelimiter.TakeAvailable(r.modeFunc(peer), count)
}
// Available peeks into the current available limit.
func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 {
return r.ratelimiter.Available(r.modeFunc(peer))
}
// UpdateConfig update capacity and rate for the given peer.
func (r P2PRateLimiter) UpdateConfig(peer *p2p.Peer, config Config) error {
return r.ratelimiter.UpdateConfig(r.modeFunc(peer), config)
}
// Whisper is a convenience wrapper for whisper.
type Whisper struct {
ingress, egress P2PRateLimiter
}
// ForWhisper returns a convenient wrapper to be used in whisper.
func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper {
return Whisper{
ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")), ingress)),
@ -82,10 +90,12 @@ func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper {
}
}
// I returns instance of p2p rate limiter for ingress traffic.
func (w Whisper) I() P2PRateLimiter {
return w.ingress
}
// E returns instance of p2p rate limiter egress traffic.
func (w Whisper) E() P2PRateLimiter {
return w.egress
}

View File

@ -66,6 +66,7 @@ func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) erro
return nil
}
// Config returns default config.
func (r *PersistedRateLimiter) Config() Config {
return r.defaultConfig
}
@ -83,6 +84,7 @@ func (r *PersistedRateLimiter) getOrCreate(id []byte, config Config) (bucket *ra
return
}
// Create creates an instance for a provided ID. If ID was blacklisted error is returned.
func (r *PersistedRateLimiter) Create(id []byte) error {
bl := BlacklistRecord{ID: id}
if err := bl.Read(r.db); err != leveldb.ErrNotFound {
@ -132,6 +134,7 @@ func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error
return nil
}
// TakeAvailable subtracts requested amount from a rate limiter with ID.
func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 {
bucket := r.getOrCreate(id, r.defaultConfig)
rst := bucket.TakeAvailable(count)
@ -141,10 +144,12 @@ func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 {
return rst
}
// TakeAvailable peeks into available amount with a given ID.
func (r *PersistedRateLimiter) Available(id []byte) int64 {
return r.getOrCreate(id, r.defaultConfig).Available()
}
// UpdateConfig updates config for a provided ID.
func (r *PersistedRateLimiter) UpdateConfig(id []byte, config Config) error {
r.mu.Lock()
old, _ := r.initialized[string(id)]

View File

@ -38,11 +38,8 @@ func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf rate
go func() {
errorc <- w.HandlePeer(p, rw2)
}()
msg, err := rw1.ReadMsg()
require.NoError(t, err)
require.Equal(t, uint64(0), msg.Code)
require.NoError(t, msg.Discard())
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, &egressConf))
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, rlconf}))
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, egressConf))
return w, rw1, errorc
}
@ -61,7 +58,7 @@ func TestRatePeerDropsConnection(t *testing.T) {
func TestRateLimitedDelivery(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
ecfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 1 << 10, Quantum: 1 << 10}
ecfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 2 << 10, Quantum: 1 << 10}
w, rw1, _ := setupOneConnection(t, cfg, ecfg)
small1 := Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),