mailserver: build pruning into WMailServer (#1342)

MailServer pruning was implemented as a separate command but that required stopping a mail server and executing the command manually. This change builds pruning into MailServer and can be set using MailServerDataRetention in WhisperConfig.
This commit is contained in:
Adam Babik 2019-01-23 14:32:45 +01:00 committed by GitHub
parent 1a365a5140
commit d20b5dc3b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 109 deletions

View File

@ -1,9 +0,0 @@
#statusd-prune
##Usage
```
cd $STATUS_GO_HOME/cmd/statusd-prune && \
go build && \
./statusd-prune -db WNODE_DB_PATH -upper TIMESTAMP
```

View File

@ -1,70 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/mailserver"
)
var (
dbPath = flag.String("db", "", "Path to wnode database folder")
lowerTimestamp = flag.Int("lower", 0, "Removes messages sent starting from this timestamp")
upperTimestamp = flag.Int("upper", 0, "Removes messages sent up to this timestamp")
)
func missingFlag(f string) {
log.Printf("flag -%s is required", f)
flag.Usage()
os.Exit(1)
}
func validateRange(lower, upper int) error {
if upper <= lower {
return fmt.Errorf("upper value must be greater than lower value")
}
if lower < 0 || upper < 0 {
return fmt.Errorf("upper and lower values must be greater than zero")
}
return nil
}
func init() {
flag.Parse()
if *dbPath == "" {
missingFlag("db")
}
if *upperTimestamp == 0 {
missingFlag("upper")
}
}
func main() {
db, err := db.Open(*dbPath, nil)
if err != nil {
log.Fatal(err)
}
c := mailserver.NewCleanerWithDB(db)
if err = validateRange(*lowerTimestamp, *upperTimestamp); err != nil {
log.Fatal(err)
}
lower := uint32(*lowerTimestamp)
upper := uint32(*upperTimestamp)
n, err := c.Prune(lower, upper)
if err != nil {
log.Fatal(err)
}
log.Printf("removed %d messages.\n", n)
}

View File

