Allow multiple db implementations

This commit creates an interface to use with the db so that we can
abstract what kind of db we use, therefore allowing us to chose db based
on config.
This commit is contained in:
Andrea Maria Piana 2019-05-10 12:26:57 +02:00
parent 10fc860a5f
commit 9e89efd859
7 changed files with 236 additions and 191 deletions

View File

@ -4,12 +4,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
)
const (
@ -86,40 +81,5 @@ func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) {
// PruneEntriesOlderThan removes messages sent between lower and upper timestamps
// and returns how many have been removed.
func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) {
var zero common.Hash
var emptyTopic whisper.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
defer i.Release()
return c.prune(i)
}
func (c *dbCleaner) prune(i iterator.Iterator) (int, error) {
batch := leveldb.Batch{}
removed := 0
for i.Next() {
batch.Delete(i.Key())
if batch.Len() == c.batchSize {
if err := c.db.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
batch.Reset()
}
}
if batch.Len() > 0 {
if err := c.db.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
}
return removed, nil
return c.db.Prune(t, c.batchSize)
}

View File

@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
func TestCleaner(t *testing.T) {
@ -89,7 +88,9 @@ func BenchmarkCleanerPruneM100_000_B100(b *testing.B) {
func setupTestServer(t *testing.T) *WMailServer {
var s WMailServer
s.db, _ = leveldb.Open(storage.NewMemStorage(), nil)
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
s.db = &LevelDBImpl{ldb: db}
s.pow = powRequirement
return &s
}
@ -123,7 +124,13 @@ func countMessages(t *testing.T, db dbImpl) int {
now := time.Now()
kl := NewDBKey(uint32(0), emptyTopic, zero)
ku := NewDBKey(uint32(now.Unix()), emptyTopic, zero)
i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
query := CursorQuery{
start: kl.raw,
end: ku.raw,
}
i := db.BuildIterator(query)
defer i.Release()
for i.Next() {

View File

@ -29,13 +29,8 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/params"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
const (
@ -58,20 +53,6 @@ const (
processRequestTimeout = time.Minute
)
// dbImpl is an interface introduced to be able to test some unexpected
// panics from leveldb that are difficult to reproduce.
// normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite
// we use panicDB to test panics from the db.
// more info about the panic errors:
// https://github.com/syndtr/goleveldb/issues/224
type dbImpl interface {
Close() error
Write(*leveldb.Batch, *opt.WriteOptions) error
Put([]byte, []byte, *opt.WriteOptions) error
Get([]byte, *opt.ReadOptions) ([]byte, error)
NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator
}
// WMailServer whisper mailserver.
type WMailServer struct {
db dbImpl
@ -111,7 +92,7 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e
// Open database in the last step in order not to init with error
// and leave the database open by accident.
database, err := db.Open(config.DataDir, nil)
database, err := NewLevelDBImpl(config)
if err != nil {
return fmt.Errorf("open DB: %s", err)
}
@ -183,39 +164,15 @@ func (s *WMailServer) Close() {
}
}
func recoverLevelDBPanics(calleMethodName string) {
// Recover from possible goleveldb panics
if r := recover(); r != nil {
if errString, ok := r.(string); ok {
log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString))
}
}
}
// Archive a whisper envelope.
func (s *WMailServer) Archive(env *whisper.Envelope) {
defer recoverLevelDBPanics("Archive")
log.Debug("Archiving envelope", "hash", env.Hash().Hex())
key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env)
if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.Inc(1)
} else {
if err = s.db.Put(key.Bytes(), rawEnvelope, nil); err != nil {
log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
archivedErrorsCounter.Inc(1)
}
archivedMeter.Mark(1)
archivedSizeMeter.Mark(int64(whisper.EnvelopeHeaderLength + len(env.Data)))
if err := s.db.SaveEnvelope(env); err != nil {
log.Error("Could not save envelope", "hash", env.Hash())
}
}
// DeliverMail sends mail to specified whisper peer.
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
defer recoverLevelDBPanics("DeliverMail")
startMethod := time.Now()
defer deliverMailTimer.UpdateSince(startMethod)
@ -296,7 +253,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
requestsBatchedCounter.Inc(1)
}
iter := s.createIterator(lower, upper, cursor)
iter := s.createIterator(lower, upper, cursor, bloom, limit)
defer iter.Release()
bundles := make(chan []rlp.RawValue, 5)
@ -375,8 +332,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailRequest) error {
log.Info("Started syncing envelopes", "peer", peerIDString(peer), "req", request)
defer recoverLevelDBPanics("SyncMail")
requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000))
syncRequestsMeter.Mark(1)
@ -392,7 +347,7 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
return fmt.Errorf("request is invalid: %v", err)
}
iter := s.createIterator(request.Lower, request.Upper, request.Cursor)
iter := s.createIterator(request.Lower, request.Upper, request.Cursor, nil, 0)
defer iter.Release()
bundles := make(chan []rlp.RawValue, 5)
@ -473,7 +428,7 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
return true
}
func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator {
func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte, bloom []byte, limit uint32) Iterator {
var (
emptyHash common.Hash
emptyTopic whisper.TopicType
@ -483,18 +438,20 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterato
ku = NewDBKey(upper+1, emptyTopic, emptyHash)
kl = NewDBKey(lower, emptyTopic, emptyHash)
i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
// seek to the end as we want to return envelopes in a descending order
if len(cursor) == CursorLength {
i.Seek(cursor)
query := CursorQuery{
start: kl.Bytes(),
end: ku.Bytes(),
cursor: cursor,
bloom: bloom,
limit: limit,
}
return i
return s.db.BuildIterator(query)
}
// processRequestInBundles processes envelopes using an iterator and passes them
// to the output channel in bundles.
func (s *WMailServer) processRequestInBundles(
iter iterator.Iterator,
iter Iterator,
bloom []byte,
limit int,
timeout time.Duration,
@ -524,31 +481,22 @@ func (s *WMailServer) processRequestInBundles(
// current envelope, and leave if we hit the limit
for iter.Next() {
rawValue := make([]byte, len(iter.Value()))
copy(rawValue, iter.Value())
rawValue, err := iter.GetEnvelope(bloom)
key := &DBKey{
raw: iter.Key(),
}
var envelopeBloom []byte
// Old key, we extract the topic from the envelope
if len(key.Bytes()) != DBKeyLength {
var err error
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
if err != nil {
log.Error("[mailserver:processRequestInBundles] failed to decode RLP",
log.Error("Failed to get envelope from iterator",
"err", err,
"requestID", requestID)
continue
}
} else {
envelopeBloom = whisper.TopicToBloom(key.Topic())
}
if !whisper.BloomFilterMatch(bloom, envelopeBloom) {
if rawValue == nil {
continue
}
key := iter.DBKey()
lastEnvelopeHash = key.EnvelopeHash()
processedEnvelopes++
envelopeSize := uint32(len(rawValue))

View File

@ -0,0 +1,38 @@
package mailserver
import (
whisper "github.com/status-im/whisper/whisperv6"
"time"
)
// dbImpl is an interface to abstract interactions with the db so that the mailserver
// is agnostic to the underlaying technology used
type dbImpl interface {
Close() error
// SaveEnvelope stores an envelope
SaveEnvelope(*whisper.Envelope) error
// GetEnvelope returns an rlp encoded envelope from the datastore
GetEnvelope(*DBKey) ([]byte, error)
// Prune removes envelopes older than time
Prune(time.Time, int) (int, error)
// BuildIterator returns an iterator over envelopes
BuildIterator(query CursorQuery) Iterator
}
type Iterator interface {
Next() bool
Prev() bool
DBKey() *DBKey
Value() []byte
Release()
Error() error
GetEnvelope(bloom []byte) ([]byte, error)
}
type CursorQuery struct {
start []byte
end []byte
cursor []byte
limit uint32
bloom []byte
}

View File

@ -0,0 +1,157 @@
package mailserver
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/params"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
"time"
)
type LevelDBImpl struct {
// We can't embed as there are some state problems with go-routines
ldb *leveldb.DB
}
type LevelDBIterator struct {
iterator.Iterator
}
func (i *LevelDBIterator) DBKey() *DBKey {
return &DBKey{
raw: i.Key(),
}
}
func (i *LevelDBIterator) GetEnvelope(bloom []byte) ([]byte, error) {
var envelopeBloom []byte
rawValue := make([]byte, len(i.Value()))
copy(rawValue, i.Value())
key := i.DBKey()
if len(key.Bytes()) != DBKeyLength {
var err error
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
if err != nil {
return nil, err
}
} else {
envelopeBloom = whisper.TopicToBloom(key.Topic())
}
if !whisper.BloomFilterMatch(bloom, envelopeBloom) {
return nil, nil
}
return rawValue, nil
}
func NewLevelDBImpl(config *params.WhisperConfig) (*LevelDBImpl, error) {
// Open opens an existing leveldb database
db, err := leveldb.OpenFile(config.DataDir, nil)
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
log.Info("database is corrupted trying to recover", "path", config.DataDir)
db, err = leveldb.RecoverFile(config.DataDir, nil)
}
return &LevelDBImpl{ldb: db}, err
}
// Build iterator returns an iterator given a start/end and a cursor
func (db *LevelDBImpl) BuildIterator(query CursorQuery) Iterator {
defer recoverLevelDBPanics("BuildIterator")
i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil)
// seek to the end as we want to return envelopes in a descending order
if len(query.cursor) == CursorLength {
i.Seek(query.cursor)
}
return &LevelDBIterator{i}
}
// GetEnvelope get an envelope by its key
func (db *LevelDBImpl) GetEnvelope(key *DBKey) ([]byte, error) {
defer recoverLevelDBPanics("GetEnvelope")
return db.ldb.Get(key.Bytes(), nil)
}
// Prune removes envelopes older than time
func (db *LevelDBImpl) Prune(t time.Time, batchSize int) (int, error) {
defer recoverLevelDBPanics("Prune")
var zero common.Hash
var emptyTopic whisper.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
query := CursorQuery{
start: kl.Bytes(),
end: ku.Bytes(),
}
i := db.BuildIterator(query)
defer i.Release()
batch := leveldb.Batch{}
removed := 0
for i.Next() {
batch.Delete(i.DBKey().Bytes())
if batch.Len() == batchSize {
if err := db.ldb.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
batch.Reset()
}
}
if batch.Len() > 0 {
if err := db.ldb.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
}
return removed, nil
}
// SaveEnvelope stores an envelope in leveldb and increments the metrics
func (db *LevelDBImpl) SaveEnvelope(env *whisper.Envelope) error {
defer recoverLevelDBPanics("SaveEnvelope")
key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env)
if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.Inc(1)
return err
}
if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
archivedErrorsCounter.Inc(1)
}
archivedMeter.Mark(1)
archivedSizeMeter.Mark(int64(whisper.EnvelopeHeaderLength + len(env.Data)))
return err
}
func (db *LevelDBImpl) Close() error {
return db.ldb.Close()
}
func recoverLevelDBPanics(calleMethodName string) {
// Recover from possible goleveldb panics
if r := recover(); r != nil {
if errString, ok := r.(string); ok {
log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString))
}
}
}

