diff --git a/mailserver/cleaner.go b/mailserver/cleaner.go index f3c36f06a..761885cf7 100644 --- a/mailserver/cleaner.go +++ b/mailserver/cleaner.go @@ -4,12 +4,7 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - whisper "github.com/status-im/whisper/whisperv6" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/util" ) const ( @@ -86,40 +81,5 @@ func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) { // PruneEntriesOlderThan removes messages sent between lower and upper timestamps // and returns how many have been removed. func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) { - var zero common.Hash - var emptyTopic whisper.TopicType - kl := NewDBKey(0, emptyTopic, zero) - ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero) - i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) - defer i.Release() - - return c.prune(i) -} - -func (c *dbCleaner) prune(i iterator.Iterator) (int, error) { - batch := leveldb.Batch{} - removed := 0 - - for i.Next() { - batch.Delete(i.Key()) - - if batch.Len() == c.batchSize { - if err := c.db.Write(&batch, nil); err != nil { - return removed, err - } - - removed = removed + batch.Len() - batch.Reset() - } - } - - if batch.Len() > 0 { - if err := c.db.Write(&batch, nil); err != nil { - return removed, err - } - - removed = removed + batch.Len() - } - - return removed, nil + return c.db.Prune(t, c.batchSize) } diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index f90057803..e8a21e65c 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" - "github.com/syndtr/goleveldb/leveldb/util" ) func TestCleaner(t *testing.T) { @@ -89,7 +88,9 @@ func BenchmarkCleanerPruneM100_000_B100(b *testing.B) { func setupTestServer(t *testing.T) *WMailServer { var s WMailServer - s.db, _ = leveldb.Open(storage.NewMemStorage(), nil) + db, _ := leveldb.Open(storage.NewMemStorage(), nil) + + s.db = &LevelDBImpl{ldb: db} s.pow = powRequirement return &s } @@ -123,7 +124,13 @@ func countMessages(t *testing.T, db dbImpl) int { now := time.Now() kl := NewDBKey(uint32(0), emptyTopic, zero) ku := NewDBKey(uint32(now.Unix()), emptyTopic, zero) - i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) + + query := CursorQuery{ + start: kl.raw, + end: ku.raw, + } + + i := db.BuildIterator(query) defer i.Release() for i.Next() { diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 2b42b30de..caf0f1c5d 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -29,13 +29,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" - "github.com/status-im/status-go/db" "github.com/status-im/status-go/params" whisper "github.com/status-im/whisper/whisperv6" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" ) const ( @@ -58,20 +53,6 @@ const ( processRequestTimeout = time.Minute ) -// dbImpl is an interface introduced to be able to test some unexpected -// panics from leveldb that are difficult to reproduce. -// normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite -// we use panicDB to test panics from the db. -// more info about the panic errors: -// https://github.com/syndtr/goleveldb/issues/224 -type dbImpl interface { - Close() error - Write(*leveldb.Batch, *opt.WriteOptions) error - Put([]byte, []byte, *opt.WriteOptions) error - Get([]byte, *opt.ReadOptions) ([]byte, error) - NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator -} - // WMailServer whisper mailserver. type WMailServer struct { db dbImpl @@ -111,7 +92,7 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e // Open database in the last step in order not to init with error // and leave the database open by accident. - database, err := db.Open(config.DataDir, nil) + database, err := NewLevelDBImpl(config) if err != nil { return fmt.Errorf("open DB: %s", err) } @@ -183,39 +164,15 @@ func (s *WMailServer) Close() { } } -func recoverLevelDBPanics(calleMethodName string) { - // Recover from possible goleveldb panics - if r := recover(); r != nil { - if errString, ok := r.(string); ok { - log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString)) - } - } -} - // Archive a whisper envelope. func (s *WMailServer) Archive(env *whisper.Envelope) { - defer recoverLevelDBPanics("Archive") - - log.Debug("Archiving envelope", "hash", env.Hash().Hex()) - - key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) - rawEnvelope, err := rlp.EncodeToBytes(env) - if err != nil { - log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) - archivedErrorsCounter.Inc(1) - } else { - if err = s.db.Put(key.Bytes(), rawEnvelope, nil); err != nil { - log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) - archivedErrorsCounter.Inc(1) - } - archivedMeter.Mark(1) - archivedSizeMeter.Mark(int64(whisper.EnvelopeHeaderLength + len(env.Data))) + if err := s.db.SaveEnvelope(env); err != nil { + log.Error("Could not save envelope", "hash", env.Hash()) } } // DeliverMail sends mail to specified whisper peer. func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { - defer recoverLevelDBPanics("DeliverMail") startMethod := time.Now() defer deliverMailTimer.UpdateSince(startMethod) @@ -296,7 +253,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) requestsBatchedCounter.Inc(1) } - iter := s.createIterator(lower, upper, cursor) + iter := s.createIterator(lower, upper, cursor, bloom, limit) defer iter.Release() bundles := make(chan []rlp.RawValue, 5) @@ -375,8 +332,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailRequest) error { log.Info("Started syncing envelopes", "peer", peerIDString(peer), "req", request) - defer recoverLevelDBPanics("SyncMail") - requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000)) syncRequestsMeter.Mark(1) @@ -392,7 +347,7 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque return fmt.Errorf("request is invalid: %v", err) } - iter := s.createIterator(request.Lower, request.Upper, request.Cursor) + iter := s.createIterator(request.Lower, request.Upper, request.Cursor, nil, 0) defer iter.Release() bundles := make(chan []rlp.RawValue, 5) @@ -473,7 +428,7 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { return true } -func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator { +func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte, bloom []byte, limit uint32) Iterator { var ( emptyHash common.Hash emptyTopic whisper.TopicType @@ -483,18 +438,20 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterato ku = NewDBKey(upper+1, emptyTopic, emptyHash) kl = NewDBKey(lower, emptyTopic, emptyHash) - i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) - // seek to the end as we want to return envelopes in a descending order - if len(cursor) == CursorLength { - i.Seek(cursor) + query := CursorQuery{ + start: kl.Bytes(), + end: ku.Bytes(), + cursor: cursor, + bloom: bloom, + limit: limit, } - return i + return s.db.BuildIterator(query) } // processRequestInBundles processes envelopes using an iterator and passes them // to the output channel in bundles. func (s *WMailServer) processRequestInBundles( - iter iterator.Iterator, + iter Iterator, bloom []byte, limit int, timeout time.Duration, @@ -524,31 +481,22 @@ func (s *WMailServer) processRequestInBundles( // current envelope, and leave if we hit the limit for iter.Next() { - rawValue := make([]byte, len(iter.Value())) - copy(rawValue, iter.Value()) + rawValue, err := iter.GetEnvelope(bloom) + + if err != nil { + log.Error("Failed to get envelope from iterator", + "err", err, + "requestID", requestID) + continue - key := &DBKey{ - raw: iter.Key(), } - var envelopeBloom []byte - // Old key, we extract the topic from the envelope - if len(key.Bytes()) != DBKeyLength { - var err error - envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue) - if err != nil { - log.Error("[mailserver:processRequestInBundles] failed to decode RLP", - "err", err, - "requestID", requestID) - continue - } - } else { - envelopeBloom = whisper.TopicToBloom(key.Topic()) - } - if !whisper.BloomFilterMatch(bloom, envelopeBloom) { + if rawValue == nil { continue } + key := iter.DBKey() + lastEnvelopeHash = key.EnvelopeHash() processedEnvelopes++ envelopeSize := uint32(len(rawValue)) diff --git a/mailserver/mailserver_db.go b/mailserver/mailserver_db.go new file mode 100644 index 000000000..414c5e2c8 --- /dev/null +++ b/mailserver/mailserver_db.go @@ -0,0 +1,38 @@ +package mailserver + +import ( + whisper "github.com/status-im/whisper/whisperv6" + "time" +) + +// dbImpl is an interface to abstract interactions with the db so that the mailserver +// is agnostic to the underlaying technology used +type dbImpl interface { + Close() error + // SaveEnvelope stores an envelope + SaveEnvelope(*whisper.Envelope) error + // GetEnvelope returns an rlp encoded envelope from the datastore + GetEnvelope(*DBKey) ([]byte, error) + // Prune removes envelopes older than time + Prune(time.Time, int) (int, error) + // BuildIterator returns an iterator over envelopes + BuildIterator(query CursorQuery) Iterator +} + +type Iterator interface { + Next() bool + Prev() bool + DBKey() *DBKey + Value() []byte + Release() + Error() error + GetEnvelope(bloom []byte) ([]byte, error) +} + +type CursorQuery struct { + start []byte + end []byte + cursor []byte + limit uint32 + bloom []byte +} diff --git a/mailserver/mailserver_db_leveldb.go b/mailserver/mailserver_db_leveldb.go new file mode 100644 index 000000000..46c95e065 --- /dev/null +++ b/mailserver/mailserver_db_leveldb.go @@ -0,0 +1,157 @@ +package mailserver + +import ( + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/status-im/status-go/params" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/util" + "time" +) + +type LevelDBImpl struct { + // We can't embed as there are some state problems with go-routines + ldb *leveldb.DB +} + +type LevelDBIterator struct { + iterator.Iterator +} + +func (i *LevelDBIterator) DBKey() *DBKey { + return &DBKey{ + raw: i.Key(), + } +} + +func (i *LevelDBIterator) GetEnvelope(bloom []byte) ([]byte, error) { + var envelopeBloom []byte + rawValue := make([]byte, len(i.Value())) + copy(rawValue, i.Value()) + + key := i.DBKey() + if len(key.Bytes()) != DBKeyLength { + var err error + envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue) + if err != nil { + return nil, err + } + } else { + envelopeBloom = whisper.TopicToBloom(key.Topic()) + } + if !whisper.BloomFilterMatch(bloom, envelopeBloom) { + return nil, nil + } + return rawValue, nil + +} + +func NewLevelDBImpl(config *params.WhisperConfig) (*LevelDBImpl, error) { + // Open opens an existing leveldb database + db, err := leveldb.OpenFile(config.DataDir, nil) + if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted { + log.Info("database is corrupted trying to recover", "path", config.DataDir) + db, err = leveldb.RecoverFile(config.DataDir, nil) + } + return &LevelDBImpl{ldb: db}, err +} + +// Build iterator returns an iterator given a start/end and a cursor +func (db *LevelDBImpl) BuildIterator(query CursorQuery) Iterator { + defer recoverLevelDBPanics("BuildIterator") + + i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil) + // seek to the end as we want to return envelopes in a descending order + if len(query.cursor) == CursorLength { + i.Seek(query.cursor) + } + return &LevelDBIterator{i} +} + +// GetEnvelope get an envelope by its key +func (db *LevelDBImpl) GetEnvelope(key *DBKey) ([]byte, error) { + defer recoverLevelDBPanics("GetEnvelope") + + return db.ldb.Get(key.Bytes(), nil) +} + +// Prune removes envelopes older than time +func (db *LevelDBImpl) Prune(t time.Time, batchSize int) (int, error) { + defer recoverLevelDBPanics("Prune") + + var zero common.Hash + var emptyTopic whisper.TopicType + kl := NewDBKey(0, emptyTopic, zero) + ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero) + query := CursorQuery{ + start: kl.Bytes(), + end: ku.Bytes(), + } + i := db.BuildIterator(query) + defer i.Release() + + batch := leveldb.Batch{} + removed := 0 + + for i.Next() { + batch.Delete(i.DBKey().Bytes()) + + if batch.Len() == batchSize { + if err := db.ldb.Write(&batch, nil); err != nil { + return removed, err + } + + removed = removed + batch.Len() + batch.Reset() + } + } + + if batch.Len() > 0 { + if err := db.ldb.Write(&batch, nil); err != nil { + return removed, err + } + + removed = removed + batch.Len() + } + + return removed, nil +} + +// SaveEnvelope stores an envelope in leveldb and increments the metrics +func (db *LevelDBImpl) SaveEnvelope(env *whisper.Envelope) error { + defer recoverLevelDBPanics("SaveEnvelope") + + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) + rawEnvelope, err := rlp.EncodeToBytes(env) + if err != nil { + log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) + archivedErrorsCounter.Inc(1) + return err + } + + if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil { + log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) + archivedErrorsCounter.Inc(1) + } + archivedMeter.Mark(1) + archivedSizeMeter.Mark(int64(whisper.EnvelopeHeaderLength + len(env.Data))) + return err +} + +func (db *LevelDBImpl) Close() error { + return db.ldb.Close() +} + +func recoverLevelDBPanics(calleMethodName string) { + // Recover from possible goleveldb panics + if r := recover(); r != nil { + if errString, ok := r.(string); ok { + log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString)) + } + } +} diff --git a/mailserver/mailserver_db_panic_test.go b/mailserver/mailserver_db_panic_test.go deleted file mode 100644 index 61ade9888..000000000 --- a/mailserver/mailserver_db_panic_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package mailserver - -import ( - "testing" - - whisper "github.com/status-im/whisper/whisperv6" - "github.com/stretchr/testify/suite" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" -) - -type panicDB struct{} - -func (db *panicDB) Close() error { - panic("panicDB panic on Close") -} - -func (db *panicDB) Write(b *leveldb.Batch, opts *opt.WriteOptions) error { - panic("panicDB panic on Write") -} - -func (db *panicDB) Put(k []byte, v []byte, opts *opt.WriteOptions) error { - panic("panicDB panic on Put") -} - -func (db *panicDB) Get(k []byte, opts *opt.ReadOptions) ([]byte, error) { - panic("panicDB panic on Get") -} - -func (db *panicDB) NewIterator(r *util.Range, opts *opt.ReadOptions) iterator.Iterator { - panic("panicDB panic on NewIterator") -} - -func TestMailServerDBPanicSuite(t *testing.T) { - suite.Run(t, new(MailServerDBPanicSuite)) -} - -type MailServerDBPanicSuite struct { - suite.Suite - server *WMailServer -} - -func (s *MailServerDBPanicSuite) SetupTest() { - s.server = &WMailServer{} - s.server.db = &panicDB{} -} - -func (s *MailServerDBPanicSuite) TestArchive() { - defer s.testPanicRecover("Archive") - s.server.Archive(&whisper.Envelope{}) -} - -func (s *MailServerDBPanicSuite) TestDeliverMail() { - defer s.testPanicRecover("DeliverMail") - s.server.DeliverMail(&whisper.Peer{}, &whisper.Envelope{}) -} - -func (s *MailServerDBPanicSuite) testPanicRecover(method string) { - if r := recover(); r != nil { - s.Failf("error recovering panic", "expected recover to return nil, got: %+v", r) - } -} diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 8037b53a3..b9b365460 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -28,7 +28,6 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -267,7 +266,7 @@ func (s *MailserverSuite) TestArchive() { s.server.Archive(env) key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) - archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil) + archivedEnvelope, err := s.server.db.GetEnvelope(key) s.NoError(err) s.Equal(rawEnvelope, archivedEnvelope) @@ -522,7 +521,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Name string Timeout time.Duration Verify func( - iterator.Iterator, + Iterator, time.Duration, // processRequestInBundles timeout chan []rlp.RawValue, ) @@ -531,7 +530,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Name: "finish processing using `done` channel", Timeout: time.Second * 5, Verify: func( - iter iterator.Iterator, + iter Iterator, timeout time.Duration, bundles chan []rlp.RawValue, ) { @@ -555,7 +554,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Name: "finish processing due to timeout", Timeout: time.Second, Verify: func( - iter iterator.Iterator, + iter Iterator, timeout time.Duration, bundles chan []rlp.RawValue, ) { @@ -578,7 +577,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { for _, tc := range testCases { s.T().Run(tc.Name, func(t *testing.T) { - iter := s.server.createIterator(lower, upper, cursor) + iter := s.server.createIterator(lower, upper, cursor, nil, 0) defer iter.Release() // Nothing reads from this unbuffered channel which simulates a situation @@ -777,7 +776,7 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) { func processRequestAndCollectHashes( server *WMailServer, lower, upper uint32, cursor []byte, bloom []byte, limit int, ) ([]common.Hash, []byte, common.Hash) { - iter := server.createIterator(lower, upper, cursor) + iter := server.createIterator(lower, upper, cursor, nil, 0) defer iter.Release() bundles := make(chan []rlp.RawValue, 10) done := make(chan struct{})