@ -1,40 +1,100 @@
package mailserver package mailserver
import ( import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
) )
const batchSize = 1000 const (
dbCleanerBatchSize = 1000
dbCleanerPeriod = time.Hour
)
// dbCleaner removes old messages from a db.
type dbCleaner struct {
sync.RWMutex
// Cleaner removes old messages from a db
type Cleaner struct {
db dbImpl db dbImpl
batchSize int batchSize int
retention time.Duration
period time.Duration
cancel chan struct{}
} }
// NewCleanerWithDB returns a new Cleaner for db // newDBCleaner returns a new cleaner for db.
func NewCleanerWithDB(db dbImpl) *Cleaner { func newDBCleaner(db dbImpl, retention time.Duration) *dbCleaner {
return &Cleaner{ return &dbCleaner{
db: db, db: db,
batchSize: batchSize, retention: retention,
batchSize: dbCleanerBatchSize,
period: dbCleanerPeriod,
} }
} }
// Prune removes messages sent between lower and upper timestamps and returns how many has been removed // Start starts a loop that cleans up old messages.
func (c *Cleaner) Prune(lower, upper uint32) (int, error) { func (c *dbCleaner) Start() {
log.Info("Starting cleaning envelopes", "period", c.period, "retention", c.retention)
cancel := make(chan struct{})
c.Lock()
c.cancel = cancel
c.Unlock()
go c.schedule(c.period, cancel)
}
// Stops stops the cleaning loop.
func (c *dbCleaner) Stop() {
c.Lock()
defer c.Unlock()
if c.cancel == nil {
return
}
close(c.cancel)
c.cancel = nil
}
func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) {
t := time.NewTicker(period)
defer t.Stop()
for {
select {
case <-t.C:
count, err := c.PruneEntriesOlderThan(time.Now().Add(-c.retention))
if err != nil {
log.Error("failed to prune data", "err", err)
}
log.Info("Prunned some some messages successfully", "count", count)
case <-cancel:
return
}
}
}
// PruneEntriesOlderThan removes messages sent between lower and upper timestamps
// and returns how many have been removed.
func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) {
var zero common.Hash var zero common.Hash
kl := NewDBKey(lower, zero) kl := NewDBKey(0, zero)
ku := NewDBKey(upper, zero) ku := NewDBKey(uint32(t.Unix()), zero)
i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
defer i.Release() defer i.Release()
return c.prune(i) return c.prune(i)
} }
func (c *Cleaner) prune(i iterator.Iterator) (int, error) { func (c *dbCleaner) prune(i iterator.Iterator) (int, error) {
batch := leveldb.Batch{} batch := leveldb.Batch{}
removed := 0 removed := 0

View File

@ -17,8 +17,8 @@ import (
func TestCleaner(t *testing.T) { func TestCleaner(t *testing.T) {
now := time.Now() now := time.Now()
server := setupTestServer(t) server := setupTestServer(t)
cleaner := NewCleanerWithDB(server.db)
defer server.Close() defer server.Close()
cleaner := newDBCleaner(server.db, time.Hour)
archiveEnvelope(t, now.Add(-10*time.Second), server) archiveEnvelope(t, now.Add(-10*time.Second), server)
archiveEnvelope(t, now.Add(-3*time.Second), server) archiveEnvelope(t, now.Add(-3*time.Second), server)
@ -26,9 +26,30 @@ func TestCleaner(t *testing.T) {
testMessagesCount(t, 3, server) testMessagesCount(t, 3, server)
testPrune(t, now.Add(-5*time.Second), 2, cleaner, server) testPrune(t, now.Add(-5*time.Second), 1, cleaner, server)
testPrune(t, now.Add(-2*time.Second), 1, cleaner, server) testPrune(t, now.Add(-2*time.Second), 1, cleaner, server)
testPrune(t, now, 0, cleaner, server) testPrune(t, now, 1, cleaner, server)
testMessagesCount(t, 0, server)
}
func TestCleanerSchedule(t *testing.T) {
now := time.Now()
server := setupTestServer(t)
defer server.Close()
cleaner := newDBCleaner(server.db, time.Hour)
cleaner.period = time.Millisecond * 10
cleaner.Start()
defer cleaner.Stop()
archiveEnvelope(t, now.Add(-3*time.Hour), server)
archiveEnvelope(t, now.Add(-2*time.Hour), server)
archiveEnvelope(t, now.Add(-1*time.Minute), server)
time.Sleep(time.Millisecond * 50)
testMessagesCount(t, 1, server)
} }
func benchmarkCleanerPrune(b *testing.B, messages int, batchSize int) { func benchmarkCleanerPrune(b *testing.B, messages int, batchSize int) {
@ -38,7 +59,7 @@ func benchmarkCleanerPrune(b *testing.B, messages int, batchSize int) {
server := setupTestServer(t) server := setupTestServer(t)
defer server.Close() defer server.Close()
cleaner := NewCleanerWithDB(server.db) cleaner := newDBCleaner(server.db, time.Hour)
cleaner.batchSize = batchSize cleaner.batchSize = batchSize
for i := 0; i < messages; i++ { for i := 0; i < messages; i++ {
@ -81,13 +102,10 @@ func archiveEnvelope(t *testing.T, sentTime time.Time, server *WMailServer) *whi
return env return env
} }
func testPrune(t *testing.T, u time.Time, expected int, c *Cleaner, s *WMailServer) { func testPrune(t *testing.T, u time.Time, expected int, c *dbCleaner, s *WMailServer) {
upper := uint32(u.Unix()) n, err := c.PruneEntriesOlderThan(u)
_, err := c.Prune(0, upper)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expected, n)
count := countMessages(t, s.db)
require.Equal(t, expected, count)
} }
func testMessagesCount(t *testing.T, expected int, s *WMailServer) { func testMessagesCount(t *testing.T, expected int, s *WMailServer) {

View File

@ -77,6 +77,8 @@ type WMailServer struct {
muRateLimiter sync.RWMutex muRateLimiter sync.RWMutex
rateLimiter *rateLimiter rateLimiter *rateLimiter
cleaner *dbCleaner // removes old envelopes
} }
// Init initializes mailServer. // Init initializes mailServer.
@ -97,7 +99,10 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e
if err := s.setupRequestMessageDecryptor(config); err != nil { if err := s.setupRequestMessageDecryptor(config); err != nil {
return err return err
} }
s.setupRateLimiter(time.Duration(config.MailServerRateLimit) * time.Second)
if config.MailServerRateLimit > 0 {
s.setupRateLimiter(time.Duration(config.MailServerRateLimit) * time.Second)
}
// Open database in the last step in order not to init with error // Open database in the last step in order not to init with error
// and leave the database open by accident. // and leave the database open by accident.
@ -107,16 +112,24 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e
} }
s.db = database s.db = database
if config.MailServerDataRetention > 0 {
// MailServerDataRetention is a number of days.
s.setupCleaner(time.Duration(config.MailServerDataRetention) * time.Hour * 24)
}
return nil return nil
} }
// setupRateLimiter in case limit is bigger than 0 it will setup an automated // setupRateLimiter in case limit is bigger than 0 it will setup an automated
// limit db cleanup. // limit db cleanup.
func (s *WMailServer) setupRateLimiter(limit time.Duration) { func (s *WMailServer) setupRateLimiter(limit time.Duration) {
if limit > 0 { s.rateLimiter = newRateLimiter(limit)
s.rateLimiter = newRateLimiter(limit) s.rateLimiter.Start()
s.rateLimiter.Start() }
}
func (s *WMailServer) setupCleaner(retention time.Duration) {
s.cleaner = newDBCleaner(s.db, retention)
s.cleaner.Start()
} }
// setupRequestMessageDecryptor setup a Whisper filter to decrypt // setupRequestMessageDecryptor setup a Whisper filter to decrypt
@ -160,6 +173,9 @@ func (s *WMailServer) Close() {
if s.rateLimiter != nil { if s.rateLimiter != nil {
s.rateLimiter.Stop() s.rateLimiter.Stop()
} }
if s.cleaner != nil {
s.cleaner.Stop()
}
} }
func recoverLevelDBPanics(calleMethodName string) { func recoverLevelDBPanics(calleMethodName string) {

View File

@ -70,11 +70,11 @@ type WhisperConfig struct {
// MailServerAsymKey is an hex-encoded asymmetric key to decrypt messages sent to MailServer. // MailServerAsymKey is an hex-encoded asymmetric key to decrypt messages sent to MailServer.
MailServerAsymKey string MailServerAsymKey string
// RateLimit minimum time between queries to mail server per peer // MailServerRateLimit minimum time between queries to mail server per peer.
MailServerRateLimit int MailServerRateLimit int
// MailServerCleanupPeriod time in seconds to wait to run mail server cleanup // MailServerDataRetention is a number of days data should be stored by MailServer.
MailServerCleanupPeriod int MailServerDataRetention int
// TTL time to live for messages, in seconds // TTL time to live for messages, in seconds
TTL int TTL int