Apply recommended changes

This commit is contained in:
Dmitry 2018-10-10 13:37:13 +03:00
parent e8b417579a
commit 631a8ac4aa
5 changed files with 20 additions and 27 deletions

View File

@ -32,7 +32,7 @@ type IsolatedDB struct {
prefix []byte prefix []byte
} }
func (db IsolatedDB) withPrefix(key []byte) []byte { func (db IsolatedDB) keyWithPrefix(key []byte) []byte {
fkey := make([]byte, len(db.prefix)+len(key)) fkey := make([]byte, len(db.prefix)+len(key))
copy(fkey, db.prefix) copy(fkey, db.prefix)
copy(fkey[len(db.prefix):], key) copy(fkey[len(db.prefix):], key)
@ -41,17 +41,17 @@ func (db IsolatedDB) withPrefix(key []byte) []byte {
// Put writes a value at the key location. // Put writes a value at the key location.
func (db IsolatedDB) Put(key, value []byte, wo *opt.WriteOptions) error { func (db IsolatedDB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.db.Put(db.withPrefix(key), value, wo) return db.db.Put(db.keyWithPrefix(key), value, wo)
} }
// Get gets a value of key. // Get gets a value of key.
func (db IsolatedDB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { func (db IsolatedDB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
return db.db.Get(db.withPrefix(key), ro) return db.db.Get(db.keyWithPrefix(key), ro)
} }
// Delete record at the location of key. // Delete record at the location of key.
func (db IsolatedDB) Delete(key []byte, wo *opt.WriteOptions) error { func (db IsolatedDB) Delete(key []byte, wo *opt.WriteOptions) error {
return db.db.Delete(db.withPrefix(key), wo) return db.db.Delete(db.keyWithPrefix(key), wo)
} }
// BlacklistRecord is a record with information of a deadline for a particular ID. // BlacklistRecord is a record with information of a deadline for a particular ID.

View File

@ -36,51 +36,51 @@ func selectFunc(mode int) modeFunc {
return byID return byID
} }
// NewP2PRateLimiter returns an instance of P2PRateLimiter. // NewPeerRateLimiter returns an instance of PeerRateLimiter.
func NewP2PRateLimiter(mode int, ratelimiter Interface) P2PRateLimiter { func NewPeerRateLimiter(mode int, ratelimiter Interface) PeerRateLimiter {
return P2PRateLimiter{ return PeerRateLimiter{
getID: selectFunc(mode), getID: selectFunc(mode),
ratelimiter: ratelimiter, ratelimiter: ratelimiter,
} }
} }
// P2PRateLimiter implements rate limiter that accepts p2p.Peer as identifier. // PeerRateLimiter implements rate limiter that accepts p2p.Peer as identifier.
type P2PRateLimiter struct { type PeerRateLimiter struct {
getID modeFunc getID modeFunc
ratelimiter Interface ratelimiter Interface
} }
// Create instantiates rate limiter with for a peer. // Create instantiates rate limiter with for a peer.
func (r P2PRateLimiter) Create(peer *p2p.Peer, cfg Config) error { func (r PeerRateLimiter) Create(peer *p2p.Peer, cfg Config) error {
return r.ratelimiter.Create(r.getID(peer), cfg) return r.ratelimiter.Create(r.getID(peer), cfg)
} }
// Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted. // Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted.
func (r P2PRateLimiter) Remove(peer *p2p.Peer, duration time.Duration) error { func (r PeerRateLimiter) Remove(peer *p2p.Peer, duration time.Duration) error {
return r.ratelimiter.Remove(r.getID(peer), duration) return r.ratelimiter.Remove(r.getID(peer), duration)
} }
// TakeAvailable subtracts given amount up to the available limit. // TakeAvailable subtracts given amount up to the available limit.
func (r P2PRateLimiter) TakeAvailable(peer *p2p.Peer, count int64) int64 { func (r PeerRateLimiter) TakeAvailable(peer *p2p.Peer, count int64) int64 {
return r.ratelimiter.TakeAvailable(r.getID(peer), count) return r.ratelimiter.TakeAvailable(r.getID(peer), count)
} }
// Available peeks into the current available limit. // Available peeks into the current available limit.
func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 { func (r PeerRateLimiter) Available(peer *p2p.Peer) int64 {
return r.ratelimiter.Available(r.getID(peer)) return r.ratelimiter.Available(r.getID(peer))
} }
// Whisper is a convenience wrapper for whisper. // Whisper is a convenience wrapper for whisper.
type Whisper struct { type Whisper struct {
Ingress, Egress P2PRateLimiter Ingress, Egress PeerRateLimiter
Config Config Config Config
} }
// ForWhisper returns a convenient wrapper to be used in whisper. // ForWhisper returns a convenient wrapper to be used in whisper.
func ForWhisper(mode int, db DBInterface, ingress Config) Whisper { func ForWhisper(mode int, db DBInterface, ingress Config) Whisper {
return Whisper{ return Whisper{
Ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))), Ingress: NewPeerRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))),
Egress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))), Egress: NewPeerRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))),
Config: ingress, Config: ingress,
} }
} }

View File

@ -47,7 +47,6 @@ const (
messagesCode = 1 // normal whisper message messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange bloomFilterExCode = 3 // bloom filter exchange
peerRateLimitCode = 8 // update of the peer rate limit
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)

View File

@ -140,6 +140,8 @@ func (peer *Peer) handshake() error {
return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID()) return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID())
} }
// Decode received egress configuration from the peer. In case it's not defined,
// sending data to that peer won't be limited.
egressCfg := ratelimiter.Config{} egressCfg := ratelimiter.Config{}
if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil { if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil {
peer.host.ratelimiter.Egress.Create(peer.peer, egressCfg) peer.host.ratelimiter.Egress.Create(peer.peer, egressCfg)
@ -229,6 +231,8 @@ func (peer *Peer) broadcast() error {
bundle = append(bundle, envelope) bundle = append(bundle, envelope)
} }
} }
// bundle will be reduce according to a rate limiter
// if rate limiter is nil - this operation is noop
bundle = peer.reduceBundle(bundle) bundle = peer.reduceBundle(bundle)
if len(bundle) > 0 { if len(bundle) > 0 {
// transmit the batch of envelopes // transmit the batch of envelopes

View File

@ -785,16 +785,6 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return whisper.runMessageLoop(whisperPeer, rw) return whisper.runMessageLoop(whisperPeer, rw)
} }
func (whisper *Whisper) advertiseEgressLimit(p *Peer, rw p2p.MsgReadWriter) error {
if whisper.ratelimiter == nil {
return nil
}
if err := p2p.Send(rw, peerRateLimitCode, whisper.ratelimiter.Config); err != nil {
return fmt.Errorf("failed to send ingress rate limit to a peer %v: %v", p.peer.ID(), err)
}
return nil
}
// runMessageLoop reads and processes inbound messages directly to merge into client-global state. // runMessageLoop reads and processes inbound messages directly to merge into client-global state.
func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
blacklist := false blacklist := false