Decouple ratelimiter from status-go

This commit is contained in:
Dmitry 2018-10-02 11:49:16 +03:00
parent 71461f9361
commit ffcf1b3cc0
6 changed files with 141 additions and 70 deletions

112
ratelimiter/db.go Normal file
View File

@ -0,0 +1,112 @@
package ratelimiter
import (
"encoding/binary"
time "time"
"github.com/syndtr/goleveldb/leveldb/opt"
)
const (
BlacklistBucket byte = 1 + iota
CapacityBucket
)
// DBInterface defines leveldb methods used by ratelimiter.
type DBInterface interface {
Put(key, value []byte, wo *opt.WriteOptions) error
Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
Delete(key []byte, wo *opt.WriteOptions) error
}
func WithPrefix(db DBInterface, prefix []byte) IsolatedDB {
return IsolatedDB{db: db, prefix: prefix}
}
type IsolatedDB struct {
db DBInterface
prefix []byte
}
func (db IsolatedDB) withPrefix(key []byte) []byte {
fkey := make([]byte, len(db.prefix)+len(key))
copy(fkey, db.prefix)
copy(fkey[len(db.prefix):], key)
return fkey
}
func (db IsolatedDB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.db.Put(db.withPrefix(key), value, wo)
}
func (db IsolatedDB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
return db.db.Get(db.withPrefix(key), ro)
}
func (db IsolatedDB) Delete(key []byte, wo *opt.WriteOptions) error {
return db.db.Delete(db.withPrefix(key), wo)
}
// BlacklistRecord is a record with information of a deadline for a particular ID.
type BlacklistRecord struct {
ID []byte
Deadline time.Time
}
func (r BlacklistRecord) Key() []byte {
key := make([]byte, len(r.ID)+1)
key[0] = BlacklistBucket
copy(key[1:], r.ID)
return key
}
func (r BlacklistRecord) Write(db DBInterface) error {
buf := [8]byte{}
binary.BigEndian.PutUint64(buf[:], uint64(r.Deadline.Unix()))
return db.Put(r.Key(), buf[:], nil)
}
func (r *BlacklistRecord) Read(db DBInterface) error {
val, err := db.Get(r.Key(), nil)
if err != nil {
return err
}
deadline := binary.BigEndian.Uint64(val)
r.Deadline = time.Unix(int64(deadline), 0)
return nil
}
func (r BlacklistRecord) Remove(db DBInterface) error {
return db.Delete(r.Key(), nil)
}
// CapacityRecord tracks how much was taken from a bucket and a time.
type CapacityRecord struct {
ID []byte
Taken int64
Timestamp time.Time
}
func (r CapacityRecord) Key() []byte {
key := make([]byte, len(r.ID)+1)
key[0] = CapacityBucket
copy(key[1:], r.ID)
return key
}
func (r CapacityRecord) Write(db DBInterface) error {
buf := [16]byte{}
binary.BigEndian.PutUint64(buf[:], uint64(r.Taken))
binary.BigEndian.PutUint64(buf[8:], uint64(r.Timestamp.Unix()))
return db.Put(r.Key(), buf[:], nil)
}
func (r *CapacityRecord) Read(db DBInterface) error {
val, err := db.Get(r.Key(), nil)
if err != nil {
return err
}
r.Taken = int64(binary.BigEndian.Uint64(val[:8]))
r.Timestamp = time.Unix(int64(binary.BigEndian.Uint64(val[8:])), 0)
return nil
}

View File

@ -77,8 +77,8 @@ type Whisper struct {
func ForWhisper(mode int, db DBInterface, ingress, egress Config) Whisper {
return Whisper{
ingress: NewP2PRateLimiter(mode, NewPersisted(db, ingress, []byte("i"))),
egress: NewP2PRateLimiter(mode, NewPersisted(db, egress, []byte("e"))),
ingress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("i")), ingress)),
egress: NewP2PRateLimiter(mode, NewPersisted(WithPrefix(db, []byte("e")), egress)),
}
}

View File

