diff --git a/Gopkg.lock b/Gopkg.lock index 5e9790e6d..78baaec1e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -846,12 +846,12 @@ version = "v1.1.0" [[projects]] - digest = "1:d499fd4787bb7a4a5f6fe9f75a517346d70e1e4ab3dbcc83ed85151833e3493d" + digest = "1:0612400565fd528de0ca1a2908a9b18e9fbc33add226df524cf404bf091d87c8" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "4fae75da94b1ab6dc13a5fa7d5087bfbfa04406f" - version = "v1.4.12" + revision = "0b742129500f4f54ddbf15c02ad53d4f7101d2a8" + version = "v1.4.13" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index 1e8cfd49e..21c8dc2b1 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -46,7 +46,7 @@ [[constraint]] name = "github.com/status-im/whisper" - version = "=v1.4.12" + version = "=v1.4.13" [[constraint]] name = "golang.org/x/text" diff --git a/mailserver/cleaner.go b/mailserver/cleaner.go index 531169ab6..f3c36f06a 100644 --- a/mailserver/cleaner.go +++ b/mailserver/cleaner.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + whisper "github.com/status-im/whisper/whisperv6" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/util" @@ -86,8 +87,9 @@ func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) { // and returns how many have been removed. func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) { var zero common.Hash - kl := NewDBKey(0, zero) - ku := NewDBKey(uint32(t.Unix()), zero) + var emptyTopic whisper.TopicType + kl := NewDBKey(0, emptyTopic, zero) + ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero) i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) defer i.Release() diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index 285f63899..f90057803 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -115,13 +115,14 @@ func testMessagesCount(t *testing.T, expected int, s *WMailServer) { func countMessages(t *testing.T, db dbImpl) int { var ( - count int - zero common.Hash + count int + zero common.Hash + emptyTopic whisper.TopicType ) now := time.Now() - kl := NewDBKey(uint32(0), zero) - ku := NewDBKey(uint32(now.Unix()), zero) + kl := NewDBKey(uint32(0), emptyTopic, zero) + ku := NewDBKey(uint32(now.Unix()), emptyTopic, zero) i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) defer i.Release() diff --git a/mailserver/db_key.go b/mailserver/db_key.go index 9ea4bc8ab..1267fdd38 100644 --- a/mailserver/db_key.go +++ b/mailserver/db_key.go @@ -5,11 +5,13 @@ import ( "errors" "github.com/ethereum/go-ethereum/common" + whisper "github.com/status-im/whisper/whisperv6" ) const ( // DBKeyLength is a size of the envelope key. - DBKeyLength = common.HashLength + timestampLength + DBKeyLength = common.HashLength + timestampLength + whisper.TopicLength + CursorLength = common.HashLength + timestampLength ) var ( @@ -20,9 +22,7 @@ var ( // DBKey key to be stored in a db. type DBKey struct { - timestamp uint32 - hash common.Hash - raw []byte + raw []byte } // Bytes returns a bytes representation of the DBKey. @@ -30,26 +30,25 @@ func (k *DBKey) Bytes() []byte { return k.raw } +func (k *DBKey) Topic() whisper.TopicType { + return whisper.BytesToTopic(k.raw[timestampLength+common.HashLength:]) +} + +func (k *DBKey) EnvelopeHash() common.Hash { + return common.BytesToHash(k.raw[timestampLength : common.HashLength+timestampLength]) +} + +func (k *DBKey) Cursor() []byte { + // We don't use the whole cursor for backward compatibility (also it's not needed) + return k.raw[:CursorLength] +} + // NewDBKey creates a new DBKey with the given values. -func NewDBKey(timestamp uint32, h common.Hash) *DBKey { +func NewDBKey(timestamp uint32, topic whisper.TopicType, h common.Hash) *DBKey { var k DBKey - k.timestamp = timestamp - k.hash = h k.raw = make([]byte, DBKeyLength) - binary.BigEndian.PutUint32(k.raw, k.timestamp) - copy(k.raw[4:], k.hash[:]) + binary.BigEndian.PutUint32(k.raw, timestamp) + copy(k.raw[timestampLength:], h[:]) + copy(k.raw[timestampLength+common.HashLength:], topic[:]) return &k } - -// NewDBKeyFromBytes creates a DBKey from a byte slice. -func NewDBKeyFromBytes(b []byte) (*DBKey, error) { - if len(b) != DBKeyLength { - return nil, ErrInvalidByteSize - } - - return &DBKey{ - raw: b, - timestamp: binary.BigEndian.Uint32(b), - hash: common.BytesToHash(b[4:]), - }, nil -} diff --git a/mailserver/db_key_test.go b/mailserver/db_key_test.go new file mode 100644 index 000000000..c64012e02 --- /dev/null +++ b/mailserver/db_key_test.go @@ -0,0 +1,19 @@ +package mailserver + +import ( + "github.com/ethereum/go-ethereum/common" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNewDBKey(t *testing.T) { + topic := whisper.BytesToTopic([]byte{0x01, 0x02, 0x03, 0x04}) + + hash := common.BytesToHash([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, 0x31, 0x32}) + dbKey := NewDBKey(0xabcdef12, topic, hash) + expected := []byte{0xab, 0xcd, 0xef, 0x12, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, 0x31, 0x32, 0x01, 0x02, 0x03, 0x04} + require.Equal(t, expected, dbKey.Bytes()) + require.Equal(t, topic, dbKey.Topic()) + require.Equal(t, hash, dbKey.EnvelopeHash()) +} diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 2722fa6f6..2b42b30de 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -198,7 +198,7 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { log.Debug("Archiving envelope", "hash", env.Hash().Hex()) - key := NewDBKey(env.Expiry-env.TTL, env.Hash()) + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) rawEnvelope, err := rlp.EncodeToBytes(env) if err != nil { log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) @@ -299,14 +299,14 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) iter := s.createIterator(lower, upper, cursor) defer iter.Release() - bundles := make(chan []*whisper.Envelope, 5) + bundles := make(chan []rlp.RawValue, 5) errCh := make(chan error) cancelProcessing := make(chan struct{}) go func() { counter := 0 for bundle := range bundles { - if err := s.sendEnvelopes(peer, bundle, batch); err != nil { + if err := s.sendRawEnvelopes(peer, bundle, batch); err != nil { close(cancelProcessing) errCh <- err break @@ -395,14 +395,14 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque iter := s.createIterator(request.Lower, request.Upper, request.Cursor) defer iter.Release() - bundles := make(chan []*whisper.Envelope, 5) + bundles := make(chan []rlp.RawValue, 5) errCh := make(chan error) cancelProcessing := make(chan struct{}) go func() { for bundle := range bundles { - resp := whisper.SyncResponse{Envelopes: bundle} - if err := s.w.SendSyncResponse(peer, resp); err != nil { + resp := whisper.RawSyncResponse{Envelopes: bundle} + if err := s.w.SendRawSyncResponse(peer, resp); err != nil { close(cancelProcessing) errCh <- fmt.Errorf("failed to send sync response: %v", err) break @@ -475,16 +475,17 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator { var ( - emptyHash common.Hash - ku, kl *DBKey + emptyHash common.Hash + emptyTopic whisper.TopicType + ku, kl *DBKey ) - ku = NewDBKey(upper+1, emptyHash) - kl = NewDBKey(lower, emptyHash) + ku = NewDBKey(upper+1, emptyTopic, emptyHash) + kl = NewDBKey(lower, emptyTopic, emptyHash) i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) // seek to the end as we want to return envelopes in a descending order - if len(cursor) == DBKeyLength { + if len(cursor) == CursorLength { i.Seek(cursor) } return i @@ -498,13 +499,13 @@ func (s *WMailServer) processRequestInBundles( limit int, timeout time.Duration, requestID string, - output chan<- []*whisper.Envelope, + output chan<- []rlp.RawValue, cancel <-chan struct{}, ) ([]byte, common.Hash) { var ( - bundle []*whisper.Envelope + bundle []rlp.RawValue bundleSize uint32 - batches [][]*whisper.Envelope + batches [][]rlp.RawValue processedEnvelopes int processedEnvelopesSize int64 nextCursor []byte @@ -522,29 +523,41 @@ func (s *WMailServer) processRequestInBundles( // Otherwise publish what you have so far, reset the bundle to the // current envelope, and leave if we hit the limit for iter.Next() { - var envelope whisper.Envelope - decodeErr := rlp.DecodeBytes(iter.Value(), &envelope) - if decodeErr != nil { - log.Error("[mailserver:processRequestInBundles] failed to decode RLP", - "err", decodeErr, - "requestID", requestID) + rawValue := make([]byte, len(iter.Value())) + copy(rawValue, iter.Value()) + + key := &DBKey{ + raw: iter.Key(), + } + + var envelopeBloom []byte + // Old key, we extract the topic from the envelope + if len(key.Bytes()) != DBKeyLength { + var err error + envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue) + if err != nil { + log.Error("[mailserver:processRequestInBundles] failed to decode RLP", + "err", err, + "requestID", requestID) + continue + } + } else { + envelopeBloom = whisper.TopicToBloom(key.Topic()) + } + if !whisper.BloomFilterMatch(bloom, envelopeBloom) { continue } - if !whisper.BloomFilterMatch(bloom, envelope.Bloom()) { - continue - } - - lastEnvelopeHash = envelope.Hash() + lastEnvelopeHash = key.EnvelopeHash() processedEnvelopes++ - envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) + envelopeSize := uint32(len(rawValue)) limitReached := processedEnvelopes == limit newSize := bundleSize + envelopeSize // If we still have some room for messages, add and continue if !limitReached && newSize < s.w.MaxMessageSize() { - bundle = append(bundle, &envelope) + bundle = append(bundle, rawValue) bundleSize = newSize continue } @@ -557,12 +570,12 @@ func (s *WMailServer) processRequestInBundles( } // Reset the bundle with the current envelope - bundle = []*whisper.Envelope{&envelope} + bundle = []rlp.RawValue{rawValue} bundleSize = envelopeSize // Leave if we reached the limit if limitReached { - nextCursor = iter.Key() + nextCursor = key.Cursor() break } } @@ -608,16 +621,16 @@ func (s *WMailServer) processRequestInBundles( return nextCursor, lastEnvelopeHash } -func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error { +func (s *WMailServer) sendRawEnvelopes(peer *whisper.Peer, envelopes []rlp.RawValue, batch bool) error { start := time.Now() defer requestProcessNetTimer.UpdateSince(start) if batch { - return s.w.SendP2PDirect(peer, envelopes...) + return s.w.SendRawP2PDirect(peer, envelopes...) } for _, env := range envelopes { - if err := s.w.SendP2PDirect(peer, env); err != nil { + if err := s.w.SendRawP2PDirect(peer, env); err != nil { return err } } @@ -792,3 +805,12 @@ func peerIDString(peer peerWithID) string { func peerIDBytesString(id []byte) string { return fmt.Sprintf("%x", id) } + +func extractBloomFromEncodedEnvelope(rawValue rlp.RawValue) ([]byte, error) { + var envelope whisper.Envelope + decodeErr := rlp.DecodeBytes(rawValue, &envelope) + if decodeErr != nil { + return nil, decodeErr + } + return envelope.Bloom(), nil +} diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 901209de4..8037b53a3 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -266,7 +266,7 @@ func (s *MailserverSuite) TestArchive() { s.NoError(err) s.server.Archive(env) - key := NewDBKey(env.Expiry-env.TTL, env.Hash()) + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil) s.NoError(err) @@ -287,8 +287,9 @@ func (s *MailserverSuite) TestManageLimits() { func (s *MailserverSuite) TestDBKey() { var h common.Hash + var emptyTopic whisper.TopicType i := uint32(time.Now().Unix()) - k := NewDBKey(i, h) + k := NewDBKey(i, emptyTopic, h) s.Equal(len(k.Bytes()), DBKeyLength, "wrong DB key length") s.Equal(byte(i%0x100), k.Bytes()[3], "raw representation should be big endian") s.Equal(byte(i/0x1000000), k.Bytes()[0], "big endian expected") @@ -313,8 +314,8 @@ func (s *MailserverSuite) TestRequestPaginationLimit() { env, err := generateEnvelope(sentTime) s.NoError(err) s.server.Archive(env) - key := NewDBKey(env.Expiry-env.TTL, env.Hash()) - archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Bytes())) + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) + archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Cursor())) sentEnvelopes = append(sentEnvelopes, env) sentHashes = append(sentHashes, env.Hash()) } @@ -523,7 +524,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Verify func( iterator.Iterator, time.Duration, // processRequestInBundles timeout - chan []*whisper.Envelope, + chan []rlp.RawValue, ) }{ { @@ -532,7 +533,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Verify: func( iter iterator.Iterator, timeout time.Duration, - bundles chan []*whisper.Envelope, + bundles chan []rlp.RawValue, ) { done := make(chan struct{}) processFinished := make(chan struct{}) @@ -556,7 +557,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Verify: func( iter iterator.Iterator, timeout time.Duration, - bundles chan []*whisper.Envelope, + bundles chan []rlp.RawValue, ) { done := make(chan struct{}) // won't be closed because we test timeout of `processRequestInBundles()` processFinished := make(chan struct{}) @@ -582,7 +583,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { // Nothing reads from this unbuffered channel which simulates a situation // when a connection between a peer and mail server was dropped. - bundles := make(chan []*whisper.Envelope) + bundles := make(chan []rlp.RawValue) tc.Verify(iter, tc.Timeout, bundles) }) @@ -778,13 +779,19 @@ func processRequestAndCollectHashes( ) ([]common.Hash, []byte, common.Hash) { iter := server.createIterator(lower, upper, cursor) defer iter.Release() - bundles := make(chan []*whisper.Envelope, 10) + bundles := make(chan []rlp.RawValue, 10) done := make(chan struct{}) var hashes []common.Hash go func() { for bundle := range bundles { - for _, env := range bundle { + for _, rawEnvelope := range bundle { + + var env *whisper.Envelope + if err := rlp.DecodeBytes(rawEnvelope, &env); err != nil { + panic(err) + } + hashes = append(hashes, env.Hash()) } } diff --git a/services/shhext/api.go b/services/shhext/api.go index 0d5b8fe72..7bac86228 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -750,8 +750,9 @@ func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) { if err != nil { return nil, fmt.Errorf("invalid cursor: %v", err) } - if len(cursor) > 0 && len(cursor) != mailserver.DBKeyLength { - return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.DBKeyLength, len(cursor)) + + if len(cursor) > 0 && len(cursor) != mailserver.CursorLength { + return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.CursorLength, len(cursor)) } payload := mailserver.MessagesRequestPayload{ diff --git a/services/shhext/api_test.go b/services/shhext/api_test.go index 9d007eedf..9dd211374 100644 --- a/services/shhext/api_test.go +++ b/services/shhext/api_test.go @@ -61,6 +61,7 @@ func TestMessagesRequest_setDefaults(t *testing.T) { } func TestMakeMessagesRequestPayload(t *testing.T) { + var emptyTopic whisper.TopicType testCases := []struct { Name string Req MessagesRequest @@ -74,12 +75,12 @@ func TestMakeMessagesRequestPayload(t *testing.T) { { Name: "invalid cursor size", Req: MessagesRequest{Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03})}, - Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.DBKeyLength), + Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.CursorLength), }, { Name: "valid cursor", Req: MessagesRequest{ - Cursor: hex.EncodeToString(mailserver.NewDBKey(123, common.Hash{}).Bytes()), + Cursor: hex.EncodeToString(mailserver.NewDBKey(123, emptyTopic, common.Hash{}).Cursor()), }, Err: "", }, diff --git a/vendor/github.com/status-im/whisper/whisperv6/doc.go b/vendor/github.com/status-im/whisper/whisperv6/doc.go index 75b71572f..97733ed19 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/doc.go +++ b/vendor/github.com/status-im/whisper/whisperv6/doc.go @@ -142,6 +142,15 @@ type SyncResponse struct { Error string } +// RawSyncResponse is a struct representing a response sent to the peer +// asking for syncing archived envelopes. +type RawSyncResponse struct { + Envelopes []rlp.RawValue + Cursor []byte + Final bool // if true it means all envelopes were processed + Error string +} + // MessagesResponse sent as a response after processing batch of envelopes. type MessagesResponse struct { // Hash is a hash of all envelopes sent in the single batch. diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index 15cef7dcb..5edc3c8eb 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -493,6 +493,11 @@ func (whisper *Whisper) SendSyncResponse(p *Peer, data SyncResponse) error { return p2p.Send(p.ws, p2pSyncResponseCode, data) } +// SendRawSyncResponse sends a response to a Mail Server with a slice of envelopes. +func (whisper *Whisper) SendRawSyncResponse(p *Peer, data RawSyncResponse) error { + return p2p.Send(p.ws, p2pSyncResponseCode, data) +} + // SendP2PMessage sends a peer-to-peer message to a specific peer. func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error { p, err := whisper.getPeer(peerID) @@ -513,6 +518,17 @@ func (whisper *Whisper) SendP2PDirect(peer *Peer, envelopes ...*Envelope) error return p2p.Send(peer.ws, p2pMessageCode, envelopes) } +// SendRawP2PDirect sends a peer-to-peer message to a specific peer. +// If only a single envelope is given, data is sent as a single object +// rather than a slice. This is important to keep this method backward compatible +// as it used to send only single envelopes. +func (whisper *Whisper) SendRawP2PDirect(peer *Peer, envelopes ...rlp.RawValue) error { + if len(envelopes) == 1 { + return p2p.Send(peer.ws, p2pMessageCode, envelopes[0]) + } + return p2p.Send(peer.ws, p2pMessageCode, envelopes) +} + // NewKeyPair generates a new cryptographic identity for the client, and injects // it into the known identities for message decryption. Returns ID of the new key pair. func (whisper *Whisper) NewKeyPair() (string, error) {