diff --git a/mailserver/cleaner.go b/mailserver/cleaner.go index b4a4e22d4..4db0779aa 100644 --- a/mailserver/cleaner.go +++ b/mailserver/cleaner.go @@ -11,12 +11,12 @@ const batchSize = 1000 // Cleaner removes old messages from a db type Cleaner struct { - db *leveldb.DB + db dbImpl batchSize int } // NewCleanerWithDB returns a new Cleaner for db -func NewCleanerWithDB(db *leveldb.DB) *Cleaner { +func NewCleanerWithDB(db dbImpl) *Cleaner { return &Cleaner{ db: db, batchSize: batchSize, diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index 8926dd526..ecab0803f 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -95,7 +95,7 @@ func testMessagesCount(t *testing.T, expected int, s *WMailServer) { require.Equal(t, expected, count, fmt.Sprintf("expected %d message, got: %d", expected, count)) } -func countMessages(t *testing.T, db *leveldb.DB) int { +func countMessages(t *testing.T, db dbImpl) int { var ( count int zero common.Hash diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 562992258..3ac5a411b 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -31,6 +31,8 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" "github.com/status-im/status-go/params" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -53,9 +55,23 @@ var ( archivedErrorsCounter = metrics.NewRegisteredCounter("mailserver/archiveErrors", nil) ) +// dbImpl is an interface introduced to be able to test some unexpected +// panics from leveldb that are difficult to reproduce. +// normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite +// we use panicDB to test panics from the db. +// more info about the panic errors: +// https://github.com/syndtr/goleveldb/issues/224 +type dbImpl interface { + Close() error + Write(*leveldb.Batch, *opt.WriteOptions) error + Put([]byte, []byte, *opt.WriteOptions) error + Get([]byte, *opt.ReadOptions) ([]byte, error) + NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator +} + // WMailServer whisper mailserver. type WMailServer struct { - db *leveldb.DB + db dbImpl w *whisper.Whisper pow float64 key []byte @@ -94,11 +110,12 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e return errPasswordNotProvided } - s.db, err = leveldb.OpenFile(config.DataDir, nil) + db, err := leveldb.OpenFile(config.DataDir, nil) if err != nil { return fmt.Errorf("open DB: %s", err) } + s.db = db s.w = shh s.pow = config.MinimumPoW @@ -202,7 +219,12 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) defer recoverLevelDBPanics("DeliverMail") if ok, lower, upper, bloom := s.validateRequest(peer.ID(), request); ok { - s.processRequest(peer, lower, upper, bloom) + _, err := s.processRequest(peer, lower, upper, bloom) + if err != nil { + log.Error(fmt.Sprintf("error in DeliverMail: %s", err)) + return + } + if err := s.sendHistoricMessageResponse(peer, request); err != nil { log.Error(fmt.Sprintf("SendHistoricMessageResponse error: %s", err)) } @@ -225,9 +247,14 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { // processRequest processes the current request and re-sends all stored messages // accomplishing lower and upper limits. -func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte) []*whisper.Envelope { - ret := make([]*whisper.Envelope, 0) - var err error +func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte) (ret []*whisper.Envelope, err error) { + // Recover from possible goleveldb panics + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("recovered from panic in processRequest: %v", r) + } + }() + var zero common.Hash kl := NewDbKey(lower, zero) ku := NewDbKey(upper+1, zero) // LevelDB is exclusive, while the Whisper API is inclusive @@ -243,9 +270,10 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl for i.Next() { var envelope whisper.Envelope - err = rlp.DecodeBytes(i.Value(), &envelope) - if err != nil { - log.Error(fmt.Sprintf("RLP decoding failed: %s", err)) + decodeErr := rlp.DecodeBytes(i.Value(), &envelope) + if decodeErr != nil { + log.Error(fmt.Sprintf("RLP decoding failed: %s", decodeErr)) + continue } if whisper.BloomFilterMatch(bloom, envelope.Bloom()) { @@ -256,7 +284,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl err = s.w.SendP2PDirect(peer, &envelope) if err != nil { log.Error(fmt.Sprintf("Failed to send direct message to peer: %s", err)) - return nil + return } } sentEnvelopes++ @@ -273,7 +301,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl log.Error(fmt.Sprintf("Level DB iterator error: %s", err)) } - return ret + return } func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope) error { diff --git a/mailserver/mailserver_db_panic_test.go b/mailserver/mailserver_db_panic_test.go new file mode 100644 index 000000000..a596ae2c0 --- /dev/null +++ b/mailserver/mailserver_db_panic_test.go @@ -0,0 +1,66 @@ +package mailserver + +import ( + "testing" + + whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" + "github.com/stretchr/testify/suite" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +type panicDB struct{} + +func (db *panicDB) Close() error { + panic("panicDB panic on Close") +} + +func (db *panicDB) Write(b *leveldb.Batch, opts *opt.WriteOptions) error { + panic("panicDB panic on Write") +} + +func (db *panicDB) Put(k []byte, v []byte, opts *opt.WriteOptions) error { + panic("panicDB panic on Put") +} + +func (db *panicDB) Get(k []byte, opts *opt.ReadOptions) ([]byte, error) { + panic("panicDB panic on Get") +} + +func (db *panicDB) NewIterator(r *util.Range, opts *opt.ReadOptions) iterator.Iterator { + panic("panicDB panic on NewIterator") +} + +func TestMailServerDBPanicSuite(t *testing.T) { + suite.Run(t, new(MailServerDBPanicSuite)) +} + +type MailServerDBPanicSuite struct { + suite.Suite + server *WMailServer +} + +func (s *MailServerDBPanicSuite) SetupTest() { + s.server = &WMailServer{} + s.server.db = &panicDB{} +} + +func (s *MailServerDBPanicSuite) TestArchive() { + defer s.testPanicRecover("Archive") + s.server.Archive(&whisper.Envelope{}) +} + +func (s *MailServerDBPanicSuite) TestDeliverMail() { + defer s.testPanicRecover("DeliverMail") + _, err := s.server.processRequest(nil, 10, 20, []byte{}) + s.Error(err) + s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error()) +} + +func (s *MailServerDBPanicSuite) testPanicRecover(method string) { + if r := recover(); r != nil { + s.Failf("error recovering panic", "expected recover to return nil, got: %+v", r) + } +} diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 984e97711..134de5208 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -251,7 +251,8 @@ func (s *MailserverSuite) TestMailServer() { func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte) bool { var exist bool - mail := s.server.processRequest(nil, low, upp, bloom) + mail, err := s.server.processRequest(nil, low, upp, bloom) + s.NoError(err) for _, msg := range mail { if msg.Hash() == envelope.Hash() { exist = true