From 4e1281431d6143bdd72f8418dc96a2deb499734d Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 4 Oct 2018 10:33:16 +0300 Subject: [PATCH] Simplify rate limiter interface --- ratelimiter/{ratelimiter_mock.go => mock.go} | 37 ++--------- ratelimiter/peer.go | 34 +++------- ratelimiter/peer_test.go | 6 +- ratelimiter/ratelimiter.go | 69 +++++++------------- ratelimiter/ratelimiter_test.go | 32 +++------ whisperv6/peer.go | 13 ++-- whisperv6/ratelimit_test.go | 2 +- whisperv6/whisper.go | 24 ++----- 8 files changed, 60 insertions(+), 157 deletions(-) rename ratelimiter/{ratelimiter_mock.go => mock.go} (68%) diff --git a/ratelimiter/ratelimiter_mock.go b/ratelimiter/mock.go similarity index 68% rename from ratelimiter/ratelimiter_mock.go rename to ratelimiter/mock.go index 34431db..52f8426 100644 --- a/ratelimiter/ratelimiter_mock.go +++ b/ratelimiter/mock.go @@ -1,14 +1,13 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ratelimiter/interface.go +// Source: ratelimiter/ratelimiter.go // Package ratelimiter is a generated GoMock package. package ratelimiter import ( + gomock "github.com/golang/mock/gomock" reflect "reflect" time "time" - - gomock "github.com/golang/mock/gomock" ) // MockInterface is a mock of Interface interface @@ -35,15 +34,15 @@ func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { } // Create mocks base method -func (m *MockInterface) Create(arg0 []byte) error { - ret := m.ctrl.Call(m, "Create", arg0) +func (m *MockInterface) Create(arg0 []byte, arg1 Config) error { + ret := m.ctrl.Call(m, "Create", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Create indicates an expected call of Create -func (mr *MockInterfaceMockRecorder) Create(arg0 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockInterface)(nil).Create), arg0) +func (mr *MockInterfaceMockRecorder) Create(arg0, arg1 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockInterface)(nil).Create), arg0, arg1) } // Remove mocks base method @@ -81,27 +80,3 @@ func (m *MockInterface) Available(arg0 []byte) int64 { func (mr *MockInterfaceMockRecorder) Available(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Available", reflect.TypeOf((*MockInterface)(nil).Available), arg0) } - -// UpdateConfig mocks base method -func (m *MockInterface) UpdateConfig(arg0 []byte, arg1 Config) error { - ret := m.ctrl.Call(m, "UpdateConfig", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// UpdateConfig indicates an expected call of UpdateConfig -func (mr *MockInterfaceMockRecorder) UpdateConfig(arg0, arg1 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateConfig", reflect.TypeOf((*MockInterface)(nil).UpdateConfig), arg0, arg1) -} - -// Config mocks base method -func (m *MockInterface) Config() Config { - ret := m.ctrl.Call(m, "Config") - ret0, _ := ret[0].(Config) - return ret0 -} - -// Config indicates an expected call of Config -func (mr *MockInterfaceMockRecorder) Config() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Config", reflect.TypeOf((*MockInterface)(nil).Config)) -} diff --git a/ratelimiter/peer.go b/ratelimiter/peer.go index fe2a86d..7bacb48 100644 --- a/ratelimiter/peer.go +++ b/ratelimiter/peer.go @@ -47,14 +47,9 @@ type P2PRateLimiter struct { ratelimiter Interface } -// Config returns default configuration used for a particular rate limiter. -func (r P2PRateLimiter) Config() Config { - return r.ratelimiter.Config() -} - // Create instantiates rate limiter with for a peer. -func (r P2PRateLimiter) Create(peer *p2p.Peer) error { - return r.ratelimiter.Create(r.modeFunc(peer)) +func (r P2PRateLimiter) Create(peer *p2p.Peer, cfg Config) error { + return r.ratelimiter.Create(r.modeFunc(peer), cfg) } // Remove drops peer from in-memory rate limiter. If duration is non-zero peer will be blacklisted. @@ -72,30 +67,17 @@ func (r P2PRateLimiter) Available(peer *p2p.Peer) int64 { return r.ratelimiter.Available(r.modeFunc(peer)) } -// UpdateConfig update capacity and rate for the given peer. -func (r P2PRateLimiter) UpdateConfig(peer *p2p.Peer, config Config) error { - return r.ratelimiter.UpdateConfig(r.modeFunc(peer), config) -} - // Whisper is a convenience wrapper for whisper. type Whisper struct { - ingress, egress P2PRateLimiter + I, E P2PRateLimiter + Config Config } // ForWhisper returns a convenient wrapper to be used in whisper. -func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper { +func ForWhisper(mode int, db DBInterface, ingress Config) Whisper { return Whisper{ - ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")), ingress)), - egress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")), egress)), + I: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")))), + E: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")))), + Config: ingress, } } - -// I returns instance of p2p rate limiter for ingress traffic. -func (w Whisper) I() P2PRateLimiter { - return w.ingress -} - -// E returns instance of p2p rate limiter egress traffic. -func (w Whisper) E() P2PRateLimiter { - return w.egress -} diff --git a/ratelimiter/peer_test.go b/ratelimiter/peer_test.go index 7a51432..759b560 100644 --- a/ratelimiter/peer_test.go +++ b/ratelimiter/peer_test.go @@ -15,15 +15,13 @@ func TestIDMode(t *testing.T) { peer := p2p.NewPeer(discover.NodeID{1}, "test", nil) ctrl := gomock.NewController(t) rl := NewMockInterface(ctrl) - rl.EXPECT().Create(peer.ID().Bytes()) + rl.EXPECT().Create(peer.ID().Bytes(), cfg) rl.EXPECT().TakeAvailable(peer.ID().Bytes(), int64(0)) rl.EXPECT().Available(peer.ID().Bytes()) rl.EXPECT().Remove(peer.ID().Bytes(), time.Duration(0)) - rl.EXPECT().UpdateConfig(peer.ID().Bytes(), cfg) peerrl := NewP2PRateLimiter(IDMode, rl) - require.NoError(t, peerrl.Create(peer)) + require.NoError(t, peerrl.Create(peer, cfg)) peerrl.TakeAvailable(peer, 0) peerrl.Available(peer) require.NoError(t, peerrl.Remove(peer, 0)) - require.NoError(t, peerrl.UpdateConfig(peer, cfg)) } diff --git a/ratelimiter/ratelimiter.go b/ratelimiter/ratelimiter.go index f8298a7..52fc0d5 100644 --- a/ratelimiter/ratelimiter.go +++ b/ratelimiter/ratelimiter.go @@ -2,6 +2,7 @@ package ratelimiter import ( "fmt" + "math" "sync" "time" @@ -12,12 +13,10 @@ import ( // Interface describes common ratelimiter methods. type Interface interface { - Create([]byte) error + Create([]byte, Config) error Remove([]byte, time.Duration) error TakeAvailable([]byte, int64) int64 Available([]byte) int64 - UpdateConfig([]byte, Config) error - Config() Config } // Config is a set of options used by rate limiter. @@ -35,19 +34,18 @@ func newBucket(c Config) *ratelimit.Bucket { return ratelimit.NewBucketWithQuantum(time.Duration(c.Interval), int64(c.Capacity), int64(c.Quantum)) } -func NewPersisted(db DBInterface, config Config) *PersistedRateLimiter { +// NewPersisted returns instance of rate limiter with persisted black listed records and capacity before peer was removed. +func NewPersisted(db DBInterface) *PersistedRateLimiter { return &PersistedRateLimiter{ - db: db, - defaultConfig: config, - initialized: map[string]*ratelimit.Bucket{}, - timeFunc: time.Now, + db: db, + initialized: map[string]*ratelimit.Bucket{}, + timeFunc: time.Now, } } // PersistedRateLimiter persists latest capacity and updated config per unique ID. type PersistedRateLimiter struct { - db DBInterface - defaultConfig Config + db DBInterface mu sync.Mutex initialized map[string]*ratelimit.Bucket @@ -66,26 +64,15 @@ func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) erro return nil } -// Config returns default config. -func (r *PersistedRateLimiter) Config() Config { - return r.defaultConfig -} - -func (r *PersistedRateLimiter) getOrCreate(id []byte, config Config) (bucket *ratelimit.Bucket) { +func (r *PersistedRateLimiter) get(id []byte) (bucket *ratelimit.Bucket, exist bool) { r.mu.Lock() defer r.mu.Unlock() - old, exist := r.initialized[string(id)] - if !exist { - bucket = newBucket(config) - r.initialized[string(id)] = bucket - } else { - bucket = old - } + bucket, exist = r.initialized[string(id)] return } // Create creates an instance for a provided ID. If ID was blacklisted error is returned. -func (r *PersistedRateLimiter) Create(id []byte) error { +func (r *PersistedRateLimiter) Create(id []byte, cfg Config) error { bl := BlacklistRecord{ID: id} if err := bl.Read(r.db); err != leveldb.ErrNotFound { if bl.Deadline.After(r.timeFunc()) { @@ -93,7 +80,10 @@ func (r *PersistedRateLimiter) Create(id []byte) error { } bl.Remove(r.db) } - bucket := r.getOrCreate(id, r.defaultConfig) + bucket := newBucket(cfg) + r.mu.Lock() + r.initialized[string(id)] = bucket + r.mu.Unlock() capacity := CapacityRecord{ID: id} if err := capacity.Read(r.db); err != nil { return nil @@ -136,7 +126,10 @@ func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error // TakeAvailable subtracts requested amount from a rate limiter with ID. func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 { - bucket := r.getOrCreate(id, r.defaultConfig) + bucket, exist := r.get(id) + if !exist { + return math.MaxInt64 + } rst := bucket.TakeAvailable(count) if err := r.store(id, bucket); err != nil { log.Error(err.Error()) @@ -144,25 +137,11 @@ func (r *PersistedRateLimiter) TakeAvailable(id []byte, count int64) int64 { return rst } -// TakeAvailable peeks into available amount with a given ID. +// Available peeks into available amount with a given ID. func (r *PersistedRateLimiter) Available(id []byte) int64 { - return r.getOrCreate(id, r.defaultConfig).Available() -} - -// UpdateConfig updates config for a provided ID. -func (r *PersistedRateLimiter) UpdateConfig(id []byte, config Config) error { - r.mu.Lock() - old, _ := r.initialized[string(id)] - if compare(config, old) { - r.mu.Unlock() - return nil + bucket, exist := r.get(id) + if !exist { + return math.MaxInt64 } - delete(r.initialized, string(id)) - r.mu.Unlock() - taken := int64(0) - if old != nil { - taken = old.Capacity() - old.Available() - } - r.getOrCreate(id, config).TakeAvailable(taken) - return nil + return bucket.Available() } diff --git a/ratelimiter/ratelimiter_test.go b/ratelimiter/ratelimiter_test.go index ae914be..be4a2cf 100644 --- a/ratelimiter/ratelimiter_test.go +++ b/ratelimiter/ratelimiter_test.go @@ -15,47 +15,31 @@ func TestLimitIsPersisted(t *testing.T) { require.NoError(t, err) var ( total int64 = 10000 - rl = NewPersisted(db, Config{1 << 62, uint64(10000), 1}) + cfg = Config{1 << 62, uint64(10000), 1} + rl = NewPersisted(db) tid = []byte("test") ) - require.NoError(t, rl.Create(tid)) + require.NoError(t, rl.Create(tid, cfg)) taken := rl.TakeAvailable(tid, total/2) require.Equal(t, total/2, taken) require.NoError(t, rl.Remove(tid, 0)) - require.NoError(t, rl.Create(tid)) + require.NoError(t, rl.Create(tid, cfg)) require.Equal(t, total/2, rl.Available(tid)) } -func TestConfigFixedOnUpdate(t *testing.T) { - db, err := leveldb.Open(storage.NewMemStorage(), nil) - require.NoError(t, err) - var ( - total int64 = 10000 - cfg = Config{1 << 62, uint64(10000), 1} - rl = NewPersisted(db, cfg) - tid = []byte("test") - ) - require.NoError(t, rl.Create(tid)) - taken := rl.TakeAvailable(tid, total/2) - require.Equal(t, total/2, taken) - cfg.Capacity = 6000 - require.NoError(t, rl.UpdateConfig(tid, cfg)) - require.Equal(t, int64(cfg.Capacity)-total/2, rl.Available(tid)) -} - func TestBlacklistedEntityReturnsError(t *testing.T) { db, err := leveldb.Open(storage.NewMemStorage(), nil) require.NoError(t, err) var ( cfg = Config{1 << 62, uint64(10000), 1} - rl = NewPersisted(db, cfg) + rl = NewPersisted(db) tid = []byte("test") ) - require.NoError(t, rl.Create(tid)) + require.NoError(t, rl.Create(tid, cfg)) require.NoError(t, rl.Remove(tid, 10*time.Minute)) - require.EqualError(t, fmt.Errorf("identity %x is blacklisted", tid), rl.Create(tid).Error()) + require.EqualError(t, fmt.Errorf("identity %x is blacklisted", tid), rl.Create(tid, cfg).Error()) rl.timeFunc = func() time.Time { return time.Now().Add(11 * time.Minute) } - require.NoError(t, rl.Create(tid)) + require.NoError(t, rl.Create(tid, cfg)) } diff --git a/whisperv6/peer.go b/whisperv6/peer.go index b646a7a..cbbe806 100644 --- a/whisperv6/peer.go +++ b/whisperv6/peer.go @@ -85,8 +85,7 @@ func (peer *Peer) handshake() error { isRestrictedLightNodeConnection := peer.host.LightClientModeConnectionRestricted() var rlCfg *ratelimiter.Config if peer.host.ratelimiter != nil { - tmp := peer.host.ratelimiter.I().Config() - rlCfg = &tmp + rlCfg = &peer.host.ratelimiter.Config } go func() { pow := peer.host.MinPow() @@ -143,10 +142,8 @@ func (peer *Peer) handshake() error { } egressCfg := ratelimiter.Config{} - if err := s.Decode(&egressCfg); err == nil { - if peer.host.ratelimiter != nil { - peer.host.ratelimiter.E().UpdateConfig(peer.peer, egressCfg) - } + if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil { + peer.host.ratelimiter.E.Create(peer.peer, egressCfg) } if err := <-errc; err != nil { @@ -215,10 +212,10 @@ func (peer *Peer) reduceBundle(bundle []*Envelope) []*Envelope { }) for i := range bundle { size := int64(bundle[i].size()) - if peer.host.ratelimiter.E().Available(peer.peer) < size { + if peer.host.ratelimiter.E.Available(peer.peer) < size { return bundle[:i] } - peer.host.ratelimiter.E().TakeAvailable(peer.peer, size) + peer.host.ratelimiter.E.TakeAvailable(peer.peer, size) } return bundle } diff --git a/whisperv6/ratelimit_test.go b/whisperv6/ratelimit_test.go index 3cdb753..58d33dd 100644 --- a/whisperv6/ratelimit_test.go +++ b/whisperv6/ratelimit_test.go @@ -24,7 +24,7 @@ const ( func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf ratelimiter.Config) (*Whisper, *p2p.MsgPipeRW, chan error) { db, err := leveldb.Open(storage.NewMemStorage(), nil) require.NoError(t, err) - rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, rlconf, rlconf) + rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, rlconf) conf := &Config{ MinimumAcceptedPOW: 0, MaxMessageSize: 100 << 10, diff --git a/whisperv6/whisper.go b/whisperv6/whisper.go index f1acf62..ca1ede4 100644 --- a/whisperv6/whisper.go +++ b/whisperv6/whisper.go @@ -769,12 +769,11 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { whisper.peerMu.Unlock() }() if whisper.ratelimiter != nil { - if err := whisper.ratelimiter.I().Create(whisperPeer.peer); err != nil { + if err := whisper.ratelimiter.I.Create(whisperPeer.peer, whisper.ratelimiter.Config); err != nil { return err } - defer whisper.ratelimiter.I().Remove(whisperPeer.peer, 0) - whisper.ratelimiter.E().Create(whisperPeer.peer) - defer whisper.ratelimiter.E().Remove(whisperPeer.peer, 0) + defer whisper.ratelimiter.I.Remove(whisperPeer.peer, 0) + defer whisper.ratelimiter.E.Remove(whisperPeer.peer, 0) } // Run the peer handshake and state updates @@ -790,7 +789,7 @@ func (whisper *Whisper) advertiseEgressLimit(p *Peer, rw p2p.MsgReadWriter) erro if whisper.ratelimiter == nil { return nil } - if err := p2p.Send(rw, peerRateLimitCode, whisper.ratelimiter.I().Config()); err != nil { + if err := p2p.Send(rw, peerRateLimitCode, whisper.ratelimiter.Config); err != nil { return fmt.Errorf("failed to send ingress rate limit to a peer %v: %v", p.peer.ID(), err) } return nil @@ -886,17 +885,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { whisper.mailServer.DeliverMail(p, &request) } - case peerRateLimitCode: - if whisper.ratelimiter == nil { - continue - } - var conf ratelimiter.Config - if err := packet.Decode(&conf); err != nil { - return fmt.Errorf("peer %v sent wrong payload for a rate limiter config", p.peer.ID()) - } - if err := whisper.ratelimiter.E().UpdateConfig(p.peer, conf); err != nil { - log.Error("error updaing rate limiter config", "peer", p.peer) - } case p2pRequestCompleteCode: if p.trusted { var payload []byte @@ -953,8 +941,8 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { if packet.Code != p2pMessageCode && whisper.ratelimiter != nil { // TODO 300 should be a quantum size - if whisper.ratelimiter.I().TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) { - whisper.ratelimiter.I().Remove(p.peer, 10*time.Minute) + if whisper.ratelimiter.I.TakeAvailable(p.peer, int64(packet.Size))+300 < int64(packet.Size) { + whisper.ratelimiter.I.Remove(p.peer, 10*time.Minute) return fmt.Errorf("peer %v reached traffic limit capacity", p.peer.ID()) } }