From 061f10e58dcecd61c1d349bf9546fa7a2e84b962 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Thu, 9 May 2019 12:58:02 +0200 Subject: [PATCH] Publish rlp.RawValue instead of envelope (#1459) As part of a performance profiling of mailserver we noticed that most of the resources on a query are spent decoding the whisper envelope. This PR changes the way we store envelopes, encoding the Topic into the database key, so we can check that and we are able to publish the envelope rawValue if it matches. The change is backward compatible as only newly added envelopes will have the new key, while old ones will have to be unmarshaled. --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- mailserver/cleaner.go | 6 +- mailserver/cleaner_test.go | 9 +- mailserver/db_key.go | 43 +++++----- mailserver/db_key_test.go | 19 ++++ mailserver/mailserver.go | 86 ++++++++++++------- mailserver/mailserver_test.go | 27 +++--- services/shhext/api.go | 5 +- services/shhext/api_test.go | 5 +- .../status-im/whisper/whisperv6/doc.go | 9 ++ .../status-im/whisper/whisperv6/whisper.go | 16 ++++ 12 files changed, 155 insertions(+), 78 deletions(-) create mode 100644 mailserver/db_key_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 5e9790e6d..78baaec1e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -846,12 +846,12 @@ version = "v1.1.0" [[projects]] - digest = "1:d499fd4787bb7a4a5f6fe9f75a517346d70e1e4ab3dbcc83ed85151833e3493d" + digest = "1:0612400565fd528de0ca1a2908a9b18e9fbc33add226df524cf404bf091d87c8" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "4fae75da94b1ab6dc13a5fa7d5087bfbfa04406f" - version = "v1.4.12" + revision = "0b742129500f4f54ddbf15c02ad53d4f7101d2a8" + version = "v1.4.13" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index 1e8cfd49e..21c8dc2b1 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -46,7 +46,7 @@ [[constraint]] name = "github.com/status-im/whisper" - 
version = "=v1.4.12" + version = "=v1.4.13" [[constraint]] name = "golang.org/x/text" diff --git a/mailserver/cleaner.go b/mailserver/cleaner.go index 531169ab6..f3c36f06a 100644 --- a/mailserver/cleaner.go +++ b/mailserver/cleaner.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + whisper "github.com/status-im/whisper/whisperv6" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/util" @@ -86,8 +87,9 @@ func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) { // and returns how many have been removed. func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) { var zero common.Hash - kl := NewDBKey(0, zero) - ku := NewDBKey(uint32(t.Unix()), zero) + var emptyTopic whisper.TopicType + kl := NewDBKey(0, emptyTopic, zero) + ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero) i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) defer i.Release() diff --git a/mailserver/cleaner_test.go b/mailserver/cleaner_test.go index 285f63899..f90057803 100644 --- a/mailserver/cleaner_test.go +++ b/mailserver/cleaner_test.go @@ -115,13 +115,14 @@ func testMessagesCount(t *testing.T, expected int, s *WMailServer) { func countMessages(t *testing.T, db dbImpl) int { var ( - count int - zero common.Hash + count int + zero common.Hash + emptyTopic whisper.TopicType ) now := time.Now() - kl := NewDBKey(uint32(0), zero) - ku := NewDBKey(uint32(now.Unix()), zero) + kl := NewDBKey(uint32(0), emptyTopic, zero) + ku := NewDBKey(uint32(now.Unix()), emptyTopic, zero) i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) defer i.Release() diff --git a/mailserver/db_key.go b/mailserver/db_key.go index 9ea4bc8ab..1267fdd38 100644 --- a/mailserver/db_key.go +++ b/mailserver/db_key.go @@ -5,11 +5,13 @@ import ( "errors" "github.com/ethereum/go-ethereum/common" + whisper 
"github.com/status-im/whisper/whisperv6" ) const ( // DBKeyLength is a size of the envelope key. - DBKeyLength = common.HashLength + timestampLength + DBKeyLength = common.HashLength + timestampLength + whisper.TopicLength + CursorLength = common.HashLength + timestampLength ) var ( @@ -20,9 +22,7 @@ var ( // DBKey key to be stored in a db. type DBKey struct { - timestamp uint32 - hash common.Hash - raw []byte + raw []byte } // Bytes returns a bytes representation of the DBKey. @@ -30,26 +30,25 @@ func (k *DBKey) Bytes() []byte { return k.raw } +func (k *DBKey) Topic() whisper.TopicType { + return whisper.BytesToTopic(k.raw[timestampLength+common.HashLength:]) +} + +func (k *DBKey) EnvelopeHash() common.Hash { + return common.BytesToHash(k.raw[timestampLength : common.HashLength+timestampLength]) +} + +func (k *DBKey) Cursor() []byte { + // We don't use the whole cursor for backward compatibility (also it's not needed) + return k.raw[:CursorLength] +} + // NewDBKey creates a new DBKey with the given values. -func NewDBKey(timestamp uint32, h common.Hash) *DBKey { +func NewDBKey(timestamp uint32, topic whisper.TopicType, h common.Hash) *DBKey { var k DBKey - k.timestamp = timestamp - k.hash = h k.raw = make([]byte, DBKeyLength) - binary.BigEndian.PutUint32(k.raw, k.timestamp) - copy(k.raw[4:], k.hash[:]) + binary.BigEndian.PutUint32(k.raw, timestamp) + copy(k.raw[timestampLength:], h[:]) + copy(k.raw[timestampLength+common.HashLength:], topic[:]) return &k } - -// NewDBKeyFromBytes creates a DBKey from a byte slice. 
-func NewDBKeyFromBytes(b []byte) (*DBKey, error) { - if len(b) != DBKeyLength { - return nil, ErrInvalidByteSize - } - - return &DBKey{ - raw: b, - timestamp: binary.BigEndian.Uint32(b), - hash: common.BytesToHash(b[4:]), - }, nil -} diff --git a/mailserver/db_key_test.go b/mailserver/db_key_test.go new file mode 100644 index 000000000..c64012e02 --- /dev/null +++ b/mailserver/db_key_test.go @@ -0,0 +1,19 @@ +package mailserver + +import ( + "github.com/ethereum/go-ethereum/common" + whisper "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNewDBKey(t *testing.T) { + topic := whisper.BytesToTopic([]byte{0x01, 0x02, 0x03, 0x04}) + + hash := common.BytesToHash([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, 0x31, 0x32}) + dbKey := NewDBKey(0xabcdef12, topic, hash) + expected := []byte{0xab, 0xcd, 0xef, 0x12, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, 0x31, 0x32, 0x01, 0x02, 0x03, 0x04} + require.Equal(t, expected, dbKey.Bytes()) + require.Equal(t, topic, dbKey.Topic()) + require.Equal(t, hash, dbKey.EnvelopeHash()) +} diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 2722fa6f6..2b42b30de 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -198,7 +198,7 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { log.Debug("Archiving envelope", "hash", env.Hash().Hex()) - key := NewDBKey(env.Expiry-env.TTL, env.Hash()) + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) rawEnvelope, err := rlp.EncodeToBytes(env) if err != nil { log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err)) @@ -299,14 +299,14 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) iter 
:= s.createIterator(lower, upper, cursor) defer iter.Release() - bundles := make(chan []*whisper.Envelope, 5) + bundles := make(chan []rlp.RawValue, 5) errCh := make(chan error) cancelProcessing := make(chan struct{}) go func() { counter := 0 for bundle := range bundles { - if err := s.sendEnvelopes(peer, bundle, batch); err != nil { + if err := s.sendRawEnvelopes(peer, bundle, batch); err != nil { close(cancelProcessing) errCh <- err break @@ -395,14 +395,14 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque iter := s.createIterator(request.Lower, request.Upper, request.Cursor) defer iter.Release() - bundles := make(chan []*whisper.Envelope, 5) + bundles := make(chan []rlp.RawValue, 5) errCh := make(chan error) cancelProcessing := make(chan struct{}) go func() { for bundle := range bundles { - resp := whisper.SyncResponse{Envelopes: bundle} - if err := s.w.SendSyncResponse(peer, resp); err != nil { + resp := whisper.RawSyncResponse{Envelopes: bundle} + if err := s.w.SendRawSyncResponse(peer, resp); err != nil { close(cancelProcessing) errCh <- fmt.Errorf("failed to send sync response: %v", err) break @@ -475,16 +475,17 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator { var ( - emptyHash common.Hash - ku, kl *DBKey + emptyHash common.Hash + emptyTopic whisper.TopicType + ku, kl *DBKey ) - ku = NewDBKey(upper+1, emptyHash) - kl = NewDBKey(lower, emptyHash) + ku = NewDBKey(upper+1, emptyTopic, emptyHash) + kl = NewDBKey(lower, emptyTopic, emptyHash) i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil) // seek to the end as we want to return envelopes in a descending order - if len(cursor) == DBKeyLength { + if len(cursor) == CursorLength { i.Seek(cursor) } return i @@ -498,13 +499,13 @@ func (s *WMailServer) processRequestInBundles( limit int, timeout time.Duration, requestID string, - output chan<- 
[]*whisper.Envelope, + output chan<- []rlp.RawValue, cancel <-chan struct{}, ) ([]byte, common.Hash) { var ( - bundle []*whisper.Envelope + bundle []rlp.RawValue bundleSize uint32 - batches [][]*whisper.Envelope + batches [][]rlp.RawValue processedEnvelopes int processedEnvelopesSize int64 nextCursor []byte @@ -522,29 +523,41 @@ func (s *WMailServer) processRequestInBundles( // Otherwise publish what you have so far, reset the bundle to the // current envelope, and leave if we hit the limit for iter.Next() { - var envelope whisper.Envelope - decodeErr := rlp.DecodeBytes(iter.Value(), &envelope) - if decodeErr != nil { - log.Error("[mailserver:processRequestInBundles] failed to decode RLP", - "err", decodeErr, - "requestID", requestID) + rawValue := make([]byte, len(iter.Value())) + copy(rawValue, iter.Value()) + + key := &DBKey{ + raw: iter.Key(), + } + + var envelopeBloom []byte + // Old key, we extract the topic from the envelope + if len(key.Bytes()) != DBKeyLength { + var err error + envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue) + if err != nil { + log.Error("[mailserver:processRequestInBundles] failed to decode RLP", + "err", err, + "requestID", requestID) + continue + } + } else { + envelopeBloom = whisper.TopicToBloom(key.Topic()) + } + if !whisper.BloomFilterMatch(bloom, envelopeBloom) { continue } - if !whisper.BloomFilterMatch(bloom, envelope.Bloom()) { - continue - } - - lastEnvelopeHash = envelope.Hash() + lastEnvelopeHash = key.EnvelopeHash() processedEnvelopes++ - envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) + envelopeSize := uint32(len(rawValue)) limitReached := processedEnvelopes == limit newSize := bundleSize + envelopeSize // If we still have some room for messages, add and continue if !limitReached && newSize < s.w.MaxMessageSize() { - bundle = append(bundle, &envelope) + bundle = append(bundle, rawValue) bundleSize = newSize continue } @@ -557,12 +570,12 @@ func (s *WMailServer) 
processRequestInBundles( } // Reset the bundle with the current envelope - bundle = []*whisper.Envelope{&envelope} + bundle = []rlp.RawValue{rawValue} bundleSize = envelopeSize // Leave if we reached the limit if limitReached { - nextCursor = iter.Key() + nextCursor = key.Cursor() break } } @@ -608,16 +621,16 @@ func (s *WMailServer) processRequestInBundles( return nextCursor, lastEnvelopeHash } -func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error { +func (s *WMailServer) sendRawEnvelopes(peer *whisper.Peer, envelopes []rlp.RawValue, batch bool) error { start := time.Now() defer requestProcessNetTimer.UpdateSince(start) if batch { - return s.w.SendP2PDirect(peer, envelopes...) + return s.w.SendRawP2PDirect(peer, envelopes...) } for _, env := range envelopes { - if err := s.w.SendP2PDirect(peer, env); err != nil { + if err := s.w.SendRawP2PDirect(peer, env); err != nil { return err } } @@ -792,3 +805,12 @@ func peerIDString(peer peerWithID) string { func peerIDBytesString(id []byte) string { return fmt.Sprintf("%x", id) } + +func extractBloomFromEncodedEnvelope(rawValue rlp.RawValue) ([]byte, error) { + var envelope whisper.Envelope + decodeErr := rlp.DecodeBytes(rawValue, &envelope) + if decodeErr != nil { + return nil, decodeErr + } + return envelope.Bloom(), nil +} diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 901209de4..8037b53a3 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -266,7 +266,7 @@ func (s *MailserverSuite) TestArchive() { s.NoError(err) s.server.Archive(env) - key := NewDBKey(env.Expiry-env.TTL, env.Hash()) + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil) s.NoError(err) @@ -287,8 +287,9 @@ func (s *MailserverSuite) TestManageLimits() { func (s *MailserverSuite) TestDBKey() { var h common.Hash + var emptyTopic whisper.TopicType i := uint32(time.Now().Unix()) - 
k := NewDBKey(i, h) + k := NewDBKey(i, emptyTopic, h) s.Equal(len(k.Bytes()), DBKeyLength, "wrong DB key length") s.Equal(byte(i%0x100), k.Bytes()[3], "raw representation should be big endian") s.Equal(byte(i/0x1000000), k.Bytes()[0], "big endian expected") @@ -313,8 +314,8 @@ func (s *MailserverSuite) TestRequestPaginationLimit() { env, err := generateEnvelope(sentTime) s.NoError(err) s.server.Archive(env) - key := NewDBKey(env.Expiry-env.TTL, env.Hash()) - archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Bytes())) + key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash()) + archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Cursor())) sentEnvelopes = append(sentEnvelopes, env) sentHashes = append(sentHashes, env.Hash()) } @@ -523,7 +524,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Verify func( iterator.Iterator, time.Duration, // processRequestInBundles timeout - chan []*whisper.Envelope, + chan []rlp.RawValue, ) }{ { @@ -532,7 +533,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Verify: func( iter iterator.Iterator, timeout time.Duration, - bundles chan []*whisper.Envelope, + bundles chan []rlp.RawValue, ) { done := make(chan struct{}) processFinished := make(chan struct{}) @@ -556,7 +557,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { Verify: func( iter iterator.Iterator, timeout time.Duration, - bundles chan []*whisper.Envelope, + bundles chan []rlp.RawValue, ) { done := make(chan struct{}) // won't be closed because we test timeout of `processRequestInBundles()` processFinished := make(chan struct{}) @@ -582,7 +583,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() { // Nothing reads from this unbuffered channel which simulates a situation // when a connection between a peer and mail server was dropped. 
- bundles := make(chan []*whisper.Envelope) + bundles := make(chan []rlp.RawValue) tc.Verify(iter, tc.Timeout, bundles) }) @@ -778,13 +779,19 @@ func processRequestAndCollectHashes( ) ([]common.Hash, []byte, common.Hash) { iter := server.createIterator(lower, upper, cursor) defer iter.Release() - bundles := make(chan []*whisper.Envelope, 10) + bundles := make(chan []rlp.RawValue, 10) done := make(chan struct{}) var hashes []common.Hash go func() { for bundle := range bundles { - for _, env := range bundle { + for _, rawEnvelope := range bundle { + + var env *whisper.Envelope + if err := rlp.DecodeBytes(rawEnvelope, &env); err != nil { + panic(err) + } + hashes = append(hashes, env.Hash()) } } diff --git a/services/shhext/api.go b/services/shhext/api.go index 0d5b8fe72..7bac86228 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -750,8 +750,9 @@ func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) { if err != nil { return nil, fmt.Errorf("invalid cursor: %v", err) } - if len(cursor) > 0 && len(cursor) != mailserver.DBKeyLength { - return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.DBKeyLength, len(cursor)) + + if len(cursor) > 0 && len(cursor) != mailserver.CursorLength { + return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.CursorLength, len(cursor)) } payload := mailserver.MessagesRequestPayload{ diff --git a/services/shhext/api_test.go b/services/shhext/api_test.go index 9d007eedf..9dd211374 100644 --- a/services/shhext/api_test.go +++ b/services/shhext/api_test.go @@ -61,6 +61,7 @@ func TestMessagesRequest_setDefaults(t *testing.T) { } func TestMakeMessagesRequestPayload(t *testing.T) { + var emptyTopic whisper.TopicType testCases := []struct { Name string Req MessagesRequest @@ -74,12 +75,12 @@ func TestMakeMessagesRequestPayload(t *testing.T) { { Name: "invalid cursor size", Req: MessagesRequest{Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03})}, - Err: 
fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.DBKeyLength), + Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.CursorLength), }, { Name: "valid cursor", Req: MessagesRequest{ - Cursor: hex.EncodeToString(mailserver.NewDBKey(123, common.Hash{}).Bytes()), + Cursor: hex.EncodeToString(mailserver.NewDBKey(123, emptyTopic, common.Hash{}).Cursor()), }, Err: "", }, diff --git a/vendor/github.com/status-im/whisper/whisperv6/doc.go b/vendor/github.com/status-im/whisper/whisperv6/doc.go index 75b71572f..97733ed19 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/doc.go +++ b/vendor/github.com/status-im/whisper/whisperv6/doc.go @@ -142,6 +142,15 @@ type SyncResponse struct { Error string } +// RawSyncResponse is a struct representing a response sent to the peer +// asking for syncing archived envelopes. +type RawSyncResponse struct { + Envelopes []rlp.RawValue + Cursor []byte + Final bool // if true it means all envelopes were processed + Error string +} + // MessagesResponse sent as a response after processing batch of envelopes. type MessagesResponse struct { // Hash is a hash of all envelopes sent in the single batch. diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index 15cef7dcb..5edc3c8eb 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -493,6 +493,11 @@ func (whisper *Whisper) SendSyncResponse(p *Peer, data SyncResponse) error { return p2p.Send(p.ws, p2pSyncResponseCode, data) } +// SendRawSyncResponse sends a response to a Mail Server with a slice of envelopes. +func (whisper *Whisper) SendRawSyncResponse(p *Peer, data RawSyncResponse) error { + return p2p.Send(p.ws, p2pSyncResponseCode, data) +} + // SendP2PMessage sends a peer-to-peer message to a specific peer. 
func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error { p, err := whisper.getPeer(peerID) @@ -513,6 +518,17 @@ func (whisper *Whisper) SendP2PDirect(peer *Peer, envelopes ...*Envelope) error return p2p.Send(peer.ws, p2pMessageCode, envelopes) } +// SendRawP2PDirect sends a peer-to-peer message to a specific peer. +// If only a single envelope is given, data is sent as a single object +// rather than a slice. This is important to keep this method backward compatible +// as it used to send only single envelopes. +func (whisper *Whisper) SendRawP2PDirect(peer *Peer, envelopes ...rlp.RawValue) error { + if len(envelopes) == 1 { + return p2p.Send(peer.ws, p2pMessageCode, envelopes[0]) + } + return p2p.Send(peer.ws, p2pMessageCode, envelopes) +} + // NewKeyPair generates a new cryptographic identity for the client, and injects // it into the known identities for message decryption. Returns ID of the new key pair. func (whisper *Whisper) NewKeyPair() (string, error) {