mailserver pagination (#1039)

* mailserver sends envelopes in descending order

* add limit value in mailserver request payload

* mailserver sends messages up to the limit specified in the request

* update Archive method to return key and error

* processRequest returns the next page cursor

* add cursor to mailserver request

* add limit and cursor to request payload

* fix request limit encoding

* wait for request completed event in TrackerSuite/TestRequestCompleted

* add cursor to mailserver response

* fix cursor position in payload

* add e2e test for mail server pagination

* validate mail server response size

* remove old limitReached var

* fix lint warnings

* add whisper patch

* fix tests after rebase

* check all return values to avoid lint warnings

* check that all messages have been retrieved after 2 paginated requests

* fix lint warnings

* rename geth patch

* merge mailserver patches into one

* add last envelope hash to mailserver response and EventEnvelopeAvailable event

* update whisper patch

* add docs to MailServerResponse

* update whisper patch

* fix tests and lint warnings

* send mailserver response data on EventMailServerRequestCompleted signal

* update tracker tests

* optimise pagination test waiting for mailserver to archive only before requesting

* rollback mailserver interface changes

* refactoring and docs changes

* fix payload size check to determine if a limit is specified

* add more docs to the processRequest method

* add constants for request payload field lengths

* add const noLimits to specify that limit=0 means no limits
This commit is contained in:
Andrea Franz 2018-07-02 09:38:10 +02:00 committed by GitHub
parent 327299ddec
commit 809db97e54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 641 additions and 63 deletions

View File

@ -0,0 +1,134 @@
diff --git a/whisper/whisperv6/events.go b/whisper/whisperv6/events.go
index 1665539..fe7570e 100644
--- a/whisper/whisperv6/events.go
+++ b/whisper/whisperv6/events.go
@@ -13,10 +13,14 @@ const (
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
+ // EventEnvelopeAvailable fires when envelop is available for filters
+ EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
EventMailServerRequestCompleted EventType = "mailserver.request.completed"
// EventMailServerRequestExpired fires after mailserver the request TTL ends
EventMailServerRequestExpired EventType = "mailserver.request.expired"
+ // EventMailServerEnvelopeArchived fires after an envelope has been archived
+ EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"
)
// EnvelopeEvent used for envelopes events.
@@ -24,4 +28,5 @@ type EnvelopeEvent struct {
Event EventType
Hash common.Hash
Peer discover.NodeID
+ Data interface{}
}
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 91d4482..6a937a2 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -49,6 +49,12 @@ type Statistics struct {
totalMessagesCleared int
}
+// MailServerResponse is the response payload sent by the mailserver
+type MailServerResponse struct {
+ LastEnvelopeHash common.Hash
+ Cursor []byte
+}
+
const (
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
overflowIdx // Indicator of message queue overflow
@@ -378,8 +384,8 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop
return p2p.Send(p.ws, p2pRequestCode, envelope)
}
-func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error {
- size, r, err := rlp.EncodeToReader(requestID)
+func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {
+ size, r, err := rlp.EncodeToReader(payload)
if err != nil {
return err
}
@@ -835,15 +841,49 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
case p2pRequestCompleteCode:
if p.trusted {
- var requestID common.Hash
- if err := packet.Decode(&requestID); err != nil {
+ var payload []byte
+ if err := packet.Decode(&payload); err != nil {
log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid request response message")
}
+ // check if payload is
+ // - requestID or
+ // - requestID + lastEnvelopeHash or
+ // - requestID + lastEnvelopeHash + cursor
+ // requestID is the hash of the request envelope.
+ // lastEnvelopeHash is the last envelope sent by the mail server
+ // cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
+ // length := len(payload)
+
+ if len(payload) < common.HashLength || len(payload) > common.HashLength*3+4 {
+ log.Warn("invalid response message, peer will be disconnected", "peer", p.peer.ID(), "err", err, "payload size", len(payload))
+ return errors.New("invalid response size")
+ }
+
+ var (
+ requestID common.Hash
+ lastEnvelopeHash common.Hash
+ cursor []byte
+ )
+
+ requestID = common.BytesToHash(payload[:common.HashLength])
+
+ if len(payload) >= common.HashLength*2 {
+ lastEnvelopeHash = common.BytesToHash(payload[common.HashLength : common.HashLength*2])
+ }
+
+ if len(payload) >= common.HashLength*2+36 {
+ cursor = payload[common.HashLength*2 : common.HashLength*2+36]
+ }
+
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: requestID,
Event: EventMailServerRequestCompleted,
+ Data: &MailServerResponse{
+ LastEnvelopeHash: lastEnvelopeHash,
+ Cursor: cursor,
+ },
})
}
default:
@@ -927,6 +967,10 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
whisper.postEvent(envelope, isP2P) // notify the local node about the new message
if whisper.mailServer != nil {
whisper.mailServer.Archive(envelope)
+ whisper.envelopeFeed.Send(EnvelopeEvent{
+ Hash: envelope.Hash(),
+ Event: EventMailServerEnvelopeArchived,
+ })
}
}
return true, nil
@@ -985,9 +1029,17 @@ func (whisper *Whisper) processQueue() {
case e = <-whisper.messageQueue:
whisper.filters.NotifyWatchers(e, false)
+ whisper.envelopeFeed.Send(EnvelopeEvent{
+ Hash: e.Hash(),
+ Event: EventEnvelopeAvailable,
+ })
case e = <-whisper.p2pMsgQueue:
whisper.filters.NotifyWatchers(e, true)
+ whisper.envelopeFeed.Send(EnvelopeEvent{
+ Hash: e.Hash(),
+ Event: EventEnvelopeAvailable,
+ })
}
}
}

