diff --git a/mailserver/cleaner.go b/mailserver/cleaner.go index 4db0779aa..2076be898 100644 --- a/mailserver/cleaner.go +++ b/mailserver/cleaner.go @@ -26,9 +26,9 @@ func NewCleanerWithDB(db dbImpl) *Cleaner { // Prune removes messages sent between lower and upper timestamps and returns how many has been removed func (c *Cleaner) Prune(lower, upper uint32) (int, error) { var zero common.Hash - kl := NewDbKey(lower, zero) - ku := NewDbKey(upper, zero) - i := c.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) + kl := NewDBKey(lower, zero) + ku := NewDBKey(upper, zero) + i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) defer i.Release() return c.prune(i) diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index 0611e1927..d4e024df7 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -102,8 +102,8 @@ func countMessages(t *testing.T, db dbImpl) int { ) now := time.Now() - kl := NewDbKey(uint32(0), zero) - ku := NewDbKey(uint32(now.Unix()), zero) + kl := NewDBKey(uint32(0), zero) + ku := NewDBKey(uint32(now.Unix()), zero) i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) defer i.Release() diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 6e5f44ca3..93e8c1f9e 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/status-im/status-go/db" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/services/shhext" whisper "github.com/status-im/whisper/whisperv6" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -66,14 +65,14 @@ var ( ) const ( + // DBKeyLength is a size of the envelope key. + DBKeyLength = common.HashLength + timestampLength + timestampLength = 4 - dbKeyLength = common.HashLength + timestampLength requestLimitLength = 4 requestTimeRangeLength = timestampLength * 2 ) -type cursorType []byte - // dbImpl is an interface introduced to be able to test some unexpected // panics from leveldb that are difficult to reproduce. // normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite @@ -108,17 +107,31 @@ type DBKey struct { raw []byte } -// NewDbKey creates a new DBKey with the given values. -func NewDbKey(t uint32, h common.Hash) *DBKey { +// Bytes returns a bytes representation of the DBKey. +func (k *DBKey) Bytes() []byte { + return k.raw +} + +// NewDBKey creates a new DBKey with the given values. +func NewDBKey(t uint32, h common.Hash) *DBKey { var k DBKey k.timestamp = t k.hash = h - k.raw = make([]byte, dbKeyLength) + k.raw = make([]byte, DBKeyLength) binary.BigEndian.PutUint32(k.raw, k.timestamp) copy(k.raw[4:], k.hash[:]) return &k } +// NewDBKeyFromBytes creates a DBKey from a byte slice. +func NewDBKeyFromBytes(b []byte) *DBKey { + return &DBKey{ + raw: b, + timestamp: binary.BigEndian.Uint32(b), + hash: common.BytesToHash(b[4:]), + } +} + // Init initializes mailServer. func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) error { var err error @@ -226,13 +239,13 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { log.Debug("Archiving envelope", "hash", env.Hash().Hex()) - key := NewDbKey(env.Expiry-env.TTL, env.Hash()) + key := NewDBKey(env.Expiry-env.TTL, env.Hash()) rawEnvelope, err := rlp.EncodeToBytes(env) if err != nil { log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) archivedErrorsCounter.Inc(1) } else { - if err = s.db.Put(key.raw, rawEnvelope, nil); err != nil { + if err = s.db.Put(key.Bytes(), rawEnvelope, nil); err != nil { log.Error(fmt.Sprintf("Writing to DB failed: %s", err)) archivedErrorsCounter.Inc(1) } @@ -265,7 +278,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) lower, upper uint32 bloom []byte limit uint32 - cursor cursorType + cursor []byte batch bool err error ) @@ -274,7 +287,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) if err == nil { lower, upper = payload.Lower, payload.Upper bloom = payload.Bloom - cursor = cursorType(payload.Cursor) + cursor = payload.Cursor limit = payload.Limit batch = payload.Batch } else { @@ -447,23 +460,22 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { return false } -func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) iterator.Iterator { +func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator { var ( emptyHash common.Hash - ku []byte - kl []byte + ku, kl *DBKey ) - kl = NewDbKey(lower, emptyHash).raw - if len(cursor) == dbKeyLength { - ku = cursor + kl = NewDBKey(lower, emptyHash) + if len(cursor) == DBKeyLength { + ku = NewDBKeyFromBytes(cursor) } else { - ku = NewDbKey(upper+1, emptyHash).raw + ku = NewDBKey(upper+1, emptyHash) } - i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil) + i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) // seek to the end as we want to return envelopes in a descending order - i.Seek(ku) + i.Seek(ku.Bytes()) return i } @@ -472,13 +484,13 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) ite // to the output channel in bundles. func (s *WMailServer) processRequestInBundles( iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope, -) (cursorType, common.Hash) { +) ([]byte, common.Hash) { var ( bundle []*whisper.Envelope bundleSize uint32 processedEnvelopes int processedEnvelopesSize int64 - nextCursor cursorType + nextCursor []byte lastEnvelopeHash common.Hash ) @@ -559,7 +571,7 @@ func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Env return nil } -func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error { +func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor []byte) error { payload := whisper.CreateMailServerRequestCompletedPayload(request.Hash(), lastEnvelopeHash, cursor) return s.w.SendHistoricMessageResponse(peer, payload) } @@ -592,8 +604,8 @@ func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedM return nil } -func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (shhext.MessagesRequestPayload, error) { - var payload shhext.MessagesRequestPayload +func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (MessagesRequestPayload, error) { + var payload MessagesRequestPayload if s.pow > 0.0 && request.PoW() < s.pow { return payload, errors.New("PoW too low") @@ -634,7 +646,7 @@ func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (s func (s *WMailServer) validateRequest( peerID []byte, request *whisper.Envelope, -) (uint32, uint32, []byte, uint32, cursorType, error) { +) (uint32, uint32, []byte, uint32, []byte, error) { if s.pow > 0.0 && request.PoW() < s.pow { return 0, 0, nil, 0, nil, fmt.Errorf("PoW() is too low") } @@ -673,8 +685,8 @@ func (s *WMailServer) validateRequest( limit = binary.BigEndian.Uint32(decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize:]) } - var cursor cursorType - if len(decrypted.Payload) == requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength+dbKeyLength { + var cursor []byte + if len(decrypted.Payload) == requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength+DBKeyLength { cursor = decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength:] } diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 036798727..10c426b19 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/services/shhext" whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" ) @@ -266,8 +265,8 @@ func (s *MailserverSuite) TestArchive() { s.NoError(err) s.server.Archive(env) - key := NewDbKey(env.Expiry-env.TTL, env.Hash()) - archivedEnvelope, err := s.server.db.Get(key.raw, nil) + key := NewDBKey(env.Expiry-env.TTL, env.Hash()) + archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil) s.NoError(err) s.Equal(rawEnvelope, archivedEnvelope) @@ -288,10 +287,10 @@ func (s *MailserverSuite) TestManageLimits() { func (s *MailserverSuite) TestDBKey() { var h common.Hash i := uint32(time.Now().Unix()) - k := NewDbKey(i, h) - s.Equal(len(k.raw), common.HashLength+4, "wrong DB key length") - s.Equal(byte(i%0x100), k.raw[3], "raw representation should be big endian") - s.Equal(byte(i/0x1000000), k.raw[0], "big endian expected") + k := NewDBKey(i, h) + s.Equal(len(k.Bytes()), DBKeyLength, "wrong DB key length") + s.Equal(byte(i%0x100), k.Bytes()[3], "raw representation should be big endian") + s.Equal(byte(i/0x1000000), k.Bytes()[0], "big endian expected") } func (s *MailserverSuite) TestRequestPaginationLimit() { @@ -313,8 +312,8 @@ func (s *MailserverSuite) TestRequestPaginationLimit() { env, err := generateEnvelope(sentTime) s.NoError(err) s.server.Archive(env) - key := NewDbKey(env.Expiry-env.TTL, env.Hash()) - archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.raw)) + key := NewDBKey(env.Expiry-env.TTL, env.Hash()) + archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Bytes())) sentEnvelopes = append(sentEnvelopes, env) reverseSentHashes = append([]common.Hash{env.Hash()}, reverseSentHashes...) } @@ -447,7 +446,7 @@ func (s *MailserverSuite) TestDecodeRequest() { s.setupServer(s.server) defer s.server.Close() - payload := shhext.MessagesRequestPayload{ + payload := MessagesRequestPayload{ Lower: 50, Upper: 100, Bloom: []byte{0x01}, @@ -635,8 +634,8 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) { } func processRequestAndCollectHashes( - server *WMailServer, lower, upper uint32, cursor cursorType, bloom []byte, limit int, -) ([]common.Hash, cursorType, common.Hash) { + server *WMailServer, lower, upper uint32, cursor []byte, bloom []byte, limit int, +) ([]common.Hash, []byte, common.Hash) { iter := server.createIterator(lower, upper, cursor) defer iter.Release() bundles := make(chan []*whisper.Envelope, 10) diff --git a/mailserver/request.go b/mailserver/request.go new file mode 100644 index 000000000..9dbb76a50 --- /dev/null +++ b/mailserver/request.go @@ -0,0 +1,17 @@ +package mailserver + +// MessagesRequestPayload is a payload sent to the Mail Server. +type MessagesRequestPayload struct { + // Lower is a lower bound of time range for which messages are requested. + Lower uint32 + // Upper is a lower bound of time range for which messages are requested. + Upper uint32 + // Bloom is a bloom filter to filter envelopes. + Bloom []byte + // Limit is the max number of envelopes to return. + Limit uint32 + // Cursor is used for pagination of the results. + Cursor []byte + // Batch set to true indicates that the client supports batched response. + Batch bool +} diff --git a/services/shhext/api.go b/services/shhext/api.go index b3a69e029..48db72fc4 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/status-im/status-go/mailserver" "github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/services/shhext/mailservers" whisper "github.com/status-im/whisper/whisperv6" @@ -104,22 +105,6 @@ func (r *MessagesRequest) setDefaults(now time.Time) { } } -// MessagesRequestPayload is a payload sent to the Mail Server. -type MessagesRequestPayload struct { - // Lower is a lower bound of time range for which messages are requested. - Lower uint32 - // Upper is a lower bound of time range for which messages are requested. - Upper uint32 - // Bloom is a bloom filter to filter envelopes. - Bloom []byte - // Limit is the max number of envelopes to return. - Limit uint32 - // Cursor is used for pagination of the results. - Cursor []byte - // Batch set to true indicates that the client supports batched response. - Batch bool -} - // ----- // PUBLIC API // ----- @@ -188,7 +173,7 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex publicKey = mailServerNode.Pubkey() } - payload, err := makePayload(r) + payload, err := makeMessagesRequestPayload(r) if err != nil { return nil, err } @@ -473,15 +458,18 @@ func makeEnvelop( return message.Wrap(¶ms, now) } -// makePayload makes a specific payload for MailServer to request historic messages. -func makePayload(r MessagesRequest) ([]byte, error) { - expectedCursorSize := common.HashLength + 4 +// makeMessagesRequestPayload makes a specific payload for MailServer +// to request historic messages. +func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) { cursor, err := hex.DecodeString(r.Cursor) - if err != nil || len(cursor) != expectedCursorSize { - cursor = nil + if err != nil { + return nil, fmt.Errorf("invalid cursor: %v", err) + } + if len(cursor) > 0 && len(cursor) != mailserver.DBKeyLength { + return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.DBKeyLength, len(cursor)) } - payload := MessagesRequestPayload{ + payload := mailserver.MessagesRequestPayload{ Lower: r.From, Upper: r.To, Bloom: createBloomFilter(r), diff --git a/services/shhext/api_test.go b/services/shhext/api_test.go index a1c3e5016..f7f2fc7fe 100644 --- a/services/shhext/api_test.go +++ b/services/shhext/api_test.go @@ -1,10 +1,14 @@ package shhext import ( + "encoding/hex" "fmt" "testing" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/mailserver" + whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -55,6 +59,43 @@ func TestMessagesRequest_setDefaults(t *testing.T) { } } +func TestMakeMessagesRequestPayload(t *testing.T) { + testCases := []struct { + Name string + Req MessagesRequest + Err string + }{ + { + Name: "empty cursor", + Req: MessagesRequest{Cursor: ""}, + Err: "", + }, + { + Name: "invalid cursor size", + Req: MessagesRequest{Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03})}, + Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.DBKeyLength), + }, + { + Name: "valid cursor", + Req: MessagesRequest{ + Cursor: hex.EncodeToString(mailserver.NewDBKey(123, common.Hash{}).Bytes()), + }, + Err: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + _, err := makeMessagesRequestPayload(tc.Req) + if tc.Err == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.Err) + } + }) + } +} + func TestTopicsToBloom(t *testing.T) { t1 := stringToTopic("t1") b1 := whisper.TopicToBloom(t1) diff --git a/signal/events_shhext.go b/signal/events_shhext.go index 1d0cb312c..0960671e5 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -1,6 +1,8 @@ package signal import ( + "encoding/hex" + "github.com/ethereum/go-ethereum/common" ) @@ -71,7 +73,7 @@ func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash comm sig := MailServerResponseSignal{ RequestID: requestID, LastEnvelopeHash: lastEnvelopeHash, - Cursor: string(cursor), + Cursor: hex.EncodeToString(cursor), ErrorMsg: errorMsg, } send(EventMailServerRequestCompleted, sig)