Simplify rate limiter interface

This commit is contained in:
Dmitry 2018-10-04 10:33:16 +03:00
parent 326803d0a2
commit 4e1281431d
8 changed files with 60 additions and 157 deletions

View File

@ -1,14 +1,13 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: ratelimiter/interface.go
// Source: ratelimiter/ratelimiter.go
// Package ratelimiter is a generated GoMock package.
package ratelimiter
import (
gomock "github.com/golang/mock/gomock"
reflect "reflect"
time "time"
gomock "github.com/golang/mock/gomock"
)
// MockInterface is a mock of Interface interface
@ -35,15 +34,15 @@ func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder {
}
// Create mocks base method
func (m *MockInterface) Create(arg0 []byte) error {
ret := m.ctrl.Call(m, "Create", arg0)
func (m *MockInterface) Create(arg0 []byte, arg1 Config) error {
ret := m.ctrl.Call(m, "Create", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Create indicates an expected call of Create
func (mr *MockInterfaceMockRecorder) Create(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockInterface)(nil).Create), arg0)
func (mr *MockInterfaceMockRecorder) Create(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockInterface)(nil).Create), arg0, arg1)
}
// Remove mocks base method
@ -81,27 +80,3 @@ func (m *MockInterface) Available(arg0 []byte) int64 {
func (mr *MockInterfaceMockRecorder) Available(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Available", reflect.TypeOf((*MockInterface)(nil).Available), arg0)
}
// UpdateConfig mocks base method
func (m *MockInterface) UpdateConfig(arg0 []byte, arg1 Config) error {
ret := m.ctrl.Call(m, "UpdateConfig", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateConfig indicates an expected call of UpdateConfig
func (mr *MockInterfaceMockRecorder) UpdateConfig(arg0, arg1 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfig", reflect.TypeOf((*MockInterface)(nil).UpdateConfig), arg0, arg1)
}
// Config mocks base method
func (m *MockInterface) Config() Config {
ret := m.ctrl.Call(m, "Config")
ret0, _ := ret[0].(Config)
return ret0
}
// Config indicates an expected call of Config
func (mr *MockInterfaceMockRecorder) Config() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Config", reflect.TypeOf((*MockInterface)(nil).Config))
}

View File

@ -47,14 +47,9 @@ type P2PRateLimiter struct {
ratelimiter Interface
}
// Config returns default configuration used for a particular rate limiter.
func (r P2PRateLimiter) Config() Config {
return r.ratelimiter.Config()
}
// Create instantiates rate limiter with for a peer.
func (r P2PRateLimiter) Create(peer *p2p.Peer) error {
return r.ratelimiter.Create(r.modeFunc(peer))
func (r P2PRateLimiter) Create(peer *p2p.Peer, cfg Config) error {
return r.ratelimiter.Create(r.modeFunc(peer), cfg)
}
// Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted.
@ -72,30 +67,17 @@ func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 {
return r.ratelimiter.Available(r.modeFunc(peer))
}
// UpdateConfig update capacity and rate for the given peer.
func (r P2PRateLimiter) UpdateConfig(peer *p2p.Peer, config Config) error {
return r.ratelimiter.UpdateConfig(r.modeFunc(peer), config)
}
// Whisper is a convenience wrapper for whisper.
type Whisper struct {
ingress, egress P2PRateLimiter
I, E P2PRateLimiter
Config Config
}
// ForWhisper returns a convenient wrapper to be used in whisper.
func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper {
func ForWhisper(mode int, db DBInterface, ingress Config) Whisper {
return Whisper{
ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")), ingress)),
egress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")), egress)),
I: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))),
E: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))),
Config: ingress,
}
}
// I returns instance of p2p rate limiter for ingress traffic.
func (w Whisper) I() P2PRateLimiter {
return w.ingress
}
// E returns instance of p2p rate limiter egress traffic.
func (w Whisper) E() P2PRateLimiter {
return w.egress
}

