[#ISSUE-947/ISSUE-946] Rate limit and batch limit on any mail server query (#949)
This commit is contained in:
parent
6da469140f
commit
cf8ad7e002
|
@ -227,12 +227,7 @@ func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb
|
|||
|
||||
var mailServer mailserver.WMailServer
|
||||
whisperService.RegisterServer(&mailServer)
|
||||
err := mailServer.Init(
|
||||
whisperService,
|
||||
config.WhisperConfig.DataDir,
|
||||
config.WhisperConfig.Password,
|
||||
config.WhisperConfig.MinimumPoW,
|
||||
)
|
||||
err := mailServer.Init(whisperService, config.WhisperConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -105,6 +105,12 @@ type WhisperConfig struct {
|
|||
// MinimumPoW minimum PoW for Whisper messages
|
||||
MinimumPoW float64
|
||||
|
||||
// RateLimit minimum time between queries to mail server per peer
|
||||
MailServerRateLimit int
|
||||
|
||||
// MailServerCleanupPeriod time in seconds to wait to run mail server cleanup
|
||||
MailServerCleanupPeriod int
|
||||
|
||||
// TTL time to live for messages, in seconds
|
||||
TTL int
|
||||
|
||||
|
|
|
@ -19,22 +19,32 @@ package mailserver
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
const (
|
||||
maxQueryRange = 24 * time.Hour
|
||||
)
|
||||
|
||||
// WMailServer whisper mailserver
|
||||
type WMailServer struct {
|
||||
db *leveldb.DB
|
||||
w *whisper.Whisper
|
||||
pow float64
|
||||
key []byte
|
||||
db *leveldb.DB
|
||||
w *whisper.Whisper
|
||||
pow float64
|
||||
key []byte
|
||||
limit *limiter
|
||||
tick *ticker
|
||||
}
|
||||
|
||||
// DBKey key to be stored on db
|
||||
|
@ -57,25 +67,27 @@ func NewDbKey(t uint32, h common.Hash) *DBKey {
|
|||
}
|
||||
|
||||
// Init initializes mailServer
|
||||
func (s *WMailServer) Init(shh *whisper.Whisper, path string, password string, pow float64) error {
|
||||
func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) error {
|
||||
var err error
|
||||
if len(path) == 0 {
|
||||
return fmt.Errorf("DB file is not specified")
|
||||
|
||||
if len(config.DataDir) == 0 {
|
||||
return fmt.Errorf("data directory not provided")
|
||||
}
|
||||
|
||||
if len(password) == 0 {
|
||||
path := filepath.Join(config.DataDir, "mailserver", "data")
|
||||
if len(config.Password) == 0 {
|
||||
return fmt.Errorf("password is not specified")
|
||||
}
|
||||
|
||||
s.db, err = leveldb.OpenFile(path, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open DB file: %s", err)
|
||||
return fmt.Errorf("open DB: %s", err)
|
||||
}
|
||||
|
||||
s.w = shh
|
||||
s.pow = pow
|
||||
s.pow = config.MinimumPoW
|
||||
|
||||
MailServerKeyID, err := s.w.AddSymKeyFromPassword(password)
|
||||
MailServerKeyID, err := s.w.AddSymKeyFromPassword(config.Password)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create symmetric key: %s", err)
|
||||
}
|
||||
|
@ -83,17 +95,34 @@ func (s *WMailServer) Init(shh *whisper.Whisper, path string, password string, p
|
|||
if err != nil {
|
||||
return fmt.Errorf("save symmetric key: %s", err)
|
||||
}
|
||||
limit := time.Duration(config.MailServerRateLimit) * time.Second
|
||||
if limit > 0 {
|
||||
s.limit = newLimiter(limit)
|
||||
s.setupMailServerCleanup(limit)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *WMailServer) setupMailServerCleanup(period time.Duration) {
|
||||
if period <= 0 {
|
||||
return
|
||||
}
|
||||
if s.tick == nil {
|
||||
s.tick = &ticker{}
|
||||
}
|
||||
go s.tick.run(period, s.limit.deleteExpired)
|
||||
}
|
||||
|
||||
// Close the mailserver and its associated db connection
|
||||
func (s *WMailServer) Close() {
|
||||
if s.db != nil {
|
||||
func() {
|
||||
if err := s.db.Close(); err != nil {
|
||||
log.Error(fmt.Sprintf("s.db.Close failed: %s", err))
|
||||
}
|
||||
}()
|
||||
if err := s.db.Close(); err != nil {
|
||||
log.Error(fmt.Sprintf("s.db.Close failed: %s", err))
|
||||
}
|
||||
}
|
||||
if s.tick != nil {
|
||||
s.tick.stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,6 +146,14 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
|
|||
log.Error("Whisper peer is nil")
|
||||
return
|
||||
}
|
||||
if s.limit != nil {
|
||||
peerID := string(peer.ID())
|
||||
if !s.limit.isAllowed(peerID) {
|
||||
log.Info("peerID exceeded the number of requests per second")
|
||||
return
|
||||
}
|
||||
s.limit.add(peerID)
|
||||
}
|
||||
|
||||
ok, lower, upper, bloom := s.validateRequest(peer.ID(), request)
|
||||
if ok {
|
||||
|
@ -200,5 +237,57 @@ func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope)
|
|||
|
||||
lower := binary.BigEndian.Uint32(decrypted.Payload[:4])
|
||||
upper := binary.BigEndian.Uint32(decrypted.Payload[4:8])
|
||||
|
||||
lowerTime := time.Unix(int64(lower), 0)
|
||||
upperTime := time.Unix(int64(upper), 0)
|
||||
if upperTime.Sub(lowerTime) > maxQueryRange {
|
||||
log.Warn(fmt.Sprintf("Query range too big for peer %s", string(peerID)))
|
||||
return false, 0, 0, nil
|
||||
}
|
||||
|
||||
return true, lower, upper, bloom
|
||||
}
|
||||
|
||||
type limiter struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
timeout time.Duration
|
||||
db map[string]time.Time
|
||||
}
|
||||
|
||||
func newLimiter(timeout time.Duration) *limiter {
|
||||
return &limiter{
|
||||
timeout: timeout,
|
||||
db: make(map[string]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *limiter) add(id string) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
l.db[id] = time.Now()
|
||||
}
|
||||
|
||||
func (l *limiter) isAllowed(id string) bool {
|
||||
l.mu.RLock()
|
||||
defer l.mu.RUnlock()
|
||||
|
||||
if lastRequestTime, ok := l.db[id]; ok {
|
||||
return lastRequestTime.Add(l.timeout).Before(time.Now())
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (l *limiter) deleteExpired() {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for id, lastRequestTime := range l.db {
|
||||
if lastRequestTime.Add(l.timeout).Before(now) {
|
||||
delete(l.db, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,16 +21,17 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"encoding/binary"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
)
|
||||
|
||||
const powRequirement = 0.00001
|
||||
const peerID = "peerID"
|
||||
|
||||
var keyID string
|
||||
var shh *whisper.Whisper
|
||||
|
@ -38,6 +39,7 @@ var seed = time.Now().Unix()
|
|||
|
||||
type ServerTestParams struct {
|
||||
topic whisper.TopicType
|
||||
birth uint32
|
||||
low uint32
|
||||
upp uint32
|
||||
key *ecdsa.PrivateKey
|
||||
|
@ -58,6 +60,39 @@ func TestDBKey(t *testing.T) {
|
|||
assert(byte(i/0x1000000) == k.raw[0], "big endian expected", t)
|
||||
}
|
||||
|
||||
func TestMailServer(t *testing.T) {
|
||||
var server WMailServer
|
||||
|
||||
setupServer(t, &server)
|
||||
defer server.Close()
|
||||
|
||||
env := generateEnvelope(t)
|
||||
server.Archive(env)
|
||||
deliverTest(t, &server, env)
|
||||
}
|
||||
|
||||
func TestRateLimits(t *testing.T) {
|
||||
l := newLimiter(time.Duration(5 * time.Millisecond))
|
||||
assert(l.isAllowed(peerID), "Expected limiter not to allow with empty db", t)
|
||||
|
||||
l.db[peerID] = time.Now().Add(time.Duration(-10 * time.Millisecond))
|
||||
assert(l.isAllowed(peerID), "Expected limiter to allow with peer on its db", t)
|
||||
|
||||
l.db[peerID] = time.Now().Add(time.Duration(-1 * time.Millisecond))
|
||||
assert(!l.isAllowed(peerID), "Expected limiter to not allow with peer on its db", t)
|
||||
}
|
||||
|
||||
func TestRemoveExpiredRateLimits(t *testing.T) {
|
||||
l := newLimiter(time.Duration(10) * time.Second)
|
||||
l.db[peerID] = time.Now().Add(time.Duration(-10) * time.Second)
|
||||
l.db[peerID+"A"] = time.Now().Add(time.Duration(10) * time.Second)
|
||||
l.deleteExpired()
|
||||
_, ok := l.db[peerID]
|
||||
assert(!ok, "Expired peer should not exist, but it does ", t)
|
||||
_, ok = l.db[peerID+"A"]
|
||||
assert(ok, "Non expired peer should exist, but it doesn't", t)
|
||||
}
|
||||
|
||||
func generateEnvelope(t *testing.T) *whisper.Envelope {
|
||||
h := crypto.Keccak256Hash([]byte("test sample data"))
|
||||
params := &whisper.MessageParams{
|
||||
|
@ -79,37 +114,7 @@ func generateEnvelope(t *testing.T) *whisper.Envelope {
|
|||
return env
|
||||
}
|
||||
|
||||
func TestMailServer(t *testing.T) {
|
||||
const password = "password_for_this_test"
|
||||
const dbPath = "whisper-server-test"
|
||||
|
||||
dir, err := ioutil.TempDir("", dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var server WMailServer
|
||||
shh = whisper.New(&whisper.DefaultConfig)
|
||||
shh.RegisterServer(&server)
|
||||
|
||||
err = server.Init(shh, dir, password, powRequirement)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer server.Close()
|
||||
|
||||
keyID, err = shh.AddSymKeyFromPassword(password)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create symmetric key for mail request: %s", err)
|
||||
}
|
||||
|
||||
rand.Seed(seed)
|
||||
env := generateEnvelope(t)
|
||||
server.Archive(env)
|
||||
deliverTest(t, &server, env)
|
||||
}
|
||||
|
||||
func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) {
|
||||
func serverParams(t *testing.T, env *whisper.Envelope) *ServerTestParams {
|
||||
id, err := shh.NewKeyPair()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to generate new key pair with seed %d: %s.", seed, err)
|
||||
|
@ -119,25 +124,44 @@ func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) {
|
|||
t.Fatalf("failed to retrieve new key pair with seed %d: %s.", seed, err)
|
||||
}
|
||||
birth := env.Expiry - env.TTL
|
||||
p := &ServerTestParams{
|
||||
|
||||
return &ServerTestParams{
|
||||
topic: env.Topic,
|
||||
birth: birth,
|
||||
low: birth - 1,
|
||||
upp: birth + 1,
|
||||
key: testPeerID,
|
||||
}
|
||||
|
||||
}
|
||||
func deliverTest(t *testing.T, server *WMailServer, env *whisper.Envelope) {
|
||||
p := serverParams(t, env)
|
||||
singleRequest(t, server, env, p, true)
|
||||
|
||||
p.low, p.upp = birth+1, 0xffffffff
|
||||
p.low = p.birth + 1
|
||||
p.upp = p.birth + 1
|
||||
singleRequest(t, server, env, p, false)
|
||||
|
||||
p.low, p.upp = 0, birth-1
|
||||
singleRequest(t, server, env, p, false)
|
||||
|
||||
p.low = birth - 1
|
||||
p.upp = birth + 1
|
||||
p.low = p.birth
|
||||
p.upp = p.birth + 1
|
||||
p.topic[0] = 0xFF
|
||||
singleRequest(t, server, env, p, false)
|
||||
|
||||
p.low = 0
|
||||
p.upp = p.birth - 1
|
||||
failRequest(t, server, p, "validation should fail due to negative query time range")
|
||||
|
||||
p.low = 0
|
||||
p.upp = p.birth + 24
|
||||
failRequest(t, server, p, "validation should fail due to query big time range")
|
||||
}
|
||||
|
||||
func failRequest(t *testing.T, server *WMailServer, p *ServerTestParams, err string) {
|
||||
request := createRequest(t, p)
|
||||
src := crypto.FromECDSAPub(&p.key.PublicKey)
|
||||
ok, _, _, _ := server.validateRequest(src, request)
|
||||
if ok {
|
||||
t.Fatalf(err)
|
||||
}
|
||||
}
|
||||
|
||||
func singleRequest(t *testing.T, server *WMailServer, env *whisper.Envelope, p *ServerTestParams, expect bool) {
|
||||
|
@ -210,3 +234,26 @@ func createRequest(t *testing.T, p *ServerTestParams) *whisper.Envelope {
|
|||
}
|
||||
return env
|
||||
}
|
||||
|
||||
func setupServer(t *testing.T, server *WMailServer) {
|
||||
const password = "password_for_this_test"
|
||||
const dbPath = "whisper-server-test"
|
||||
|
||||
dir, err := ioutil.TempDir("", dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
shh = whisper.New(&whisper.DefaultConfig)
|
||||
shh.RegisterServer(server)
|
||||
|
||||
err = server.Init(shh, ¶ms.WhisperConfig{DataDir: dir, Password: password, MinimumPoW: powRequirement})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
keyID, err = shh.AddSymKeyFromPassword(password)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create symmetric key for mail request: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
package mailserver
|
||||
|
||||
import "time"
|
||||
|
||||
type ticker struct {
|
||||
timeTicker *time.Ticker
|
||||
}
|
||||
|
||||
func (t *ticker) run(period time.Duration, fn func()) {
|
||||
if t.timeTicker != nil {
|
||||
return
|
||||
}
|
||||
|
||||
t.timeTicker = time.NewTicker(period)
|
||||
go func() {
|
||||
for range t.timeTicker.C {
|
||||
fn()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *ticker) stop() {
|
||||
t.timeTicker.Stop()
|
||||
}
|
|
@ -95,6 +95,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
|||
// Act.
|
||||
|
||||
// Request messages (including the previous one, expired) from mailbox.
|
||||
from := senderWhisperService.GetCurrentTime().Add(-12 * time.Hour)
|
||||
reqMessagesBody := `{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
|
@ -103,7 +104,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
|||
"mailServerPeer":"` + mailboxPeerStr + `",
|
||||
"topic":"` + topic.String() + `",
|
||||
"symKeyID":"` + MailServerKeyID + `",
|
||||
"from":0,
|
||||
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
|
||||
"to":` + strconv.FormatInt(senderWhisperService.GetCurrentTime().Unix(), 10) + `
|
||||
}]
|
||||
}`
|
||||
|
@ -482,6 +483,7 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin
|
|||
|
||||
// requestHistoricMessages asks a mailnode to resend messages.
|
||||
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) {
|
||||
from := w.GetCurrentTime().Add(-12 * time.Hour)
|
||||
resp := rpcCli.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 2,
|
||||
|
@ -490,7 +492,7 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli
|
|||
"mailServerPeer":"` + mailboxEnode + `",
|
||||
"topic":"` + topic + `",
|
||||
"symKeyID":"` + mailServerKeyID + `",
|
||||
"from":0,
|
||||
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
|
||||
"to":` + strconv.FormatInt(w.GetCurrentTime().Unix(), 10) + `
|
||||
}]
|
||||
}`)
|
||||
|
|
Loading…
Reference in New Issue