Publish rlp.RawValue instead of envelope (#1459)

As part of a performance profiling of mailserver we noticed that most of
the resources on a query are spend decoding the whisper envelope.
This PR changes the way we store envelopes encoding the Topic into the
database key, so we can check that and we are able to publish the
envelope rawValue if it matches.
The change is backward compatible as only newly added envelopes will
have the new key, while old ones will have to be unmarshaled.
This commit is contained in:
Andrea Maria Piana 2019-05-09 12:58:02 +02:00 committed by GitHub
parent 7002311f96
commit 061f10e58d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 155 additions and 78 deletions

6
Gopkg.lock generated
View File

@ -846,12 +846,12 @@
version = "v1.1.0"
[[projects]]
digest = "1:d499fd4787bb7a4a5f6fe9f75a517346d70e1e4ab3dbcc83ed85151833e3493d"
digest = "1:0612400565fd528de0ca1a2908a9b18e9fbc33add226df524cf404bf091d87c8"
name = "github.com/status-im/whisper"
packages = ["whisperv6"]
pruneopts = "NUT"
revision = "4fae75da94b1ab6dc13a5fa7d5087bfbfa04406f"
version = "v1.4.12"
revision = "0b742129500f4f54ddbf15c02ad53d4f7101d2a8"
version = "v1.4.13"
[[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -46,7 +46,7 @@
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.4.12"
version = "=v1.4.13"
[[constraint]]
name = "golang.org/x/text"

View File

@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
@ -86,8 +87,9 @@ func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) {
// and returns how many have been removed.
func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) {
var zero common.Hash
kl := NewDBKey(0, zero)
ku := NewDBKey(uint32(t.Unix()), zero)
var emptyTopic whisper.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
defer i.Release()

View File

@ -115,13 +115,14 @@ func testMessagesCount(t *testing.T, expected int, s *WMailServer) {
func countMessages(t *testing.T, db dbImpl) int {
var (
count int
zero common.Hash
count int
zero common.Hash
emptyTopic whisper.TopicType
)
now := time.Now()
kl := NewDBKey(uint32(0), zero)
ku := NewDBKey(uint32(now.Unix()), zero)
kl := NewDBKey(uint32(0), emptyTopic, zero)
ku := NewDBKey(uint32(now.Unix()), emptyTopic, zero)
i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
defer i.Release()

View File

@ -5,11 +5,13 @@ import (
"errors"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
)
const (
// DBKeyLength is a size of the envelope key.
DBKeyLength = common.HashLength + timestampLength
DBKeyLength = common.HashLength + timestampLength + whisper.TopicLength
CursorLength = common.HashLength + timestampLength
)
var (
@ -20,9 +22,7 @@ var (
// DBKey key to be stored in a db.
type DBKey struct {
timestamp uint32
hash common.Hash
raw []byte
raw []byte
}
// Bytes returns a bytes representation of the DBKey.
@ -30,26 +30,25 @@ func (k *DBKey) Bytes() []byte {
return k.raw
}
func (k *DBKey) Topic() whisper.TopicType {
return whisper.BytesToTopic(k.raw[timestampLength+common.HashLength:])
}
func (k *DBKey) EnvelopeHash() common.Hash {
return common.BytesToHash(k.raw[timestampLength : common.HashLength+timestampLength])
}
func (k *DBKey) Cursor() []byte {
// We don't use the whole cursor for backward compatibility (also it's not needed)
return k.raw[:CursorLength]
}
// NewDBKey creates a new DBKey with the given values.
func NewDBKey(timestamp uint32, h common.Hash) *DBKey {
func NewDBKey(timestamp uint32, topic whisper.TopicType, h common.Hash) *DBKey {
var k DBKey
k.timestamp = timestamp
k.hash = h
k.raw = make([]byte, DBKeyLength)
binary.BigEndian.PutUint32(k.raw, k.timestamp)
copy(k.raw[4:], k.hash[:])
binary.BigEndian.PutUint32(k.raw, timestamp)
copy(k.raw[timestampLength:], h[:])
copy(k.raw[timestampLength+common.HashLength:], topic[:])
return &k
}
// NewDBKeyFromBytes creates a DBKey from a byte slice.
func NewDBKeyFromBytes(b []byte) (*DBKey, error) {
if len(b) != DBKeyLength {
return nil, ErrInvalidByteSize
}
return &DBKey{
raw: b,
timestamp: binary.BigEndian.Uint32(b),
hash: common.BytesToHash(b[4:]),
}, nil
}

19
mailserver/db_key_test.go Normal file
View File

@ -0,0 +1,19 @@
package mailserver
import (
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
"testing"
)
func TestNewDBKey(t *testing.T) {
topic := whisper.BytesToTopic([]byte{0x01, 0x02, 0x03, 0x04})
hash := common.BytesToHash([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, 0x31, 0x32})
dbKey := NewDBKey(0xabcdef12, topic, hash)
expected := []byte{0xab, 0xcd, 0xef, 0x12, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, 0x31, 0x32, 0x01, 0x02, 0x03, 0x04}
require.Equal(t, expected, dbKey.Bytes())
require.Equal(t, topic, dbKey.Topic())
require.Equal(t, hash, dbKey.EnvelopeHash())
}

View File

@ -198,7 +198,7 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
log.Debug("Archiving envelope", "hash", env.Hash().Hex())
key := NewDBKey(env.Expiry-env.TTL, env.Hash())
key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env)
if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
@ -299,14 +299,14 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
iter := s.createIterator(lower, upper, cursor)
defer iter.Release()
bundles := make(chan []*whisper.Envelope, 5)
bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error)
cancelProcessing := make(chan struct{})
go func() {
counter := 0
for bundle := range bundles {
if err := s.sendEnvelopes(peer, bundle, batch); err != nil {
if err := s.sendRawEnvelopes(peer, bundle, batch); err != nil {
close(cancelProcessing)
errCh <- err
break
@ -395,14 +395,14 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
iter := s.createIterator(request.Lower, request.Upper, request.Cursor)
defer iter.Release()
bundles := make(chan []*whisper.Envelope, 5)
bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error)
cancelProcessing := make(chan struct{})
go func() {
for bundle := range bundles {
resp := whisper.SyncResponse{Envelopes: bundle}
if err := s.w.SendSyncResponse(peer, resp); err != nil {
resp := whisper.RawSyncResponse{Envelopes: bundle}
if err := s.w.SendRawSyncResponse(peer, resp); err != nil {
close(cancelProcessing)
errCh <- fmt.Errorf("failed to send sync response: %v", err)
break
@ -475,16 +475,17 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator {
var (
emptyHash common.Hash
ku, kl *DBKey
emptyHash common.Hash
emptyTopic whisper.TopicType
ku, kl *DBKey
)
ku = NewDBKey(upper+1, emptyHash)
kl = NewDBKey(lower, emptyHash)
ku = NewDBKey(upper+1, emptyTopic, emptyHash)
kl = NewDBKey(lower, emptyTopic, emptyHash)
i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
// seek to the end as we want to return envelopes in a descending order
if len(cursor) == DBKeyLength {
if len(cursor) == CursorLength {
i.Seek(cursor)
}
return i
@ -498,13 +499,13 @@ func (s *WMailServer) processRequestInBundles(
limit int,
timeout time.Duration,
requestID string,
output chan<- []*whisper.Envelope,
output chan<- []rlp.RawValue,
cancel <-chan struct{},
) ([]byte, common.Hash) {
var (
bundle []*whisper.Envelope
bundle []rlp.RawValue
bundleSize uint32
batches [][]*whisper.Envelope
batches [][]rlp.RawValue
processedEnvelopes int
processedEnvelopesSize int64
nextCursor []byte
@ -522,29 +523,41 @@ func (s *WMailServer) processRequestInBundles(
// Otherwise publish what you have so far, reset the bundle to the
// current envelope, and leave if we hit the limit
for iter.Next() {
var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(iter.Value(), &envelope)
if decodeErr != nil {
log.Error("[mailserver:processRequestInBundles] failed to decode RLP",
"err", decodeErr,
"requestID", requestID)
rawValue := make([]byte, len(iter.Value()))
copy(rawValue, iter.Value())
key := &DBKey{
raw: iter.Key(),
}
var envelopeBloom []byte
// Old key, we extract the topic from the envelope
if len(key.Bytes()) != DBKeyLength {
var err error
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
if err != nil {
log.Error("[mailserver:processRequestInBundles] failed to decode RLP",
"err", err,
"requestID", requestID)
continue
}
} else {
envelopeBloom = whisper.TopicToBloom(key.Topic())
}
if !whisper.BloomFilterMatch(bloom, envelopeBloom) {
continue
}
if !whisper.BloomFilterMatch(bloom, envelope.Bloom()) {
continue
}
lastEnvelopeHash = envelope.Hash()
lastEnvelopeHash = key.EnvelopeHash()
processedEnvelopes++
envelopeSize := whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
envelopeSize := uint32(len(rawValue))
limitReached := processedEnvelopes == limit
newSize := bundleSize + envelopeSize
// If we still have some room for messages, add and continue
if !limitReached && newSize < s.w.MaxMessageSize() {
bundle = append(bundle, &envelope)
bundle = append(bundle, rawValue)
bundleSize = newSize
continue
}
@ -557,12 +570,12 @@ func (s *WMailServer) processRequestInBundles(
}
// Reset the bundle with the current envelope
bundle = []*whisper.Envelope{&envelope}
bundle = []rlp.RawValue{rawValue}
bundleSize = envelopeSize
// Leave if we reached the limit
if limitReached {
nextCursor = iter.Key()
nextCursor = key.Cursor()
break
}
}
@ -608,16 +621,16 @@ func (s *WMailServer) processRequestInBundles(
return nextCursor, lastEnvelopeHash
}
func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error {
func (s *WMailServer) sendRawEnvelopes(peer *whisper.Peer, envelopes []rlp.RawValue, batch bool) error {
start := time.Now()
defer requestProcessNetTimer.UpdateSince(start)
if batch {
return s.w.SendP2PDirect(peer, envelopes...)
return s.w.SendRawP2PDirect(peer, envelopes...)
}
for _, env := range envelopes {
if err := s.w.SendP2PDirect(peer, env); err != nil {
if err := s.w.SendRawP2PDirect(peer, env); err != nil {
return err
}
}
@ -792,3 +805,12 @@ func peerIDString(peer peerWithID) string {
func peerIDBytesString(id []byte) string {
return fmt.Sprintf("%x", id)
}
func extractBloomFromEncodedEnvelope(rawValue rlp.RawValue) ([]byte, error) {
var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(rawValue, &envelope)
if decodeErr != nil {
return nil, decodeErr
}
return envelope.Bloom(), nil
}

View File

@ -266,7 +266,7 @@ func (s *MailserverSuite) TestArchive() {
s.NoError(err)
s.server.Archive(env)
key := NewDBKey(env.Expiry-env.TTL, env.Hash())
key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash())
archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil)
s.NoError(err)
@ -287,8 +287,9 @@ func (s *MailserverSuite) TestManageLimits() {
func (s *MailserverSuite) TestDBKey() {
var h common.Hash
var emptyTopic whisper.TopicType
i := uint32(time.Now().Unix())
k := NewDBKey(i, h)
k := NewDBKey(i, emptyTopic, h)
s.Equal(len(k.Bytes()), DBKeyLength, "wrong DB key length")
s.Equal(byte(i%0x100), k.Bytes()[3], "raw representation should be big endian")
s.Equal(byte(i/0x1000000), k.Bytes()[0], "big endian expected")
@ -313,8 +314,8 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
env, err := generateEnvelope(sentTime)
s.NoError(err)
s.server.Archive(env)
key := NewDBKey(env.Expiry-env.TTL, env.Hash())
archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Bytes()))
key := NewDBKey(env.Expiry-env.TTL, env.Topic, env.Hash())
archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Cursor()))
sentEnvelopes = append(sentEnvelopes, env)
sentHashes = append(sentHashes, env.Hash())
}
@ -523,7 +524,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
Verify func(
iterator.Iterator,
time.Duration, // processRequestInBundles timeout
chan []*whisper.Envelope,
chan []rlp.RawValue,
)
}{
{
@ -532,7 +533,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
Verify: func(
iter iterator.Iterator,
timeout time.Duration,
bundles chan []*whisper.Envelope,
bundles chan []rlp.RawValue,
) {
done := make(chan struct{})
processFinished := make(chan struct{})
@ -556,7 +557,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
Verify: func(
iter iterator.Iterator,
timeout time.Duration,
bundles chan []*whisper.Envelope,
bundles chan []rlp.RawValue,
) {
done := make(chan struct{}) // won't be closed because we test timeout of `processRequestInBundles()`
processFinished := make(chan struct{})
@ -582,7 +583,7 @@ func (s *MailserverSuite) TestProcessRequestDeadlockHandling() {
// Nothing reads from this unbuffered channel which simulates a situation
// when a connection between a peer and mail server was dropped.
bundles := make(chan []*whisper.Envelope)
bundles := make(chan []rlp.RawValue)
tc.Verify(iter, tc.Timeout, bundles)
})
@ -778,13 +779,19 @@ func processRequestAndCollectHashes(
) ([]common.Hash, []byte, common.Hash) {
iter := server.createIterator(lower, upper, cursor)
defer iter.Release()
bundles := make(chan []*whisper.Envelope, 10)
bundles := make(chan []rlp.RawValue, 10)
done := make(chan struct{})
var hashes []common.Hash
go func() {
for bundle := range bundles {
for _, env := range bundle {
for _, rawEnvelope := range bundle {
var env *whisper.Envelope
if err := rlp.DecodeBytes(rawEnvelope, &env); err != nil {
panic(err)
}
hashes = append(hashes, env.Hash())
}
}

View File

@ -750,8 +750,9 @@ func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("invalid cursor: %v", err)
}
if len(cursor) > 0 && len(cursor) != mailserver.DBKeyLength {
return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.DBKeyLength, len(cursor))
if len(cursor) > 0 && len(cursor) != mailserver.CursorLength {
return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.CursorLength, len(cursor))
}
payload := mailserver.MessagesRequestPayload{

View File

@ -61,6 +61,7 @@ func TestMessagesRequest_setDefaults(t *testing.T) {
}
func TestMakeMessagesRequestPayload(t *testing.T) {
var emptyTopic whisper.TopicType
testCases := []struct {
Name string
Req MessagesRequest
@ -74,12 +75,12 @@ func TestMakeMessagesRequestPayload(t *testing.T) {
{
Name: "invalid cursor size",
Req: MessagesRequest{Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03})},
Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.DBKeyLength),
Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.CursorLength),
},
{
Name: "valid cursor",
Req: MessagesRequest{
Cursor: hex.EncodeToString(mailserver.NewDBKey(123, common.Hash{}).Bytes()),
Cursor: hex.EncodeToString(mailserver.NewDBKey(123, emptyTopic, common.Hash{}).Cursor()),
},
Err: "",
},

View File

@ -142,6 +142,15 @@ type SyncResponse struct {
Error string
}
// RawSyncResponse is a struct representing a response sent to the peer
// asking for syncing archived envelopes.
type RawSyncResponse struct {
Envelopes []rlp.RawValue
Cursor []byte
Final bool // if true it means all envelopes were processed
Error string
}
// MessagesResponse sent as a response after processing batch of envelopes.
type MessagesResponse struct {
// Hash is a hash of all envelopes sent in the single batch.

View File

@ -493,6 +493,11 @@ func (whisper *Whisper) SendSyncResponse(p *Peer, data SyncResponse) error {
return p2p.Send(p.ws, p2pSyncResponseCode, data)
}
// SendRawSyncResponse sends a response to a Mail Server with a slice of envelopes.
func (whisper *Whisper) SendRawSyncResponse(p *Peer, data RawSyncResponse) error {
return p2p.Send(p.ws, p2pSyncResponseCode, data)
}
// SendP2PMessage sends a peer-to-peer message to a specific peer.
func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error {
p, err := whisper.getPeer(peerID)
@ -513,6 +518,17 @@ func (whisper *Whisper) SendP2PDirect(peer *Peer, envelopes ...*Envelope) error
return p2p.Send(peer.ws, p2pMessageCode, envelopes)
}
// SendRawP2PDirect sends a peer-to-peer message to a specific peer.
// If only a single envelope is given, data is sent as a single object
// rather than a slice. This is important to keep this method backward compatible
// as it used to send only single envelopes.
func (whisper *Whisper) SendRawP2PDirect(peer *Peer, envelopes ...rlp.RawValue) error {
if len(envelopes) == 1 {
return p2p.Send(peer.ws, p2pMessageCode, envelopes[0])
}
return p2p.Send(peer.ws, p2pMessageCode, envelopes)
}
// NewKeyPair generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption. Returns ID of the new key pair.
func (whisper *Whisper) NewKeyPair() (string, error) {