add cmd/statusd-prune (#957)

add mailserver cleaner

use memstorage for leveldb in tests

avoid write if batch size is 0

add comments

add cmd/statusd-prune

rmeove batch size var in prune method

validate range values

pass only flag name to missingFlag

refactor Cleaner.prune method

update batch not to be a pointer

removed extra batch counter increment

don't increment counter if batch returns errors

add README
This commit is contained in:
Andrea Franz 2018-05-17 19:24:00 +02:00 committed by GitHub
parent 4b17ea1ced
commit 4317b8f687
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 264 additions and 4 deletions

2
Gopkg.lock generated
View File

@ -407,6 +407,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "a84fc7d6cd7878f73ea281695f71514f7dc5b769dc1d1cbe3076b41068ca8c3d"
inputs-digest = "ae9e87f81c4bfcd1c190314d52248c8851a58936c149947a9e1bbad8b53a542c"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -0,0 +1,9 @@
#statusd-prune
##Usage
```
cd $STATUS_GO_HOME/cmd/statusd-prune && \
go build && \
./statusd-prune -db WNODE_DB_PATH -upper TIMESTAMP
```

70
cmd/statusd-prune/main.go Normal file
View File

@ -0,0 +1,70 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"github.com/status-im/status-go/mailserver"
"github.com/syndtr/goleveldb/leveldb"
)
var (
dbPath = flag.String("db", "", "Path to wnode database folder")
lowerTimestamp = flag.Int("lower", 0, "Removes messages sent starting from this timestamp")
upperTimestamp = flag.Int("upper", 0, "Removes messages sent up to this timestamp")
)
func missingFlag(f string) {
log.Printf("flag -%s is required", f)
flag.Usage()
os.Exit(1)
}
func validateRange(lower, upper int) error {
if upper <= lower {
return fmt.Errorf("upper value must be greater than lower value")
}
if lower < 0 || upper < 0 {
return fmt.Errorf("upper and lower values must be greater than zero")
}
return nil
}
func init() {
flag.Parse()
if *dbPath == "" {
missingFlag("db")
}
if *upperTimestamp == 0 {
missingFlag("upper")
}
}
func main() {
db, err := leveldb.OpenFile(*dbPath, nil)
if err != nil {
log.Fatal(err)
}
c := mailserver.NewCleanerWithDB(db)
if err = validateRange(*lowerTimestamp, *upperTimestamp); err != nil {
log.Fatal(err)
}
lower := uint32(*lowerTimestamp)
upper := uint32(*upperTimestamp)
n, err := c.Prune(lower, upper)
if err != nil {
log.Fatal(err)
}
log.Printf("removed %d messages.\n", n)
}

63
mailserver/cleaner.go Normal file
View File

@ -0,0 +1,63 @@
package mailserver
import (
"github.com/ethereum/go-ethereum/common"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
)
const batchSize = 1000
// Cleaner removes old messages from a db
type Cleaner struct {
db *leveldb.DB
batchSize int
}
// NewCleanerWithDB returns a new Cleaner for db
func NewCleanerWithDB(db *leveldb.DB) *Cleaner {
return &Cleaner{
db: db,
batchSize: batchSize,
}
}
// Prune removes messages sent between lower and upper timestamps and returns how many has been removed
func (c *Cleaner) Prune(lower, upper uint32) (int, error) {
var zero common.Hash
kl := NewDbKey(lower, zero)
ku := NewDbKey(upper, zero)
i := c.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
defer i.Release()
return c.prune(i)
}
func (c *Cleaner) prune(i iterator.Iterator) (int, error) {
batch := leveldb.Batch{}
removed := 0
for i.Next() {
batch.Delete(i.Key())
if batch.Len() == c.batchSize {
if err := c.db.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
batch.Reset()
}
}
if batch.Len() > 0 {
if err := c.db.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
}
return removed, nil
}

118
mailserver/cleaner_test.go Normal file
View File

