Request historical messages in batches (#1245)

This commit is contained in:
Adam Babik 2018-10-19 11:09:13 +02:00 committed by GitHub
parent ba504e99c4
commit a75f9c34cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 304 additions and 90 deletions

View File

@ -27,6 +27,10 @@
version = "=v1.8.16"
source = "github.com/status-im/go-ethereum"
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.2.0"
[[override]]
name = "github.com/golang/protobuf"
version = "1.1.0"
@ -161,7 +165,3 @@
[[constraint]]
name = "github.com/status-im/migrate"
branch = "master"
[[constraint]]
version = "v1.2.0"
name = "github.com/status-im/whisper"

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/db"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
@ -220,6 +221,8 @@ func recoverLevelDBPanics(calleMethodName string) {
func (s *WMailServer) Archive(env *whisper.Envelope) {
defer recoverLevelDBPanics("Archive")
log.Debug("Archiving envelope", "hash", env.Hash().Hex())
key := NewDbKey(env.Expiry-env.TTL, env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env)
if err != nil {
@ -237,7 +240,7 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
// DeliverMail sends mail to specified whisper peer.
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
log.Info("Delivering mail", "peer", peer.ID)
log.Info("Delivering mail", "peer", peer.ID())
requestsMeter.Mark(1)
if peer == nil {
@ -254,26 +257,62 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
defer recoverLevelDBPanics("DeliverMail")
if lower, upper, bloom, limit, cursor, err := s.validateRequest(peer.ID(), request); err != nil {
var (
lower, upper uint32
bloom []byte
limit uint32
cursor cursorType
batch bool
err error
)
payload, err := s.decodeRequest(peer.ID(), request)
if err == nil {
lower, upper = payload.Lower, payload.Upper
bloom = payload.Bloom
cursor = cursorType(payload.Cursor)
limit = payload.Limit
batch = payload.Batch
} else {
log.Debug("Failed to decode request", "err", err, "peerID", peer.ID())
lower, upper, bloom, limit, cursor, err = s.validateRequest(peer.ID(), request)
}
if err != nil {
requestValidationErrorsCounter.Inc(1)
log.Error("Mailserver request failed validaton", "peerID", peer.ID())
s.trySendHistoricMessageErrorResponse(peer, request, err)
} else {
_, lastEnvelopeHash, nextPageCursor, err := s.processRequest(peer, lower, upper, bloom, limit, cursor)
return
}
if err != nil {
processRequestErrorsCounter.Inc(1)
log.Error("Error while delivering mail to the peer", "err", err, "peerID", peer.ID())
s.trySendHistoricMessageErrorResponse(peer, request, err)
return
}
log.Debug("Processing request",
"lower", lower, "upper", upper,
"bloom", bloom,
"limit", limit,
"cursor", cursor,
"batch", batch)
if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
historicResponseErrorsCounter.Inc(1)
log.Error("Error while sending historic message response", "err", err, "peerID", peer.ID())
// we still want to try to report error even it it is a p2p error and it is unlikely
s.trySendHistoricMessageErrorResponse(peer, request, err)
}
_, lastEnvelopeHash, nextPageCursor, err := s.processRequest(
peer,
lower, upper,
bloom,
limit,
cursor,
batch)
if err != nil {
processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peer.ID())
s.trySendHistoricMessageErrorResponse(peer, request, err)
return
}
log.Debug("Sending historic message response", "last", lastEnvelopeHash, "next", nextPageCursor)
if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
historicResponseErrorsCounter.Inc(1)
log.Error("Error sending historic message response", "err", err, "peerID", peer.ID())
// we still want to try to report error even it it is a p2p error and it is unlikely
s.trySendHistoricMessageErrorResponse(peer, request, err)
}
}
@ -294,13 +333,41 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
return false
}
func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) iterator.Iterator {
var (
emptyHash common.Hash
ku []byte
kl []byte
)
kl = NewDbKey(lower, emptyHash).raw
if len(cursor) == dbKeyLength {
ku = cursor
} else {
ku = NewDbKey(upper+1, emptyHash).raw
}
i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil)
// seek to the end as we want to return envelopes in a descending order
i.Seek(ku)
return i
}
// processRequest processes the current request and re-sends all stored messages
// accomplishing lower and upper limits. The limit parameter determines the maximum number of
// messages to be sent back for the current request.
// The cursor parameter is used for pagination.
// After sending all the messages, a message of type p2pRequestCompleteCode is sent by the mailserver to
// the peer.
func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte, limit uint32, cursor cursorType) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) {
func (s *WMailServer) processRequest(
peer *whisper.Peer,
lower, upper uint32,
bloom []byte,
limit uint32,
cursor cursorType,
batch bool,
) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) {
// Recover from possible goleveldb panics
defer func() {
if r := recover(); r != nil {
@ -311,50 +378,82 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
var (
sentEnvelopes uint32
sentEnvelopesSize int64
zero common.Hash
ku []byte
kl []byte
)
kl = NewDbKey(lower, zero).raw
if cursor != nil {
ku = cursor
} else {
ku = NewDbKey(upper+1, zero).raw
}
i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil)
i.Seek(ku)
i := s.createIterator(lower, upper, cursor)
defer i.Release()
var (
bundle []*whisper.Envelope
bundleSize uint32
)
start := time.Now()
for i.Prev() {
var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(i.Value(), &envelope)
if decodeErr != nil {
log.Error(fmt.Sprintf("failed to decode RLP: %s", decodeErr))
log.Error("failed to decode RLP", "err", decodeErr)
continue
}
if whisper.BloomFilterMatch(bloom, envelope.Bloom()) {
if peer == nil {
// used for test purposes
ret = append(ret, &envelope)
} else {
err = s.w.SendP2PDirect(peer, &envelope)
if err != nil {
log.Error(fmt.Sprintf("failed to send direct message to peer: %s", err))
return
}
lastEnvelopeHash = envelope.Hash()
}
sentEnvelopes++
sentEnvelopesSize += whisper.EnvelopeHeaderLength + int64(len(envelope.Data))
if !whisper.BloomFilterMatch(bloom, envelope.Bloom()) {
continue
}
if limit != noLimits && sentEnvelopes == limit {
nextPageCursor = i.Key()
break
newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
limitReached := limit != noLimits && (int(sentEnvelopes)+len(bundle)) == int(limit)
if !limitReached && newSize < s.w.MaxMessageSize() {
bundle = append(bundle, &envelope)
bundleSize = newSize
lastEnvelopeHash = envelope.Hash()
continue
}
if peer == nil {
// used for test purposes
ret = append(ret, bundle...)
} else {
err = s.sendEnvelopes(peer, bundle, batch)
if err != nil {
return
}
}
sentEnvelopes += uint32(len(bundle))
sentEnvelopesSize += int64(bundleSize)
if limitReached {
bundle = nil
bundleSize = 0
// When the limit is reached, the current retrieved envelope
// is not included in the response.
// The nextPageCursor is a key used as a limit in a range and
// is not included in the range, hence, we need to get
// the previous iterator key.
i.Next()
nextPageCursor = i.Key()
break
} else {
// Reset bundle information and add the last read envelope
// which did not make in the last batch.
bundle = []*whisper.Envelope{&envelope}
bundleSize = whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
}
lastEnvelopeHash = envelope.Hash()
}
// Send any outstanding envelopes.
if len(bundle) > 0 && bundleSize > 0 {
if peer == nil {
ret = append(ret, bundle...)
} else {
err = s.sendEnvelopes(peer, bundle, batch)
if err != nil {
return
}
}
}
@ -365,12 +464,26 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
err = i.Error()
if err != nil {
log.Error(fmt.Sprintf("Level DB iterator error: %s", err))
err = fmt.Errorf("levelDB iterator error: %v", err)
}
return
}
func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error {
if batch {
return s.w.SendP2PDirect(peer, envelopes...)
}
for _, env := range envelopes {
if err := s.w.SendP2PDirect(peer, env); err != nil {
return err
}
}
return nil
}
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error {
payload := whisper.CreateMailServerRequestCompletedPayload(request.Hash(), lastEnvelopeHash, cursor)
return s.w.SendHistoricMessageResponse(peer, payload)
@ -404,8 +517,49 @@ func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedM
return nil
}
func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (shhext.MessagesRequestPayload, error) {
var payload shhext.MessagesRequestPayload
if s.pow > 0.0 && request.PoW() < s.pow {
return payload, errors.New("PoW too low")
}
decrypted := s.openEnvelope(request)
if decrypted == nil {
log.Warn("Failed to decrypt p2p request")
return payload, errors.New("failed to decrypt p2p request")
}
if err := s.checkMsgSignature(decrypted, peerID); err != nil {
log.Warn("Check message signature failed: %s", "err", err.Error())
return payload, fmt.Errorf("check message signature failed: %v", err)
}
if err := rlp.DecodeBytes(decrypted.Payload, &payload); err != nil {
return payload, fmt.Errorf("failed to decode data: %v", err)
}
if payload.Upper < payload.Lower {
log.Error("Query range is invalid: lower > upper", "lower", payload.Lower, "upper", payload.Upper)
return payload, errors.New("query range is invalid: lower > upper")
}
lowerTime := time.Unix(int64(payload.Lower), 0)
upperTime := time.Unix(int64(payload.Upper), 0)
if upperTime.Sub(lowerTime) > maxQueryRange {
log.Warn("Query range too long", "peerID", peerID, "length", upperTime.Sub(lowerTime), "max", maxQueryRange)
return payload, fmt.Errorf("query range must be shorted than %d", maxQueryRange)
}
return payload, nil
}
// validateRequest runs different validations on the current request.
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (uint32, uint32, []byte, uint32, cursorType, error) {
// DEPRECATED
func (s *WMailServer) validateRequest(
peerID []byte,
request *whisper.Envelope,
) (uint32, uint32, []byte, uint32, cursorType, error) {
if s.pow > 0.0 && request.PoW() < s.pow {
return 0, 0, nil, 0, nil, fmt.Errorf("PoW() is too low")
}

View File

@ -54,7 +54,7 @@ func (s *MailServerDBPanicSuite) TestArchive() {
func (s *MailServerDBPanicSuite) TestDeliverMail() {
defer s.testPanicRecover("DeliverMail")
_, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil)
_, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil, false)
s.Error(err)
s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error())
}

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/shhext"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
)
@ -327,7 +328,7 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
s.Nil(cursor)
s.Equal(params.limit, limit)
envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil)
envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil, false)
s.NoError(err)
for _, env := range envelopes {
receivedHashes = append(receivedHashes, env.Hash())
@ -339,12 +340,12 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
s.Equal(limit, uint32(len(receivedHashes)))
// the 6 envelopes received should be in descending order
s.Equal(reverseSentHashes[:limit], receivedHashes)
// cursor should be the key of the first envelope of the next page
// cursor should be the key of the last envelope of the last page
s.Equal(archiveKeys[count-limit], fmt.Sprintf("%x", cursor))
// second page
receivedHashes = []common.Hash{}
envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor)
envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor, false)
s.NoError(err)
for _, env := range envelopes {
receivedHashes = append(receivedHashes, env.Hash())
@ -445,9 +446,36 @@ func (s *MailserverSuite) TestMailServer() {
}
}
func (s *MailserverSuite) TestDecodeRequest() {
s.setupServer(s.server)
defer s.server.Close()
payload := shhext.MessagesRequestPayload{
Lower: 50,
Upper: 100,
Bloom: []byte{0x01},
Limit: 10,
Cursor: []byte{},
Batch: true,
}
data, err := rlp.EncodeToBytes(payload)
s.Require().NoError(err)
id, err := s.shh.NewKeyPair()
s.Require().NoError(err)
srcKey, err := s.shh.GetPrivateKey(id)
s.Require().NoError(err)
env := s.createEnvelope(whisper.TopicType{0x01}, data, srcKey)
decodedPayload, err := s.server.decodeRequest(nil, env)
s.Require().NoError(err)
s.Equal(payload, decodedPayload)
}
func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool {
var exist bool
mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil)
mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil, false)
s.NoError(err)
for _, msg := range mail {
if msg.Hash() == envelope.Hash() {
@ -549,6 +577,10 @@ func (s *MailserverSuite) createRequest(p *ServerTestParams) *whisper.Envelope {
data = append(data, limitData...)
}
return s.createEnvelope(p.topic, data, p.key)
}
func (s *MailserverSuite) createEnvelope(topic whisper.TopicType, data []byte, srcKey *ecdsa.PrivateKey) *whisper.Envelope {
key, err := s.shh.GetSymKey(keyID)
if err != nil {
s.T().Fatalf("failed to retrieve sym key with seed %d: %s.", seed, err)
@ -556,17 +588,18 @@ func (s *MailserverSuite) createRequest(p *ServerTestParams) *whisper.Envelope {
params := &whisper.MessageParams{
KeySym: key,
Topic: p.topic,
Topic: topic,
Payload: data,
PoW: powRequirement * 2,
WorkTime: 2,
Src: p.key,
Src: srcKey,
}
msg, err := whisper.NewSentMessage(params)
if err != nil {
s.T().Fatalf("failed to create new message with seed %d: %s.", seed, err)
}
env, err := msg.Wrap(params, time.Now())
if err != nil {
s.T().Fatalf("failed to wrap with seed %d: %s.", seed, err)

View File

@ -277,7 +277,14 @@ func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
whisperServiceConfig := &whisper.Config{
MaxMessageSize: whisper.DefaultMaxMessageSize,
MinimumAcceptedPOW: 0.001,
MinimumAcceptedPOW: params.WhisperMinimumPoW,
}
if config.WhisperConfig.MaxMessageSize > 0 {
whisperServiceConfig.MaxMessageSize = config.WhisperConfig.MaxMessageSize
}
if config.WhisperConfig.MinimumPoW > 0 {
whisperServiceConfig.MinimumAcceptedPOW = config.WhisperConfig.MinimumPoW
}
whisperService := whisper.New(whisperServiceConfig)

View File

@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/params"
"github.com/status-im/status-go/static"
whisper "github.com/status-im/whisper/whisperv6"
validator "gopkg.in/go-playground/validator.v9"
)
@ -72,6 +73,10 @@ type WhisperConfig struct {
// EnableNTPSync enables NTP synchronizations
EnableNTPSync bool
// MaxMessageSize is a maximum size of a devp2p packet handled by the Whisper protocol,
// not only the size of envelopes sent in that packet.
MaxMessageSize uint32
}
// String dumps config object as nicely indented JSON
@ -430,9 +435,10 @@ func NewNodeConfig(dataDir string, networkID uint64) (*NodeConfig, error) {
DatabaseCache: 16,
},
WhisperConfig: WhisperConfig{
DataDir: wnodeDir,
MinimumPoW: WhisperMinimumPoW,
TTL: WhisperTTL,
DataDir: wnodeDir,
MinimumPoW: WhisperMinimumPoW,
TTL: WhisperTTL,
MaxMessageSize: whisper.DefaultMaxMessageSize,
},
SwarmConfig: SwarmConfig{},
RegisterTopics: []discv5.Topic{},

View File

@ -3,13 +3,14 @@ package shhext
import (
"context"
"crypto/ecdsa"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
@ -102,6 +103,22 @@ func (r *MessagesRequest) setDefaults(now time.Time) {
}
}
// MessagesRequestPayload is a payload sent to the Mail Server.
type MessagesRequestPayload struct {
// Lower is a lower bound of time range for which messages are requested.
Lower uint32
// Upper is a lower bound of time range for which messages are requested.
Upper uint32
// Bloom is a bloom filter to filter envelopes.
Bloom []byte
// Limit is the max number of envelopes to return.
Limit uint32
// Cursor is used for pagination of the results.
Cursor []byte
// Batch set to true indicates that the client supports batched response.
Batch bool
}
// -----
// PUBLIC API
// -----
@ -168,8 +185,13 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex
}
}
payload, err := makePayload(r)
if err != nil {
return nil, err
}
envelope, err := makeEnvelop(
makePayload(r),
payload,
symKey,
publicKey,
api.service.nodeID,
@ -461,33 +483,25 @@ func makeEnvelop(
}
// makePayload makes a specific payload for MailServer to request historic messages.
func makePayload(r MessagesRequest) []byte {
// Payload format:
// 4 bytes for lower
// 4 bytes for upper
// 64 bytes for the bloom filter
// 4 bytes for limit
// 36 bytes for the cursor. optional.
data := make([]byte, 12+whisper.BloomFilterSize)
// from
binary.BigEndian.PutUint32(data, r.From)
// to
binary.BigEndian.PutUint32(data[4:], r.To)
// bloom
copy(data[8:], createBloomFilter(r))
// limit
binary.BigEndian.PutUint32(data[8+whisper.BloomFilterSize:], r.Limit)
// cursor is the key of an envelope in leveldb.
// it's 36 bytes. 4 bytes for the timestamp + 32 bytes for the envelope hash
func makePayload(r MessagesRequest) ([]byte, error) {
expectedCursorSize := common.HashLength + 4
cursorBytes, err := hex.DecodeString(r.Cursor)
if err != nil || len(cursorBytes) != expectedCursorSize {
return data
cursor, err := hex.DecodeString(r.Cursor)
if err != nil || len(cursor) != expectedCursorSize {
cursor = nil
}
return append(data, cursorBytes...)
payload := MessagesRequestPayload{
Lower: r.From,
Upper: r.To,
Bloom: createBloomFilter(r),
Limit: r.Limit,
Cursor: cursor,
// Client must tell the MailServer if it supports batch responses.
// This can be removed in the future.
Batch: true,
}
return rlp.EncodeToBytes(payload)
}
func createBloomFilter(r MessagesRequest) []byte {