From 9e89efd8595582e4e1e340168f6fe2024d32a084 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Fri, 10 May 2019 12:26:57 +0200 Subject: [PATCH] Allow multiple db implementations This commit creates an interface to use with the db so that we can abstract what kind of db we use, therefore allowing us to choose the db based on config. --- mailserver/cleaner.go | 42 +------ mailserver/cleaner_test.go | 13 +- mailserver/mailserver.go | 100 ++++------------ mailserver/mailserver_db.go | 38 ++++++ mailserver/mailserver_db_leveldb.go | 157 +++++++++++++++++++++++++ mailserver/mailserver_db_panic_test.go | 64 ---------- mailserver/mailserver_test.go | 13 +- 7 files changed, 236 insertions(+), 191 deletions(-) create mode 100644 mailserver/mailserver_db.go create mode 100644 mailserver/mailserver_db_leveldb.go delete mode 100644 mailserver/mailserver_db_panic_test.go diff --git a/mailserver/cleaner.go b/mailserver/cleaner.go index f3c36f06a..761885cf7 100644 --- a/mailserver/cleaner.go +++ b/mailserver/cleaner.go @@ -4,12 +4,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - whisper "github.com/status-im/whisper/whisperv6" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/util" ) const ( @@ -86,40 +81,5 @@ func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) { // PruneEntriesOlderThan removes messages sent between lower and upper timestamps // and returns how many have been removed. 
func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) { - var zero common.Hash - var emptyTopic whisper.TopicType - kl := NewDBKey(0, emptyTopic, zero) - ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero) - i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) - defer i.Release() - - return c.prune(i) -} - -func (c *dbCleaner) prune(i iterator.Iterator) (int, error) { - batch := leveldb.Batch{} - removed := 0 - - for i.Next() { - batch.Delete(i.Key()) - - if batch.Len() == c.batchSize { - if err := c.db.Write(&batch, nil); err != nil { - return removed, err - } - - removed = removed + batch.Len() - batch.Reset() - } - } - - if batch.Len() > 0 { - if err := c.db.Write(&batch, nil); err != nil { - return removed, err - } - - removed = removed + batch.Len() - } - - return removed, nil + return c.db.Prune(t, c.batchSize) } diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index f90057803..e8a21e65c 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" - "github.com/syndtr/goleveldb/leveldb/util" ) func TestCleaner(t *testing.T) { @@ -89,7 +88,9 @@ func BenchmarkCleanerPruneM100_000_B100(b *testing.B) { func setupTestServer(t *testing.T) *WMailServer { var s WMailServer - s.db, _ = leveldb.Open(storage.NewMemStorage(), nil) + db, _ := leveldb.Open(storage.NewMemStorage(), nil) + + s.db = &LevelDBImpl{ldb: db} s.pow = powRequirement return &s } @@ -123,7 +124,13 @@ func countMessages(t *testing.T, db dbImpl) int { now := time.Now() kl := NewDBKey(uint32(0), emptyTopic, zero) ku := NewDBKey(uint32(now.Unix()), emptyTopic, zero) - i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) + + query := CursorQuery{ + start: kl.raw, + end: ku.raw, + } + + i := db.BuildIterator(query) defer i.Release() for i.Next() { diff --git 
a/mailserver/mailserver.go b/mailserver/mailserver.go index 2b42b30de..caf0f1c5d 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -29,13 +29,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" - "github.com/status-im/status-go/db" "github.com/status-im/status-go/params" whisper "github.com/status-im/whisper/whisperv6" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" ) const ( @@ -58,20 +53,6 @@ const ( processRequestTimeout = time.Minute ) -// dbImpl is an interface introduced to be able to test some unexpected -// panics from leveldb that are difficult to reproduce. -// normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite -// we use panicDB to test panics from the db. -// more info about the panic errors: -// https://github.com/syndtr/goleveldb/issues/224 -type dbImpl interface { - Close() error - Write(*leveldb.Batch, *opt.WriteOptions) error - Put([]byte, []byte, *opt.WriteOptions) error - Get([]byte, *opt.ReadOptions) ([]byte, error) - NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator -} - // WMailServer whisper mailserver. type WMailServer struct { db dbImpl @@ -111,7 +92,7 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e // Open database in the last step in order not to init with error // and leave the database open by accident. 
- database, err := db.Open(config.DataDir, nil) + database, err := NewLevelDBImpl(config) if err != nil { return fmt.Errorf("open DB: %s", err) } @@ -183,39 +164,15 @@ func (s *WMailServer) Close() { } } -func recoverLevelDBPanics(calleMethodName string) { - // Recover from possible goleveldb panics - if r := recover(); r != nil { - if errString, ok := r.(string); ok { - log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString)) - } - } -} - // Archive a whisper envelope. func (s *WMailServer) Archive(env *whisper.Envelope) { - defer recoverLevelDBPanics("Archive") - - log.Debug("Archiving envelope", "hash", env.Hash().Hex()) - - key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) - rawEnvelope, err := rlp.EncodeToBytes(env) - if err != nil { - log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) - archivedErrorsCounter.Inc(1) - } else { - if err = s.db.Put(key.Bytes(), rawEnvelope, nil); err != nil { - log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) - archivedErrorsCounter.Inc(1) - } - archivedMeter.Mark(1) - archivedSizeMeter.Mark(int64(whisper.EnvelopeHeaderLength + len(env.Data))) + if err := s.db.SaveEnvelope(env); err != nil { + log.Error("Could not save envelope", "hash", env.Hash()) } } // DeliverMail sends mail to specified whisper peer. 
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { - defer recoverLevelDBPanics("DeliverMail") startMethod := time.Now() defer deliverMailTimer.UpdateSince(startMethod) @@ -296,7 +253,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) requestsBatchedCounter.Inc(1) } - iter := s.createIterator(lower, upper, cursor) + iter := s.createIterator(lower, upper, cursor, bloom, limit) defer iter.Release() bundles := make(chan []rlp.RawValue, 5) @@ -375,8 +332,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailRequest) error { log.Info("Started syncing envelopes", "peer", peerIDString(peer), "req", request) - defer recoverLevelDBPanics("SyncMail") - requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000)) syncRequestsMeter.Mark(1) @@ -392,7 +347,7 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque return fmt.Errorf("request is invalid: %v", err) } - iter := s.createIterator(request.Lower, request.Upper, request.Cursor) + iter := s.createIterator(request.Lower, request.Upper, request.Cursor, nil, 0) defer iter.Release() bundles := make(chan []rlp.RawValue, 5) @@ -473,7 +428,7 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { return true } -func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator { +func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte, bloom []byte, limit uint32) Iterator { var ( emptyHash common.Hash emptyTopic whisper.TopicType @@ -483,18 +438,20 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterato ku = NewDBKey(upper+1, emptyTopic, emptyHash) kl = NewDBKey(lower, emptyTopic, emptyHash) - i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) - // seek to the end as we want to return envelopes in a descending order 
- if len(cursor) == CursorLength { - i.Seek(cursor) + query := CursorQuery{ + start: kl.Bytes(), + end: ku.Bytes(), + cursor: cursor, + bloom: bloom, + limit: limit, } - return i + return s.db.BuildIterator(query) } // processRequestInBundles processes envelopes using an iterator and passes them // to the output channel in bundles. func (s *WMailServer) processRequestInBundles( - iter iterator.Iterator, + iter Iterator, bloom []byte, limit int, timeout time.Duration, @@ -524,31 +481,22 @@ func (s *WMailServer) processRequestInBundles( // current envelope, and leave if we hit the limit for iter.Next() { - rawValue := make([]byte, len(iter.Value())) - copy(rawValue, iter.Value()) + rawValue, err := iter.GetEnvelope(bloom) + + if err != nil { + log.Error("Failed to get envelope from iterator", + "err", err, + "requestID", requestID) + continue - key := &DBKey{ - raw: iter.Key(), } - var envelopeBloom []byte - // Old key, we extract the topic from the envelope - if len(key.Bytes()) != DBKeyLength { - var err error - envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue) - if err != nil { - log.Error("[mailserver:processRequestInBundles] failed to decode RLP", - "err", err, - "requestID", requestID) - continue - } - } else { - envelopeBloom = whisper.TopicToBloom(key.Topic()) - } - if !whisper.BloomFilterMatch(bloom, envelopeBloom) { + if rawValue == nil { continue } + key := iter.DBKey() + lastEnvelopeHash = key.EnvelopeHash() processedEnvelopes++ envelopeSize := uint32(len(rawValue)) diff --git a/mailserver/mailserver_db.go b/mailserver/mailserver_db.go new file mode 100644 index 000000000..414c5e2c8 --- /dev/null +++ b/mailserver/mailserver_db.go @@ -0,0 +1,38 @@ +package mailserver + +import ( + whisper "github.com/status-im/whisper/whisperv6" + "time" +) + +// dbImpl is an interface to abstract interactions with the db so that the mailserver +// is agnostic to the underlying technology used +type dbImpl interface { + Close() error + // SaveEnvelope stores 
an envelope + SaveEnvelope(*whisper.Envelope) error + // GetEnvelope returns an rlp encoded envelope from the datastore + GetEnvelope(*DBKey) ([]byte, error) + // Prune removes envelopes older than time + Prune(time.Time, int) (int, error) + // BuildIterator returns an iterator over envelopes + BuildIterator(query CursorQuery) Iterator +} + +type Iterator interface { + Next() bool + Prev() bool + DBKey() *DBKey + Value() []byte + Release() + Error() error + GetEnvelope(bloom []byte) ([]byte, error) +} + +type CursorQuery struct { + start []byte + end []byte + cursor []byte + limit uint32 + bloom []byte +} diff --git a/mailserver/mailserver_db_leveldb.go b/mailserver/mailserver_db_leveldb.go new file mode 100644 index 000000000..46c95e065 --- /dev/null +++ b/mailserver/mailserver_db_leveldb.go @@ -0,0 +1,157 @@ +package mailserver + +import ( + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/status-im/status-go/params" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/util" + "time" +) + +type LevelDBImpl struct { + // We can't embed as there are some state problems with go-routines + ldb *leveldb.DB +} + +type LevelDBIterator struct { + iterator.Iterator +} + +func (i *LevelDBIterator) DBKey() *DBKey { + return &DBKey{ + raw: i.Key(), + } +} + +func (i *LevelDBIterator) GetEnvelope(bloom []byte) ([]byte, error) { + var envelopeBloom []byte + rawValue := make([]byte, len(i.Value())) + copy(rawValue, i.Value()) + + key := i.DBKey() + if len(key.Bytes()) != DBKeyLength { + var err error + envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue) + if err != nil { + return nil, err + } + } else { + envelopeBloom = whisper.TopicToBloom(key.Topic()) + } + if !whisper.BloomFilterMatch(bloom, 
envelopeBloom) { + return nil, nil + } + return rawValue, nil + +} + +func NewLevelDBImpl(config *params.WhisperConfig) (*LevelDBImpl, error) { + // Open opens an existing leveldb database + db, err := leveldb.OpenFile(config.DataDir, nil) + if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted { + log.Info("database is corrupted trying to recover", "path", config.DataDir) + db, err = leveldb.RecoverFile(config.DataDir, nil) + } + return &LevelDBImpl{ldb: db}, err +} + +// Build iterator returns an iterator given a start/end and a cursor +func (db *LevelDBImpl) BuildIterator(query CursorQuery) Iterator { + defer recoverLevelDBPanics("BuildIterator") + + i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil) + // seek to the end as we want to return envelopes in a descending order + if len(query.cursor) == CursorLength { + i.Seek(query.cursor) + } + return &LevelDBIterator{i} +} + +// GetEnvelope get an envelope by its key +func (db *LevelDBImpl) GetEnvelope(key *DBKey) ([]byte, error) { + defer recoverLevelDBPanics("GetEnvelope") + + return db.ldb.Get(key.Bytes(), nil) +} + +// Prune removes envelopes older than time +func (db *LevelDBImpl) Prune(t time.Time, batchSize int) (int, error) { + defer recoverLevelDBPanics("Prune") + + var zero common.Hash + var emptyTopic whisper.TopicType + kl := NewDBKey(0, emptyTopic, zero) + ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero) + query := CursorQuery{ + start: kl.Bytes(), + end: ku.Bytes(), + } + i := db.BuildIterator(query) + defer i.Release() + + batch := leveldb.Batch{} + removed := 0 + + for i.Next() { + batch.Delete(i.DBKey().Bytes()) + + if batch.Len() == batchSize { + if err := db.ldb.Write(&batch, nil); err != nil { + return removed, err + } + + removed = removed + batch.Len() + batch.Reset() + } + } + + if batch.Len() > 0 { + if err := db.ldb.Write(&batch, nil); err != nil { + return removed, err + } + + removed = removed + batch.Len() + } + + return removed, nil +} + +// 
SaveEnvelope stores an envelope in leveldb and increments the metrics +func (db *LevelDBImpl) SaveEnvelope(env *whisper.Envelope) error { + defer recoverLevelDBPanics("SaveEnvelope") + + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) + rawEnvelope, err := rlp.EncodeToBytes(env) + if err != nil { + log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) + archivedErrorsCounter.Inc(1) + return err + } + + if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil { + log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) + archivedErrorsCounter.Inc(1) + } + archivedMeter.Mark(1) + archivedSizeMeter.Mark(int64(whisper.EnvelopeHeaderLength + len(env.Data))) + return err +} + +func (db *LevelDBImpl) Close() error { + return db.ldb.Close() +} + +func recoverLevelDBPanics(calleMethodName string) { + // Recover from possible goleveldb panics + if r := recover(); r != nil { + if errString, ok := r.(string); ok { + log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString)) + } + } +} diff --git a/mailserver/mailserver_db_panic_test.go b/mailserver/mailserver_db_panic_test.go deleted file mode 100644 index 61ade9888..000000000 --- a/mailserver/mailserver_db_panic_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package mailserver - -import ( - "testing" - - whisper "github.com/status-im/whisper/whisperv6" - "github.com/stretchr/testify/suite" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" -) - -type panicDB struct{} - -func (db *panicDB) Close() error { - panic("panicDB panic on Close") -} - -func (db *panicDB) Write(b *leveldb.Batch, opts *opt.WriteOptions) error { - panic("panicDB panic on Write") -} - -func (db *panicDB) Put(k []byte, v []byte, opts *opt.WriteOptions) error { - panic("panicDB panic on Put") -} - -func (db *panicDB) Get(k []byte, opts *opt.ReadOptions) ([]byte, error) { - panic("panicDB 
panic on Get") -} - -func (db *panicDB) NewIterator(r *util.Range, opts *opt.ReadOptions) iterator.Iterator { - panic("panicDB panic on NewIterator") -} - -func TestMailServerDBPanicSuite(t *testing.T) { - suite.Run(t, new(MailServerDBPanicSuite)) -} - -type MailServerDBPanicSuite struct { - suite.Suite - server *WMailServer -} - -func (s *MailServerDBPanicSuite) SetupTest() { - s.server = &WMailServer{} - s.server.db = &panicDB{} -} - -func (s *MailServerDBPanicSuite) TestArchive() { - defer s.testPanicRecover("Archive") - s.server.Archive(&whisper.Envelope{}) -} - -func (s *MailServerDBPanicSuite) TestDeliverMail() { - defer s.testPanicRecover("DeliverMail") - s.server.DeliverMail(&whisper.Peer{}, &whisper.Envelope{}) -} - -func (s *MailServerDBPanicSuite) testPanicRecover(method string) { - if r := recover(); r != nil { - s.Failf("error recovering panic", "expected recover to return nil, got: %+v", r) - } -} diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 8037b53a3..b9b365460 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -28,7 +28,6 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -267,7 +266,7 @@ func (s *MailserverSuite) TestArchive() { s.server.Archive(env) key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) - archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil) + archivedEnvelope, err := s.server.db.GetEnvelope(key) s.NoError(err) s.Equal(rawEnvelope, archivedEnvelope) @@ -522,7 +521,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Name string Timeout time.Duration Verify func( - iterator.Iterator, + Iterator, time.Duration, // processRequestInBundles timeout chan []rlp.RawValue, ) @@ -531,7 +530,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Name: "finish processing using `done` channel", 
Timeout: time.Second * 5, Verify: func( - iter iterator.Iterator, + iter Iterator, timeout time.Duration, bundles chan []rlp.RawValue, ) { @@ -555,7 +554,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Name: "finish processing due to timeout", Timeout: time.Second, Verify: func( - iter iterator.Iterator, + iter Iterator, timeout time.Duration, bundles chan []rlp.RawValue, ) { @@ -578,7 +577,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { for _, tc := range testCases { s.T().Run(tc.Name, func(t *testing.T) { - iter := s.server.createIterator(lower, upper, cursor) + iter := s.server.createIterator(lower, upper, cursor, nil, 0) defer iter.Release() // Nothing reads from this unbuffered channel which simulates a situation @@ -777,7 +776,7 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) { func processRequestAndCollectHashes( server *WMailServer, lower, upper uint32, cursor []byte, bloom []byte, limit int, ) ([]common.Hash, []byte, common.Hash) { - iter := server.createIterator(lower, upper, cursor) + iter := server.createIterator(lower, upper, cursor, nil, 0) defer iter.Release() bundles := make(chan []rlp.RawValue, 10) done := make(chan struct{})