@ -1,20 +1,16 @@
package ratelimiter
import (
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/juju/ratelimit"
"github.com/status-im/status-go/db"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
// Interface describes common interface methods.
// Interface describes common ratelimiter methods.
type Interface interface {
Create([]byte) error
Remove([]byte, time.Duration) error
@ -24,13 +20,6 @@ type Interface interface {
Config() Config
}
// DBInterface defines leveldb methods used by ratelimiter.
type DBInterface interface {
Put(key, value []byte, wo *opt.WriteOptions) error
Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
Delete(key []byte, wo *opt.WriteOptions) error
}
// Config is a set of options used by rate limiter.
type Config struct {
Interval, Capacity, Quantum uint64
@ -46,12 +35,11 @@ func newBucket(c Config) *ratelimit.Bucket {
return ratelimit.NewBucketWithQuantum(time.Duration(c.Interval), int64(c.Capacity), int64(c.Quantum))
}
func NewPersisted(db DBInterface, config Config, prefix []byte) *PersistedRateLimiter {
func NewPersisted(db DBInterface, config Config) *PersistedRateLimiter {
return &PersistedRateLimiter{
db: db,
defaultConfig: config,
initialized: map[string]*ratelimit.Bucket{},
prefix: prefix,
timeFunc: time.Now,
}
}
@ -59,7 +47,6 @@ func NewPersisted(db DBInterface, config Config, prefix []byte) *PersistedRateLi
// PersistedRateLimiter persists latest capacity and updated config per unique ID.
type PersistedRateLimiter struct {
db DBInterface
prefix []byte // TODO move prefix outside of the rate limiter using database interface
defaultConfig Config
mu sync.Mutex
@ -69,10 +56,11 @@ type PersistedRateLimiter struct {
}
func (r *PersistedRateLimiter) blacklist(id []byte, duration time.Duration) error {
fkey := db.Key(db.RateLimitBlacklist, r.prefix, id)
buf := [8]byte{}
binary.BigEndian.PutUint64(buf[:], uint64(r.timeFunc().Add(duration).Unix()))
if err := r.db.Put(fkey, buf[:], nil); err != nil {
if duration == 0 {
return nil
}
record := BlacklistRecord{ID: id, Deadline: r.timeFunc().Add(duration)}
if err := record.Write(r.db); err != nil {
return fmt.Errorf("error blacklisting %x: %v", id, err)
}
return nil
@ -96,39 +84,19 @@ func (r *PersistedRateLimiter) getOrCreate(id []byte, config Config) (bucket *ra
}
func (r *PersistedRateLimiter) Create(id []byte) error {
fkey := db.Key(db.RateLimitBlacklist, r.prefix, id)
val, err := r.db.Get(fkey, nil)
if err != leveldb.ErrNotFound {
deadline := binary.BigEndian.Uint64(val)
if deadline >= uint64(r.timeFunc().Unix()) {
bl := BlacklistRecord{ID: id}
if err := bl.Read(r.db); err != leveldb.ErrNotFound {
if bl.Deadline.After(r.timeFunc()) {
return fmt.Errorf("identity %x is blacklisted", id)
}
r.db.Delete(fkey, nil)
bl.Remove(r.db)
}
fkey = db.Key(db.RateLimitConfig, r.prefix, id)
val, err = r.db.Get(fkey, nil)
var cfg Config
if err == leveldb.ErrNotFound {
cfg = r.defaultConfig
} else if err != nil {
log.Error("faield to read config from db. using default", "err", err)
cfg = r.defaultConfig
} else {
if err := rlp.DecodeBytes(val, &cfg); err != nil {
log.Error("failed to decode config. using default", "err", err)
cfg = r.defaultConfig
}
}
bucket := r.getOrCreate(id, cfg)
fkey = db.Key(db.RateLimitCapacity, r.prefix, id)
val, err = r.db.Get(fkey, nil)
if err == leveldb.ErrNotFound {
return nil
} else if len(val) != 16 {
log.Error("stored value is of unexpected length", "expected", 8, "stored", len(val))
bucket := r.getOrCreate(id, r.defaultConfig)
capacity := CapacityRecord{ID: id}
if err := capacity.Read(r.db); err != nil {
return nil
}
bucket.TakeAvailable(int64(binary.BigEndian.Uint64(val[:8])))
bucket.TakeAvailable(capacity.Taken)
// TODO refill rate limiter due to time difference. e.g. if record was stored at T and C seconds passed since T.
// we need to add RATE_PER_SECOND*C to a bucket
return nil
@ -152,11 +120,12 @@ func (r *PersistedRateLimiter) Remove(id []byte, duration time.Duration) error {
}
func (r *PersistedRateLimiter) store(id []byte, bucket *ratelimit.Bucket) error {
buf := [16]byte{}
binary.BigEndian.PutUint64(buf[:], uint64(bucket.Capacity()-bucket.Available()))
binary.BigEndian.PutUint64(buf[8:], uint64(r.timeFunc().Unix()))
err := r.db.Put(db.Key(db.RateLimitCapacity, r.prefix, id), buf[:], nil)
if err != nil {
capacity := CapacityRecord{
ID: id,
Taken: bucket.Capacity() - bucket.Available(),
Timestamp: r.timeFunc(),
}
if err := capacity.Write(r.db); err != nil {
return fmt.Errorf("failed to write current capacicity %d for id %x: %v",
bucket.Capacity(), id, err)
}
@ -190,12 +159,5 @@ func (r *PersistedRateLimiter) UpdateConfig(id []byte, config Config) error {
taken = old.Capacity() - old.Available()
}
r.getOrCreate(id, config).TakeAvailable(taken)
fkey := db.Key(db.RateLimitConfig, r.prefix, id)
data, err := rlp.EncodeToBytes(config)
if err != nil {
log.Error("failed to update config", "cfg", config, "err", err)
return nil
}
r.db.Put(fkey, data, nil)
return nil
}

View File

@ -15,7 +15,7 @@ func TestLimitIsPersisted(t *testing.T) {
require.NoError(t, err)
var (
total int64 = 10000
rl = NewPersisted(db, Config{1 << 62, uint64(10000), 1}, nil)
rl = NewPersisted(db, Config{1 << 62, uint64(10000), 1})
tid = []byte("test")
)
require.NoError(t, rl.Create(tid))
@ -26,13 +26,13 @@ func TestLimitIsPersisted(t *testing.T) {
require.Equal(t, total/2, rl.Available(tid))
}
func TestConfigIsPersistedAndFixedOnUpdate(t *testing.T) {
func TestConfigFixedOnUpdate(t *testing.T) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
var (
total int64 = 10000
cfg = Config{1 << 62, uint64(10000), 1}
rl = NewPersisted(db, cfg, nil)
rl = NewPersisted(db, cfg)
tid = []byte("test")
)
require.NoError(t, rl.Create(tid))
@ -41,9 +41,6 @@ func TestConfigIsPersistedAndFixedOnUpdate(t *testing.T) {
cfg.Capacity = 6000
require.NoError(t, rl.UpdateConfig(tid, cfg))
require.Equal(t, int64(cfg.Capacity)-total/2, rl.Available(tid))
require.NoError(t, rl.Remove(tid, 0))
require.NoError(t, rl.Create(tid))
require.Equal(t, int64(cfg.Capacity)-total/2, rl.Available(tid))
}
func TestBlacklistedEntityReturnsError(t *testing.T) {
@ -51,7 +48,7 @@ func TestBlacklistedEntityReturnsError(t *testing.T) {
require.NoError(t, err)
var (
cfg = Config{1 << 62, uint64(10000), 1}
rl = NewPersisted(db, cfg, nil)
rl = NewPersisted(db, cfg)
tid = []byte("test")
)
require.NoError(t, rl.Create(tid))

View File

@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/status-im/status-go/ratelimiter"
"github.com/status-im/whisper/ratelimiter"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"

View File

@ -34,7 +34,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/ratelimiter"
"github.com/status-im/whisper/ratelimiter"
"github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/sync/syncmap"