@ -0,0 +1,118 @@
package mailserver
import (
"fmt"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
func TestCleaner(t *testing.T) {
now := time.Now()
server := setupTestServer(t)
cleaner := NewCleanerWithDB(server.db)
defer server.Close()
archiveEnvelope(t, now.Add(-10*time.Second), server)
archiveEnvelope(t, now.Add(-3*time.Second), server)
archiveEnvelope(t, now.Add(-1*time.Second), server)
testMessagesCount(t, 3, server)
testPrune(t, now.Add(-5*time.Second), 2, cleaner, server)
testPrune(t, now.Add(-2*time.Second), 1, cleaner, server)
testPrune(t, now, 0, cleaner, server)
}
func benchmarkCleanerPrune(b *testing.B, messages int, batchSize int) {
t := &testing.T{}
now := time.Now()
sentTime := now.Add(-10 * time.Second)
server := setupTestServer(t)
defer server.Close()
cleaner := NewCleanerWithDB(server.db)
cleaner.batchSize = batchSize
for i := 0; i < messages; i++ {
archiveEnvelope(t, sentTime, server)
}
for i := 0; i < b.N; i++ {
testPrune(t, now, 0, cleaner, server)
}
}
func BenchmarkCleanerPruneM100_000_B100_000(b *testing.B) {
benchmarkCleanerPrune(b, 100000, 100000)
}
func BenchmarkCleanerPruneM100_000_B10_000(b *testing.B) {
benchmarkCleanerPrune(b, 100000, 10000)
}
func BenchmarkCleanerPruneM100_000_B1000(b *testing.B) {
benchmarkCleanerPrune(b, 100000, 1000)
}
func BenchmarkCleanerPruneM100_000_B100(b *testing.B) {
benchmarkCleanerPrune(b, 100000, 100)
}
func setupTestServer(t *testing.T) *WMailServer {
var s WMailServer
s.db, _ = leveldb.Open(storage.NewMemStorage(), nil)
s.pow = powRequirement
return &s
}
func archiveEnvelope(t *testing.T, sentTime time.Time, server *WMailServer) *whisper.Envelope {
env := generateEnvelope(t, sentTime)
server.Archive(env)
return env
}
func testPrune(t *testing.T, u time.Time, expected int, c *Cleaner, s *WMailServer) {
upper := uint32(u.Unix())
_, err := c.Prune(0, upper)
assert(err == nil, "", t)
count := countMessages(t, s.db)
assert(count == expected, fmt.Sprintf("expected %d message, got: %d", expected, count), t)
}
func testMessagesCount(t *testing.T, expected int, s *WMailServer) {
count := countMessages(t, s.db)
assert(count == expected, fmt.Sprintf("expected %d message, got: %d", expected, count), t)
}
func countMessages(t *testing.T, db *leveldb.DB) int {
var (
count int
zero common.Hash
)
now := time.Now()
kl := NewDbKey(uint32(0), zero)
ku := NewDbKey(uint32(now.Unix()), zero)
i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
defer i.Release()
for i.Next() {
var env whisper.Envelope
err := rlp.DecodeBytes(i.Value(), &env)
if err != nil {
t.Fatal(err)
}
count++
}
return count
}

View File

@ -66,7 +66,7 @@ func TestMailServer(t *testing.T) {
setupServer(t, &server)
defer server.Close()
env := generateEnvelope(t)
env := generateEnvelope(t, time.Now())
server.Archive(env)
deliverTest(t, &server, env)
}
@ -93,7 +93,7 @@ func TestRemoveExpiredRateLimits(t *testing.T) {
assert(ok, "Non expired peer should exist, but it doesn't", t)
}
func generateEnvelope(t *testing.T) *whisper.Envelope {
func generateEnvelope(t *testing.T, now time.Time) *whisper.Envelope {
h := crypto.Keccak256Hash([]byte("test sample data"))
params := &whisper.MessageParams{
KeySym: h[:],
@ -107,7 +107,7 @@ func generateEnvelope(t *testing.T) *whisper.Envelope {
if err != nil {
t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
env, err := msg.Wrap(params, time.Now())
env, err := msg.Wrap(params, now)
if err != nil {
t.Fatalf("failed to wrap with seed %d: %s.", seed, err)
}