View File

@ -15,15 +15,13 @@ func TestIDMode(t *testing.T) {
peer := p2p.NewPeer(discover.NodeID{1}, "test", nil)
ctrl := gomock.NewController(t)
rl := NewMockInterface(ctrl)
rl.EXPECT().Create(peer.ID().Bytes())
rl.EXPECT().Create(peer.ID().Bytes(), cfg)
rl.EXPECT().TakeAvailable(peer.ID().Bytes(), int64(0))
rl.EXPECT().Available(peer.ID().Bytes())
rl.EXPECT().Remove(peer.ID().Bytes(), time.Duration(0))
rl.EXPECT().UpdateConfig(peer.ID().Bytes(), cfg)
peerrl := NewP2PRateLimiter(IDMode, rl)
require.NoError(t, peerrl.Create(peer))
require.NoError(t, peerrl.Create(peer, cfg))
peerrl.TakeAvailable(peer, 0)
peerrl.Available(peer)
require.NoError(t, peerrl.Remove(peer, 0))
require.NoError(t, peerrl.UpdateConfig(peer, cfg))
}

View File

@ -2,6 +2,7 @@ package ratelimiter
import (
"fmt"
"math"
"sync"
"time"
@ -12,12 +13,10 @@ import (
// Interface describes common ratelimiter methods.
type Interface interface {
Create([]byte) error
Create([]byte, Config) error
Remove([]byte, time.Duration) error
TakeAvailable([]byte, int64) int64
Available([]byte) int64
UpdateConfig([]byte, Config) error
Config() Config
}
// Config is a set of options used by rate limiter.
@ -35,19 +34,18 @@ func newBucket(c Config) *ratelimit.Bucket {
return ratelimit.NewBucketWithQuantum(time.Duration(c.Interval), int64(c.Capacity), int64(c.Quantum))
}
func NewPersisted(db DBInterface, config Config) *PersistedRateLimiter {
// NewPersisted returns instance of rate limiter with persisted black listed records and capacity before peer was removed.
func NewPersisted(db DBInterface) *PersistedRateLimiter {
return &PersistedRateLimiter{
db: db,
defaultConfig: config,
initialized: map[string]*ratelimit.Bucket{},
timeFunc: time.Now,
db: db,
initialized: map[string]*ratelimit.Bucket{},
timeFunc: time.Now,
}
}
// PersistedRateLimiter persists latest capacity and updated config per unique ID.
type PersistedRateLimiter struct {
db DBInterface
defaultConfig Config
db DBInterface
mu sync.Mutex
initialized map[string]*ratelimit.Bucket
@ -66,26 +64,15 @@ func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) erro
return nil
}
// Config returns default config.
func (r *PersistedRateLimiter) Config() Config {
return r.defaultConfig
}
func (r *PersistedRateLimiter) getOrCreate(id []byte, config Config) (bucket *ratelimit.Bucket) {
func (r *PersistedRateLimiter) get(id []byte) (bucket *ratelimit.Bucket, exist bool) {
r.mu.Lock()
defer r.mu.Unlock()
old, exist := r.initialized[string(id)]
if !exist {
bucket = newBucket(config)
r.initialized[string(id)] = bucket
} else {
bucket = old
}
bucket, exist = r.initialized[string(id)]
return
}
// Create creates an instance for a provided ID. If ID was blacklisted error is returned.
func (r *PersistedRateLimiter) Create(id []byte) error {
func (r *PersistedRateLimiter) Create(id []byte, cfg Config) error {
bl := BlacklistRecord{ID: id}
if err := bl.Read(r.db); err != leveldb.ErrNotFound {
if bl.Deadline.After(r.timeFunc()) {
@ -93,7 +80,10 @@ func (r *PersistedRateLimiter) Create(id []byte) error {
}
bl.Remove(r.db)
}
bucket := r.getOrCreate(id, r.defaultConfig)
bucket := newBucket(cfg)
r.mu.Lock()
r.initialized[string(id)] = bucket
r.mu.Unlock()
capacity := CapacityRecord{ID: id}
if err := capacity.Read(r.db); err != nil {
return nil
@ -136,7 +126,10 @@ func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error
// TakeAvailable subtracts requested amount from a rate limiter with ID.
func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 {
bucket := r.getOrCreate(id, r.defaultConfig)
bucket, exist := r.get(id)
if !exist {
return math.MaxInt64
}
rst := bucket.TakeAvailable(count)
if err := r.store(id, bucket); err != nil {
log.Error(err.Error())
@ -144,25 +137,11 @@ func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 {
return rst
}
// TakeAvailable peeks into available amount with a given ID.
// Available peeks into available amount with a given ID.
func (r *PersistedRateLimiter) Available(id []byte) int64 {
return r.getOrCreate(id, r.defaultConfig).Available()
}
// UpdateConfig updates config for a provided ID.
func (r *PersistedRateLimiter) UpdateConfig(id []byte, config Config) error {
r.mu.Lock()
old, _ := r.initialized[string(id)]
if compare(config, old) {
r.mu.Unlock()
return nil
bucket, exist := r.get(id)
if !exist {
return math.MaxInt64
}
delete(r.initialized, string(id))
r.mu.Unlock()
taken := int64(0)
if old != nil {
taken = old.Capacity() - old.Available()
}
r.getOrCreate(id, config).TakeAvailable(taken)
return nil
return bucket.Available()
}

View File

@ -15,47 +15,31 @@ func TestLimitIsPersisted(t *testing.T) {
require.NoError(t, err)
var (
total int64 = 10000
rl = NewPersisted(db, Config{1 << 62, uint64(10000), 1})
cfg = Config{1 << 62, uint64(10000), 1}
rl = NewPersisted(db)
tid = []byte("test")
)
require.NoError(t, rl.Create(tid))
require.NoError(t, rl.Create(tid, cfg))
taken := rl.TakeAvailable(tid, total/2)
require.Equal(t, total/2, taken)
require.NoError(t, rl.Remove(tid, 0))
require.NoError(t, rl.Create(tid))
require.NoError(t, rl.Create(tid, cfg))
require.Equal(t, total/2, rl.Available(tid))
}
func TestConfigFixedOnUpdate(t *testing.T) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
var (
total int64 = 10000
cfg = Config{1 << 62, uint64(10000), 1}
rl = NewPersisted(db, cfg)
tid = []byte("test")
)
require.NoError(t, rl.Create(tid))
taken := rl.TakeAvailable(tid, total/2)
require.Equal(t, total/2, taken)
cfg.Capacity = 6000
require.NoError(t, rl.UpdateConfig(tid, cfg))
require.Equal(t, int64(cfg.Capacity)-total/2, rl.Available(tid))
}
func TestBlacklistedEntityReturnsError(t *testing.T) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
var (
cfg = Config{1 << 62, uint64(10000), 1}
rl = NewPersisted(db, cfg)
rl = NewPersisted(db)
tid = []byte("test")
)
require.NoError(t, rl.Create(tid))
require.NoError(t, rl.Create(tid, cfg))
require.NoError(t, rl.Remove(tid, 10*time.Minute))
require.EqualError(t, fmt.Errorf("identity %x is blacklisted", tid), rl.Create(tid).Error())
require.EqualError(t, fmt.Errorf("identity %x is blacklisted", tid), rl.Create(tid, cfg).Error())
rl.timeFunc = func() time.Time {
return time.Now().Add(11 * time.Minute)
}
require.NoError(t, rl.Create(tid))
require.NoError(t, rl.Create(tid, cfg))
}

View File

@ -85,8 +85,7 @@ func (peer *Peer) handshake() error {
isRestrictedLightNodeConnection := peer.host.LightClientModeConnectionRestricted()
var rlCfg *ratelimiter.Config
if peer.host.ratelimiter != nil {
tmp := peer.host.ratelimiter.I().Config()
rlCfg = &tmp
rlCfg = &peer.host.ratelimiter.Config
}
go func() {
pow := peer.host.MinPow()
@ -143,10 +142,8 @@ func (peer *Peer) handshake() error {
}
egressCfg := ratelimiter.Config{}
if err := s.Decode(&egressCfg); err == nil {
if peer.host.ratelimiter != nil {
peer.host.ratelimiter.E().UpdateConfig(peer.peer, egressCfg)
}
if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil {
peer.host.ratelimiter.E.Create(peer.peer, egressCfg)
}
if err := <-errc; err != nil {
@ -215,10 +212,10 @@ func (peer *Peer) reduceBundle(bundle []*Envelope) []*Envelope {
})
for i := range bundle {
size := int64(bundle[i].size())
if peer.host.ratelimiter.E().Available(peer.peer) < size {
if peer.host.ratelimiter.E.Available(peer.peer) < size {
return bundle[:i]
}
peer.host.ratelimiter.E().TakeAvailable(peer.peer, size)
peer.host.ratelimiter.E.TakeAvailable(peer.peer, size)
}
return bundle
}

View File

@ -24,7 +24,7 @@ const (
func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf ratelimiter.Config) (*Whisper, *p2p.MsgPipeRW, chan error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, rlconf, rlconf)
rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, rlconf)
conf := &Config{
MinimumAcceptedPOW: 0,
MaxMessageSize: 100 << 10,

View File

@ -769,12 +769,11 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
whisper.peerMu.Unlock()
}()
if whisper.ratelimiter != nil {
if err := whisper.ratelimiter.I().Create(whisperPeer.peer); err != nil {
if err := whisper.ratelimiter.I.Create(whisperPeer.peer, whisper.ratelimiter.Config); err != nil {
return err
}
defer whisper.ratelimiter.I().Remove(whisperPeer.peer, 0)
whisper.ratelimiter.E().Create(whisperPeer.peer)
defer whisper.ratelimiter.E().Remove(whisperPeer.peer, 0)
defer whisper.ratelimiter.I.Remove(whisperPeer.peer, 0)
defer whisper.ratelimiter.E.Remove(whisperPeer.peer, 0)
}
// Run the peer handshake and state updates
@ -790,7 +789,7 @@ func (whisper *Whisper) advertiseEgressLimit(p *Peer, rw p2p.MsgReadWriter) erro
if whisper.ratelimiter == nil {
return nil
}
if err := p2p.Send(rw, peerRateLimitCode, whisper.ratelimiter.I().Config()); err != nil {
if err := p2p.Send(rw, peerRateLimitCode, whisper.ratelimiter.Config); err != nil {
return fmt.Errorf("failed to send ingress rate limit to a peer %v: %v", p.peer.ID(), err)
}
return nil
@ -886,17 +885,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
whisper.mailServer.DeliverMail(p, &request)
}
case peerRateLimitCode:
if whisper.ratelimiter == nil {
continue
}
var conf ratelimiter.Config
if err := packet.Decode(&conf); err != nil {
return fmt.Errorf("peer %v sent wrong payload for a rate limiter config", p.peer.ID())
}
if err := whisper.ratelimiter.E().UpdateConfig(p.peer, conf); err != nil {
log.Error("error updaing rate limiter config", "peer", p.peer)
}
case p2pRequestCompleteCode:
if p.trusted {
var payload []byte
@ -953,8 +941,8 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if packet.Code != p2pMessageCode && whisper.ratelimiter != nil {
// TODO 300 should be a quantum size
if whisper.ratelimiter.I().TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) {
whisper.ratelimiter.I().Remove(p.peer, 10*time.Minute)
if whisper.ratelimiter.I.TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) {
whisper.ratelimiter.I.Remove(p.peer, 10*time.Minute)
return fmt.Errorf("peer %v reached traffic limit capacity", p.peer.ID())
}
}