View File

@ -1,64 +0,0 @@
package mailserver
import (
"testing"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
type panicDB struct{}
func (db *panicDB) Close() error {
panic("panicDB panic on Close")
}
func (db *panicDB) Write(b *leveldb.Batch, opts *opt.WriteOptions) error {
panic("panicDB panic on Write")
}
func (db *panicDB) Put(k []byte, v []byte, opts *opt.WriteOptions) error {
panic("panicDB panic on Put")
}
func (db *panicDB) Get(k []byte, opts *opt.ReadOptions) ([]byte, error) {
panic("panicDB panic on Get")
}
func (db *panicDB) NewIterator(r *util.Range, opts *opt.ReadOptions) iterator.Iterator {
panic("panicDB panic on NewIterator")
}
func TestMailServerDBPanicSuite(t *testing.T) {
suite.Run(t, new(MailServerDBPanicSuite))
}
type MailServerDBPanicSuite struct {
suite.Suite
server *WMailServer
}
func (s *MailServerDBPanicSuite) SetupTest() {
s.server = &WMailServer{}
s.server.db = &panicDB{}
}
func (s *MailServerDBPanicSuite) TestArchive() {
defer s.testPanicRecover("Archive")
s.server.Archive(&whisper.Envelope{})
}
func (s *MailServerDBPanicSuite) TestDeliverMail() {
defer s.testPanicRecover("DeliverMail")
s.server.DeliverMail(&whisper.Peer{}, &whisper.Envelope{})
}
func (s *MailServerDBPanicSuite) testPanicRecover(method string) {
if r := recover(); r != nil {
s.Failf("error recovering panic", "expected recover to return nil, got: %+v", r)
}
}

View File

@ -28,7 +28,6 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
@ -267,7 +266,7 @@ func (s *MailserverSuite) TestArchive() {
s.server.Archive(env)
key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash())
archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil)
archivedEnvelope, err := s.server.db.GetEnvelope(key)
s.NoError(err)
s.Equal(rawEnvelope, archivedEnvelope)
@ -522,7 +521,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
Name string
Timeout time.Duration
Verify func(
iterator.Iterator,
Iterator,
time.Duration, // processRequestInBundles timeout
chan []rlp.RawValue,
)
@ -531,7 +530,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
Name: "finish processing using `done` channel",
Timeout: time.Second * 5,
Verify: func(
iter iterator.Iterator,
iter Iterator,
timeout time.Duration,
bundles chan []rlp.RawValue,
) {
@ -555,7 +554,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
Name: "finish processing due to timeout",
Timeout: time.Second,
Verify: func(
iter iterator.Iterator,
iter Iterator,
timeout time.Duration,
bundles chan []rlp.RawValue,
) {
@ -578,7 +577,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
for _, tc := range testCases {
s.T().Run(tc.Name, func(t *testing.T) {
iter := s.server.createIterator(lower, upper, cursor)
iter := s.server.createIterator(lower, upper, cursor, nil, 0)
defer iter.Release()
// Nothing reads from this unbuffered channel which simulates a situation
@ -777,7 +776,7 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) {
func processRequestAndCollectHashes(
server *WMailServer, lower, upper uint32, cursor []byte, bloom []byte, limit int,
) ([]common.Hash, []byte, common.Hash) {
iter := server.createIterator(lower, upper, cursor)
iter := server.createIterator(lower, upper, cursor, nil, 0)
defer iter.Release()
bundles := make(chan []rlp.RawValue, 10)
done := make(chan struct{})