View File

@ -38,6 +38,7 @@ import (
const (
maxQueryRange = 24 * time.Hour
noLimits = 0
)
var (
@ -55,6 +56,15 @@ var (
archivedErrorsCounter = metrics.NewRegisteredCounter("mailserver/archiveErrors", nil)
)
const (
timestampLength = 4
dbKeyLength = common.HashLength + timestampLength
requestLimitLength = 4
requestTimeRangeLength = timestampLength * 2
)
type cursorType []byte
// dbImpl is an interface introduced to be able to test some unexpected
// panics from leveldb that are difficult to reproduce.
// normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite
@ -88,11 +98,10 @@ type DBKey struct {
// NewDbKey creates a new DBKey with the given values.
func NewDbKey(t uint32, h common.Hash) *DBKey {
const sz = common.HashLength + 4
var k DBKey
k.timestamp = t
k.hash = h
k.raw = make([]byte, sz)
k.raw = make([]byte, dbKeyLength)
binary.BigEndian.PutUint32(k.raw, k.timestamp)
copy(k.raw[4:], k.hash[:])
return &k
@ -218,14 +227,14 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
defer recoverLevelDBPanics("DeliverMail")
if ok, lower, upper, bloom := s.validateRequest(peer.ID(), request); ok {
_, err := s.processRequest(peer, lower, upper, bloom)
if ok, lower, upper, bloom, limit, cursor := s.validateRequest(peer.ID(), request); ok {
_, lastEnvelopeHash, nextPageCursor, err := s.processRequest(peer, lower, upper, bloom, limit, cursor)
if err != nil {
log.Error(fmt.Sprintf("error in DeliverMail: %s", err))
return
}
if err := s.sendHistoricMessageResponse(peer, request); err != nil {
if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
log.Error(fmt.Sprintf("SendHistoricMessageResponse error: %s", err))
}
}
@ -246,8 +255,12 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
}
// processRequest processes the current request and re-sends all stored messages
// accomplishing lower and upper limits.
func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte) (ret []*whisper.Envelope, err error) {
// accomplishing lower and upper limits. The limit parameter determines the maximum number of
// messages to be sent back for the current request.
// The cursor parameter is used for pagination.
// After sending all the messages, a message of type p2pRequestCompleteCode is sent by the mailserver to
// the peer.
func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bloom []byte, limit uint32, cursor cursorType) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) {
// Recover from possible goleveldb panics
defer func() {
if r := recover(); r != nil {
@ -255,20 +268,28 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
}
}()
var zero common.Hash
kl := NewDbKey(lower, zero)
ku := NewDbKey(upper+1, zero) // LevelDB is exclusive, while the Whisper API is inclusive
i := s.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
defer i.Release()
var (
sentEnvelopes int64
sentEnvelopes uint32
sentEnvelopesSize int64
zero common.Hash
ku []byte
kl []byte
)
kl = NewDbKey(lower, zero).raw
if cursor != nil {
ku = cursor
} else {
ku = NewDbKey(upper+1, zero).raw
}
i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil)
i.Seek(ku)
defer i.Release()
start := time.Now()
for i.Next() {
for i.Prev() {
var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(i.Value(), &envelope)
if decodeErr != nil {
@ -286,14 +307,20 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
log.Error(fmt.Sprintf("Failed to send direct message to peer: %s", err))
return
}
lastEnvelopeHash = envelope.Hash()
}
sentEnvelopes++
sentEnvelopesSize += whisper.EnvelopeHeaderLength + int64(len(envelope.Data))
if limit != noLimits && sentEnvelopes == limit {
nextPageCursor = i.Key()
break
}
}
}
requestProcessTimer.UpdateSince(start)
sentEnvelopesMeter.Mark(sentEnvelopes)
sentEnvelopesMeter.Mark(int64(sentEnvelopes))
sentEnvelopesSizeMeter.Mark(sentEnvelopesSize)
err = i.Error()
@ -304,32 +331,35 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
return
}
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope) error {
return s.w.SendHistoricMessageResponse(peer, request.Hash())
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error {
requestID := request.Hash()
payload := append(requestID[:], lastEnvelopeHash[:]...)
payload = append(payload, cursor...)
return s.w.SendHistoricMessageResponse(peer, payload)
}
// validateRequest runs different validations on the current request.
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte) {
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte, uint32, cursorType) {
if s.pow > 0.0 && request.PoW() < s.pow {
return false, 0, 0, nil
return false, 0, 0, nil, 0, nil
}
f := whisper.Filter{KeySym: s.key}
decrypted := request.Open(&f)
if decrypted == nil {
log.Warn("Failed to decrypt p2p request")
return false, 0, 0, nil
return false, 0, 0, nil, 0, nil
}
if err := s.checkMsgSignature(decrypted, peerID); err != nil {
log.Warn(err.Error())
return false, 0, 0, nil
return false, 0, 0, nil, 0, nil
}
bloom, err := s.bloomFromReceivedMessage(decrypted)
if err != nil {
log.Warn(err.Error())
return false, 0, 0, nil
return false, 0, 0, nil, 0, nil
}
lower := binary.BigEndian.Uint32(decrypted.Payload[:4])
@ -337,17 +367,27 @@ func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope)
if upper < lower {
log.Error(fmt.Sprintf("Query range is invalid: from > to (%d > %d)", lower, upper))
return false, 0, 0, nil
return false, 0, 0, nil, 0, nil
}
lowerTime := time.Unix(int64(lower), 0)
upperTime := time.Unix(int64(upper), 0)
if upperTime.Sub(lowerTime) > maxQueryRange {
log.Warn(fmt.Sprintf("Query range too big for peer %s", string(peerID)))
return false, 0, 0, nil
return false, 0, 0, nil, 0, nil
}
return true, lower, upper, bloom
var limit uint32
if len(decrypted.Payload) >= requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength {
limit = binary.BigEndian.Uint32(decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize:])
}
var cursor cursorType
if len(decrypted.Payload) == requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength+dbKeyLength {
cursor = decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength:]
}
return true, lower, upper, bloom, limit, cursor
}
// checkMsgSignature returns an error in case the message is not correcly signed

View File

@ -54,7 +54,7 @@ func (s *MailServerDBPanicSuite) TestArchive() {
func (s *MailServerDBPanicSuite) TestDeliverMail() {
defer s.testPanicRecover("DeliverMail")
_, err := s.server.processRequest(nil, 10, 20, []byte{})
_, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil)
s.Error(err)
s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error())
}

View File

@ -43,6 +43,7 @@ type ServerTestParams struct {
birth uint32
low uint32
upp uint32
limit uint32
key *ecdsa.PrivateKey
}
@ -162,6 +163,71 @@ func (s *MailserverSuite) TestDBKey() {
s.Equal(byte(i/0x1000000), k.raw[0], "big endian expected")
}
func (s *MailserverSuite) TestRequestPaginationLimit() {
s.setupServer(s.server)
defer s.server.Close()
var (
sentEnvelopes []*whisper.Envelope
reverseSentHashes []common.Hash
receivedHashes []common.Hash
archiveKeys []string
)
now := time.Now()
count := uint32(10)
for i := count; i > 0; i-- {
sentTime := now.Add(time.Duration(-i) * time.Second)
env, err := generateEnvelope(sentTime)
s.NoError(err)
s.server.Archive(env)
key := NewDbKey(env.Expiry-env.TTL, env.Hash())
archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.raw))
sentEnvelopes = append(sentEnvelopes, env)
reverseSentHashes = append([]common.Hash{env.Hash()}, reverseSentHashes...)
}
params := s.defaultServerParams(sentEnvelopes[0])
params.low = uint32(now.Add(time.Duration(-count) * time.Second).Unix())
params.upp = uint32(now.Unix())
params.limit = 6
request := s.createRequest(params)
src := crypto.FromECDSAPub(&params.key.PublicKey)
ok, lower, upper, bloom, limit, cursor := s.server.validateRequest(src, request)
s.True(ok)
s.Nil(cursor)
s.Equal(params.limit, limit)
envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil)
s.NoError(err)
for _, env := range envelopes {
receivedHashes = append(receivedHashes, env.Hash())
}
// 10 envelopes sent
s.Equal(count, uint32(len(sentEnvelopes)))
// 6 envelopes received
s.Equal(limit, uint32(len(receivedHashes)))
// the 6 envelopes received should be in descending order
s.Equal(reverseSentHashes[:limit], receivedHashes)
// cursor should be the key of the first envelope of the next page
s.Equal(archiveKeys[count-limit], fmt.Sprintf("%x", cursor))
// second page
receivedHashes = []common.Hash{}
envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor)
s.NoError(err)
for _, env := range envelopes {
receivedHashes = append(receivedHashes, env.Hash())
}
// 4 envelopes received
s.Equal(count-limit, uint32(len(receivedHashes)))
// cursor is nil because there are no other pages
s.Nil(cursor)
}
func (s *MailserverSuite) TestMailServer() {
s.setupServer(s.server)
defer s.server.Close()
@ -170,6 +236,7 @@ func (s *MailserverSuite) TestMailServer() {
s.NoError(err)
s.server.Archive(env)
testCases := []struct {
params *ServerTestParams
expect bool
@ -233,25 +300,26 @@ func (s *MailserverSuite) TestMailServer() {
s.T().Run(tc.info, func(*testing.T) {
request := s.createRequest(tc.params)
src := crypto.FromECDSAPub(&tc.params.key.PublicKey)
ok, lower, upper, bloom := s.server.validateRequest(src, request)
ok, lower, upper, bloom, limit, _ := s.server.validateRequest(src, request)
s.Equal(tc.isOK, ok)
if ok {
s.Equal(tc.params.low, lower)
s.Equal(tc.params.upp, upper)
s.Equal(tc.params.limit, limit)
s.Equal(whisper.TopicToBloom(tc.params.topic), bloom)
s.Equal(tc.expect, s.messageExists(env, tc.params.low, tc.params.upp, bloom))
s.Equal(tc.expect, s.messageExists(env, tc.params.low, tc.params.upp, bloom, tc.params.limit))
src[0]++
ok, _, _, _ = s.server.validateRequest(src, request)
ok, _, _, _, _, _ = s.server.validateRequest(src, request)
s.True(ok)
}
})
}
}
func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte) bool {
func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool {
var exist bool
mail, err := s.server.processRequest(nil, low, upp, bloom)
mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil)
s.NoError(err)
for _, msg := range mail {
if msg.Hash() == envelope.Hash() {
@ -337,6 +405,7 @@ func (s *MailserverSuite) defaultServerParams(env *whisper.Envelope) *ServerTest
birth: birth,
low: birth - 1,
upp: birth + 1,
limit: 0,
key: testPeerID,
}
}
@ -348,6 +417,12 @@ func (s *MailserverSuite) createRequest(p *ServerTestParams) *whisper.Envelope {
binary.BigEndian.PutUint32(data[4:], p.upp)
data = append(data, bloom...)
if p.limit != 0 {
limitData := make([]byte, 4)
binary.BigEndian.PutUint32(limitData, p.limit)
data = append(data, limitData...)
}
key, err := s.shh.GetSymKey(keyID)
if err != nil {
s.T().Fatalf("failed to retrieve sym key with seed %d: %s.", seed, err)

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"time"
@ -46,6 +47,13 @@ type MessagesRequest struct {
// Default is now.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests
Cursor string `json:"cursor"`
// Topic is a regular Whisper topic.
Topic whisper.TopicType `json:"topic"`
@ -184,10 +192,30 @@ func makeEnvelop(payload []byte, symKey []byte, nodeID *ecdsa.PrivateKey, pow fl
// makePayload makes a specific payload for MailServer to request historic messages.
func makePayload(r MessagesRequest) []byte {
// first 8 bytes are lowed and upper bounds as uint32
data := make([]byte, 8+whisper.BloomFilterSize)
// Payload format:
// 4 bytes for lower
// 4 bytes for upper
// 64 bytes for the bloom filter
// 4 bytes for limit
// 36 bytes for the cursor. optional.
data := make([]byte, 12+whisper.BloomFilterSize)
// from
binary.BigEndian.PutUint32(data, r.From)
// to
binary.BigEndian.PutUint32(data[4:], r.To)
// bloom
copy(data[8:], whisper.TopicToBloom(r.Topic))
// limit
binary.BigEndian.PutUint32(data[8+whisper.BloomFilterSize:], r.Limit)
// cursor is the key of an envelope in leveldb.
// it's 36 bytes. 4 bytes for the timestamp + 32 bytes for the envelope hash
expectedCursorSize := common.HashLength + 4
cursorBytes, err := hex.DecodeString(r.Cursor)
if err != nil || len(cursorBytes) != expectedCursorSize {
return data
}
return append(data, cursorBytes...)
}

View File

@ -31,7 +31,7 @@ const (
type EnvelopeEventsHandler interface {
EnvelopeSent(common.Hash)
EnvelopeExpired(common.Hash)
MailServerRequestCompleted(common.Hash)
MailServerRequestCompleted(common.Hash, common.Hash, []byte)
MailServerRequestExpired(common.Hash)
}
@ -235,7 +235,9 @@ func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEv
log.Debug("mailserver response received", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.MailServerRequestCompleted(event.Hash)
if resp, ok := event.Data.(*whisper.MailServerResponse); ok {
t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor)
}
}
}

View File

@ -39,8 +39,8 @@ func (t handlerMock) EnvelopeExpired(hash common.Hash) {
t.expirations <- hash
}
func (t handlerMock) MailServerRequestCompleted(hash common.Hash) {
t.requestsCompleted <- hash
func (t handlerMock) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte) {
t.requestsCompleted <- requestID
}
func (t handlerMock) MailServerRequestExpired(hash common.Hash) {
@ -389,14 +389,23 @@ func (s *TrackerSuite) TestRemoved() {
}
func (s *TrackerSuite) TestRequestCompleted() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second))
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{},
})
select {
case requestID := <-mock.requestsCompleted:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be completed")
}
}
func (s *TrackerSuite) TestRequestExpiration() {

View File

@ -18,12 +18,12 @@ func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) {
signal.SendEnvelopeExpired(hash)
}
// MailServerRequestCompleted triggered when mailserver send a ack with a requesID sent previously
func (h EnvelopeSignalHandler) MailServerRequestCompleted(hash common.Hash) {
signal.SendMailServerRequestCompleted(hash)
// MailServerRequestCompleted triggered when the mailserver sends a message to notify that the request has been completed
func (h EnvelopeSignalHandler) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte) {
signal.SendMailServerRequestCompleted(requestID, lastEnvelopeHash, cursor)
}
// MailServerRequestExpired triggered when mail server request expires
// MailServerRequestExpired triggered when the mailserver request expires
func (h EnvelopeSignalHandler) MailServerRequestExpired(hash common.Hash) {
signal.SendMailServerRequestExpired(hash)
}

View File

@ -24,6 +24,13 @@ type EnvelopeSignal struct {
Hash common.Hash `json:"hash"`
}
// MailServerResponseSignal holds the data received in the response from the mailserver.
type MailServerResponseSignal struct {
RequestID common.Hash `json:"requestID"`
LastEnvelopeHash common.Hash `json:"lastEnvelopeHash"`
Cursor string `json:"cursor"`
}
// SendEnvelopeSent triggered when envelope delivered at least to 1 peer.
func SendEnvelopeSent(hash common.Hash) {
send(EventEnvelopeSent, EnvelopeSignal{hash})
@ -35,8 +42,13 @@ func SendEnvelopeExpired(hash common.Hash) {
}
// SendMailServerRequestCompleted triggered when mail server response has been received
func SendMailServerRequestCompleted(hash common.Hash) {
send(EventMailServerRequestCompleted, EnvelopeSignal{hash})
func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte) {
sig := MailServerResponseSignal{
RequestID: requestID,
LastEnvelopeHash: lastEnvelopeHash,
Cursor: string(cursor),
}
send(EventMailServerRequestCompleted, sig)
}
// SendMailServerRequestExpired triggered when mail server request expires

View File

@ -79,13 +79,13 @@ func testMailserverPeer(t *testing.T) {
ok, err := shhAPI.MarkTrustedPeer(context.TODO(), *peerURL)
require.NoError(t, err)
require.True(t, ok)
hash, err := shhextAPI.RequestMessages(context.TODO(), shhext.MessagesRequest{
requestID, err := shhextAPI.RequestMessages(context.TODO(), shhext.MessagesRequest{
MailServerPeer: *peerURL,
SymKeyID: symKeyID,
Topic: topic,
})
require.NoError(t, err)
require.NotNil(t, hash)
require.NotNil(t, requestID)
// wait for all messages
require.NoError(t, waitForMessages(t, *msgCount, shhAPI, filterID))
}

View File

@ -3,7 +3,9 @@ package whisper
import (
"encoding/hex"
"encoding/json"
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
@ -14,9 +16,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/api"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/rpc"
. "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
@ -111,8 +116,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
senderWhisperService.SubscribeEnvelopeEvents(events)
// Request messages (including the previous one, expired) from mailbox.
result := s.requestHistoricMessagesFromLast12Hours(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String())
requestID := common.BytesToHash(result)
requestID := s.requestHistoricMessagesFromLast12Hours(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String(), 0, "")
// And we receive message, it comes from mailbox.
messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, tracer, messageHash)
@ -282,8 +286,8 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
s.Require().Empty(messages)
// Request each one messages from mailbox using enode.
s.requestHistoricMessagesFromLast12Hours(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String())
s.requestHistoricMessagesFromLast12Hours(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String())
s.requestHistoricMessagesFromLast12Hours(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String(), 0, "")
s.requestHistoricMessagesFromLast12Hours(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String(), 0, "")
// Bob receive p2p message from group chat filter.
messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer, groupChatMessageHash)
@ -296,6 +300,195 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
}
func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() {
// Start mailbox
mailbox, stop := s.startMailboxBackend()
defer stop()
s.Require().True(mailbox.IsNodeRunning())
mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode
// Start client
client, stop := s.startBackend("client")
defer stop()
s.Require().True(client.IsNodeRunning())
clientRPCClient := client.StatusNode().RPCClient()
// Add mailbox to clients's peers
s.addPeerAndWait(client.StatusNode(), mailbox.StatusNode())
// Whisper services
mailboxWhisperService, err := mailbox.StatusNode().WhisperService()
s.Require().NoError(err)
clientWhisperService, err := client.StatusNode().WhisperService()
s.Require().NoError(err)
// mailserver sym key
mailServerKeyID, err := clientWhisperService.AddSymKeyFromPassword(mailboxPassword)
s.Require().NoError(err)
// public chat
var (
keyID string
topic whisper.TopicType
filterID string
)
publicChatName := "test public chat"
keyID, topic, filterID = s.joinPublicChat(clientWhisperService, clientRPCClient, publicChatName)
// envelopes to be sent
envelopesCount := 5
sentEnvelopesHashes := make([]string, 0)
// watch envelopes to be archived on mailserver
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
// watch envelopes to be available for filters in the client
envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher)
// watch mailserver responses in the client
mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher)
// send envelopes
for i := 0; i < envelopesCount; i++ {
hash := s.postMessageToGroup(clientRPCClient, keyID, topic.String(), "")
sentEnvelopesHashes = append(sentEnvelopesHashes, hash)
}
// get messages from filter before requesting them to mailserver
lastEnvelopeHash := sentEnvelopesHashes[len(sentEnvelopesHashes)-1]
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{lastEnvelopeHash}, whisper.EventEnvelopeAvailable)
messages := s.getMessagesByMessageFilterID(clientRPCClient, filterID)
s.Equal(envelopesCount, len(messages))
messages = s.getMessagesByMessageFilterID(clientRPCClient, filterID)
s.Equal(0, len(messages))
limit := 3
getMessages := func() []string {
envelopes := s.getMessagesByMessageFilterID(clientRPCClient, filterID)
hashes := make([]string, 0)
for _, e := range envelopes {
hashes = append(hashes, e["hash"].(string))
}
return hashes
}
requestMessages := func(cursor string) common.Hash {
return s.requestHistoricMessagesFromLast12Hours(clientWhisperService, clientRPCClient, mailboxEnode, mailServerKeyID, topic.String(), limit, cursor)
}
// wait for mailserver to archive all the envelopes
s.waitForEnvelopeEvents(envelopeArchivedWatcher, sentEnvelopesHashes, whisper.EventMailServerEnvelopeArchived)
// first page
// send request
requestID := requestMessages("")
// wait for mail server response
resp := s.waitForMailServerResponse(mailServerResponseWatcher, requestID)
s.NotEmpty(resp.LastEnvelopeHash)
s.NotEmpty(resp.Cursor)
// wait for last envelope sent by the mailserver to be available for filters
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable)
// get messages
firstPageHashes := getMessages()
s.Equal(3, len(firstPageHashes))
// second page
// send request
requestID = requestMessages(fmt.Sprintf("%x", resp.Cursor))
// wait for mail server response
resp = s.waitForMailServerResponse(mailServerResponseWatcher, requestID)
s.NotEmpty(resp.LastEnvelopeHash)
// all messages have been sent, no more pages available
s.Empty(resp.Cursor)
// wait for last envelope sent by the mailserver to be available for filters
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable)
// get messages
secondPageHashes := getMessages()
s.Equal(2, len(secondPageHashes))
allReceivedHashes := append(firstPageHashes, secondPageHashes...)
s.Equal(envelopesCount, len(allReceivedHashes))
// check that all the envelopes have been received
sort.Strings(sentEnvelopesHashes)
sort.Strings(allReceivedHashes)
s.Equal(sentEnvelopesHashes, allReceivedHashes)
}
func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) {
check := make(map[string]struct{})
for _, hash := range hashes {
check[hash] = struct{}{}
}
timeout := time.NewTimer(time.Second * 5)
for {
select {
case e := <-events:
if e.Event == event {
delete(check, e.Hash.String())
if len(check) == 0 {
timeout.Stop()
return
}
}
case <-timeout.C:
s.FailNow("timed out while waiting for event on envelopes", "event: %s", event)
}
}
}
func (s *WhisperMailboxSuite) waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID common.Hash) *whisper.MailServerResponse {
timeout := time.NewTimer(time.Second * 5)
for {
select {
case event := <-events:
if event.Event == whisper.EventMailServerRequestCompleted && event.Hash == requestID {
timeout.Stop()
resp, ok := event.Data.(*whisper.MailServerResponse)
if !ok {
s.FailNow("mailserver response error", "expected whisper.MailServerResponse, got: %+v", resp)
}
return resp
}
case <-timeout.C:
s.FailNow("timed out while waiting for mailserver response")
}
}
}
func (s *WhisperMailboxSuite) addPeerAndWait(node, other *node.StatusNode) {
nodeInfo := node.GethNode().Server().NodeInfo()
nodeID := nodeInfo.ID
nodeEnode := nodeInfo.Enode
otherEnode := other.GethNode().Server().NodeInfo().Enode
s.Require().NotEqual(nodeEnode, otherEnode)
ch := make(chan *p2p.PeerEvent)
subscription := other.GethNode().Server().SubscribeEvents(ch)
defer subscription.Unsubscribe()
err := node.AddPeer(otherEnode)
s.Require().NoError(err)
select {
case event := <-ch:
if event.Type == p2p.PeerEventTypeAdd && event.Peer.String() == nodeID {
return
}
s.Failf("failed connecting to peer", "expected p2p.PeerEventTypeAdd with nodeID (%s), got: %+v", nodeID, event)
case <-time.After(time.Second):
s.Fail("timed out while waiting for a peer to be added")
}
}
func newGroupChatParams(symkey []byte, topic whisper.TopicType) groupChatParams {
groupChatKeyStr := hexutil.Bytes(symkey).String()
return groupChatParams{
@ -460,7 +653,16 @@ func (s *WhisperMailboxSuite) postMessageToGroup(rpcCli *rpc.Client, groupChatKe
s.Require().NoError(err)
s.Require().Nil(postResp.Error)
return postResp.Result.(string)
hash, ok := postResp.Result.(string)
if !ok {
s.FailNow("error decoding result", "expected string, got: %+v", postResp.Result)
}
if !strings.HasPrefix(hash, "0x") {
s.FailNow("hash format error", "expected hex string, got: %s", hash)
}
return hash
}
// getMessagesByMessageFilterID gets received messages by messageFilterID.
@ -516,15 +718,15 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin
}
// requestHistoricMessagesFromLast12Hours asks a mailnode to resend messages from last 12 hours.
func (s *WhisperMailboxSuite) requestHistoricMessagesFromLast12Hours(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) []byte {
func (s *WhisperMailboxSuite) requestHistoricMessagesFromLast12Hours(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string, limit int, cursor string) common.Hash {
currentTime := w.GetCurrentTime()
from := currentTime.Add(-12 * time.Hour)
to := currentTime
return s.requestHistoricMessages(w, rpcCli, mailboxEnode, mailServerKeyID, topic, from, to)
return s.requestHistoricMessages(w, rpcCli, mailboxEnode, mailServerKeyID, topic, from, to, limit, cursor)
}
// requestHistoricMessages asks a mailnode to resend messages.
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string, from, to time.Time) []byte {
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string, from, to time.Time, limit int, cursor string) common.Hash {
resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0",
"id": 2,
@ -534,7 +736,9 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli
"topic":"` + topic + `",
"symKeyID":"` + mailServerKeyID + `",
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
"to":` + strconv.FormatInt(to.Unix(), 10) + `
"to":` + strconv.FormatInt(to.Unix(), 10) + `,
"limit": ` + fmt.Sprintf("%d", limit) + `,
"cursor": "` + cursor + `"
}]
}`)
reqMessagesResp := baseRPCResponse{}
@ -547,12 +751,29 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli
s.Require().True(strings.HasPrefix(hash, "0x"))
b, err := hex.DecodeString(hash[2:])
s.Require().NoError(err)
return b
return common.BytesToHash(b)
default:
s.Failf("failed reading shh_newMessageFilter result", "expected a hash, got: %+v", reqMessagesResp.Result)
}
return nil
return common.Hash{}
}
func (s *WhisperMailboxSuite) joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (string, whisper.TopicType, string) {
keyID, err := w.AddSymKeyFromPassword(name)
s.Require().NoError(err)
h := sha3.NewKeccak256()
_, err = h.Write([]byte(name))
if err != nil {
s.Fail("error generating topic", "failed gerating topic from chat name, %+v", err)
}
fullTopic := h.Sum(nil)
topic := whisper.BytesToTopic(fullTopic)
filterID := s.createGroupChatMessageFilter(rpcClient, keyID, topic.String())
return keyID, topic, filterID
}
type getFilterMessagesResponse struct {

View File

@ -13,10 +13,14 @@ const (
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
// EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
EventMailServerRequestCompleted EventType = "mailserver.request.completed"
// EventMailServerRequestExpired fires after mailserver the request TTL ends
EventMailServerRequestExpired EventType = "mailserver.request.expired"
// EventMailServerEnvelopeArchived fires after an envelope has been archived
EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"
)
// EnvelopeEvent used for envelopes events.
@ -24,4 +28,5 @@ type EnvelopeEvent struct {
Event EventType
Hash common.Hash
Peer discover.NodeID
Data interface{}
}

View File

@ -49,6 +49,12 @@ type Statistics struct {
totalMessagesCleared int
}
// MailServerResponse is the response payload sent by the mailserver
type MailServerResponse struct {
LastEnvelopeHash common.Hash
Cursor []byte
}
const (
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
overflowIdx // Indicator of message queue overflow
@ -378,8 +384,8 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop
return p2p.Send(p.ws, p2pRequestCode, envelope)
}
func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error {
size, r, err := rlp.EncodeToReader(requestID)
func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {
size, r, err := rlp.EncodeToReader(payload)
if err != nil {
return err
}
@ -835,15 +841,49 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
case p2pRequestCompleteCode:
if p.trusted {
var requestID common.Hash
if err := packet.Decode(&requestID); err != nil {
var payload []byte
if err := packet.Decode(&payload); err != nil {
log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid request response message")
}
// check if payload is
// - requestID or
// - requestID + lastEnvelopeHash or
// - requestID + lastEnvelopeHash + cursor
// requestID is the hash of the request envelope.
// lastEnvelopeHash is the last envelope sent by the mail server
// cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
// length := len(payload)
if len(payload) < common.HashLength || len(payload) > common.HashLength*3+4 {
log.Warn("invalid response message, peer will be disconnected", "peer", p.peer.ID(), "err", err, "payload size", len(payload))
return errors.New("invalid response size")
}
var (
requestID common.Hash
lastEnvelopeHash common.Hash
cursor []byte
)
requestID = common.BytesToHash(payload[:common.HashLength])
if len(payload) >= common.HashLength*2 {
lastEnvelopeHash = common.BytesToHash(payload[common.HashLength : common.HashLength*2])
}
if len(payload) >= common.HashLength*2+36 {
cursor = payload[common.HashLength*2 : common.HashLength*2+36]
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: requestID,
Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{
LastEnvelopeHash: lastEnvelopeHash,
Cursor: cursor,
},
})
}
default:
@ -927,6 +967,10 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
whisper.postEvent(envelope, isP2P) // notify the local node about the new message
if whisper.mailServer != nil {
whisper.mailServer.Archive(envelope)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: envelope.Hash(),
Event: EventMailServerEnvelopeArchived,
})
}
}
return true, nil
@ -985,9 +1029,17 @@ func (whisper *Whisper) processQueue() {
case e = <-whisper.messageQueue:
whisper.filters.NotifyWatchers(e, false)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(),
Event: EventEnvelopeAvailable,
})
case e = <-whisper.p2pMsgQueue:
whisper.filters.NotifyWatchers(e, true)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(),
Event: EventEnvelopeAvailable,
})
}
}
}