Fix cursor encoding (#1308)
It fixes encoding and decoding cursor sent in historic messages requests. `hex` package from the standard library is used.
This commit is contained in:
parent
7d651afaae
commit
e2682486fd
|
@ -26,9 +26,9 @@ func NewCleanerWithDB(db dbImpl) *Cleaner {
|
|||
// Prune removes messages sent between lower and upper timestamps and returns how many has been removed
|
||||
func (c *Cleaner) Prune(lower, upper uint32) (int, error) {
|
||||
var zero common.Hash
|
||||
kl := NewDbKey(lower, zero)
|
||||
ku := NewDbKey(upper, zero)
|
||||
i := c.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
|
||||
kl := NewDBKey(lower, zero)
|
||||
ku := NewDBKey(upper, zero)
|
||||
i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
|
||||
defer i.Release()
|
||||
|
||||
return c.prune(i)
|
||||
|
|
|
@ -102,8 +102,8 @@ func countMessages(t *testing.T, db dbImpl) int {
|
|||
)
|
||||
|
||||
now := time.Now()
|
||||
kl := NewDbKey(uint32(0), zero)
|
||||
ku := NewDbKey(uint32(now.Unix()), zero)
|
||||
kl := NewDBKey(uint32(0), zero)
|
||||
ku := NewDBKey(uint32(now.Unix()), zero)
|
||||
i := db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
|
||||
defer i.Release()
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/status-im/status-go/db"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/services/shhext"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
|
@ -66,14 +65,14 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
// DBKeyLength is a size of the envelope key.
|
||||
DBKeyLength = common.HashLength + timestampLength
|
||||
|
||||
timestampLength = 4
|
||||
dbKeyLength = common.HashLength + timestampLength
|
||||
requestLimitLength = 4
|
||||
requestTimeRangeLength = timestampLength * 2
|
||||
)
|
||||
|
||||
type cursorType []byte
|
||||
|
||||
// dbImpl is an interface introduced to be able to test some unexpected
|
||||
// panics from leveldb that are difficult to reproduce.
|
||||
// normally the db implementation is leveldb.DB, but in TestMailServerDBPanicSuite
|
||||
|
@ -108,17 +107,31 @@ type DBKey struct {
|
|||
raw []byte
|
||||
}
|
||||
|
||||
// NewDbKey creates a new DBKey with the given values.
|
||||
func NewDbKey(t uint32, h common.Hash) *DBKey {
|
||||
// Bytes returns a bytes representation of the DBKey.
|
||||
func (k *DBKey) Bytes() []byte {
|
||||
return k.raw
|
||||
}
|
||||
|
||||
// NewDBKey creates a new DBKey with the given values.
|
||||
func NewDBKey(t uint32, h common.Hash) *DBKey {
|
||||
var k DBKey
|
||||
k.timestamp = t
|
||||
k.hash = h
|
||||
k.raw = make([]byte, dbKeyLength)
|
||||
k.raw = make([]byte, DBKeyLength)
|
||||
binary.BigEndian.PutUint32(k.raw, k.timestamp)
|
||||
copy(k.raw[4:], k.hash[:])
|
||||
return &k
|
||||
}
|
||||
|
||||
// NewDBKeyFromBytes creates a DBKey from a byte slice.
|
||||
func NewDBKeyFromBytes(b []byte) *DBKey {
|
||||
return &DBKey{
|
||||
raw: b,
|
||||
timestamp: binary.BigEndian.Uint32(b),
|
||||
hash: common.BytesToHash(b[4:]),
|
||||
}
|
||||
}
|
||||
|
||||
// Init initializes mailServer.
|
||||
func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) error {
|
||||
var err error
|
||||
|
@ -226,13 +239,13 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
|
|||
|
||||
log.Debug("Archiving envelope", "hash", env.Hash().Hex())
|
||||
|
||||
key := NewDbKey(env.Expiry-env.TTL, env.Hash())
|
||||
key := NewDBKey(env.Expiry-env.TTL, env.Hash())
|
||||
rawEnvelope, err := rlp.EncodeToBytes(env)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
|
||||
archivedErrorsCounter.Inc(1)
|
||||
} else {
|
||||
if err = s.db.Put(key.raw, rawEnvelope, nil); err != nil {
|
||||
if err = s.db.Put(key.Bytes(), rawEnvelope, nil); err != nil {
|
||||
log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
|
||||
archivedErrorsCounter.Inc(1)
|
||||
}
|
||||
|
@ -265,7 +278,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
|
|||
lower, upper uint32
|
||||
bloom []byte
|
||||
limit uint32
|
||||
cursor cursorType
|
||||
cursor []byte
|
||||
batch bool
|
||||
err error
|
||||
)
|
||||
|
@ -274,7 +287,7 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
|
|||
if err == nil {
|
||||
lower, upper = payload.Lower, payload.Upper
|
||||
bloom = payload.Bloom
|
||||
cursor = cursorType(payload.Cursor)
|
||||
cursor = payload.Cursor
|
||||
limit = payload.Limit
|
||||
batch = payload.Batch
|
||||
} else {
|
||||
|
@ -447,23 +460,22 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) iterator.Iterator {
|
||||
func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte) iterator.Iterator {
|
||||
var (
|
||||
emptyHash common.Hash
|
||||
ku []byte
|
||||
kl []byte
|
||||
ku, kl *DBKey
|
||||
)
|
||||
|
||||
kl = NewDbKey(lower, emptyHash).raw
|
||||
if len(cursor) == dbKeyLength {
|
||||
ku = cursor
|
||||
kl = NewDBKey(lower, emptyHash)
|
||||
if len(cursor) == DBKeyLength {
|
||||
ku = NewDBKeyFromBytes(cursor)
|
||||
} else {
|
||||
ku = NewDbKey(upper+1, emptyHash).raw
|
||||
ku = NewDBKey(upper+1, emptyHash)
|
||||
}
|
||||
|
||||
i := s.db.NewIterator(&util.Range{Start: kl, Limit: ku}, nil)
|
||||
i := s.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
|
||||
// seek to the end as we want to return envelopes in a descending order
|
||||
i.Seek(ku)
|
||||
i.Seek(ku.Bytes())
|
||||
|
||||
return i
|
||||
}
|
||||
|
@ -472,13 +484,13 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) ite
|
|||
// to the output channel in bundles.
|
||||
func (s *WMailServer) processRequestInBundles(
|
||||
iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope,
|
||||
) (cursorType, common.Hash) {
|
||||
) ([]byte, common.Hash) {
|
||||
var (
|
||||
bundle []*whisper.Envelope
|
||||
bundleSize uint32
|
||||
processedEnvelopes int
|
||||
processedEnvelopesSize int64
|
||||
nextCursor cursorType
|
||||
nextCursor []byte
|
||||
lastEnvelopeHash common.Hash
|
||||
)
|
||||
|
||||
|
@ -559,7 +571,7 @@ func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Env
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error {
|
||||
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor []byte) error {
|
||||
payload := whisper.CreateMailServerRequestCompletedPayload(request.Hash(), lastEnvelopeHash, cursor)
|
||||
return s.w.SendHistoricMessageResponse(peer, payload)
|
||||
}
|
||||
|
@ -592,8 +604,8 @@ func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedM
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (shhext.MessagesRequestPayload, error) {
|
||||
var payload shhext.MessagesRequestPayload
|
||||
func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (MessagesRequestPayload, error) {
|
||||
var payload MessagesRequestPayload
|
||||
|
||||
if s.pow > 0.0 && request.PoW() < s.pow {
|
||||
return payload, errors.New("PoW too low")
|
||||
|
@ -634,7 +646,7 @@ func (s *WMailServer) decodeRequest(peerID []byte, request *whisper.Envelope) (s
|
|||
func (s *WMailServer) validateRequest(
|
||||
peerID []byte,
|
||||
request *whisper.Envelope,
|
||||
) (uint32, uint32, []byte, uint32, cursorType, error) {
|
||||
) (uint32, uint32, []byte, uint32, []byte, error) {
|
||||
if s.pow > 0.0 && request.PoW() < s.pow {
|
||||
return 0, 0, nil, 0, nil, fmt.Errorf("PoW() is too low")
|
||||
}
|
||||
|
@ -673,8 +685,8 @@ func (s *WMailServer) validateRequest(
|
|||
limit = binary.BigEndian.Uint32(decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize:])
|
||||
}
|
||||
|
||||
var cursor cursorType
|
||||
if len(decrypted.Payload) == requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength+dbKeyLength {
|
||||
var cursor []byte
|
||||
if len(decrypted.Payload) == requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength+DBKeyLength {
|
||||
cursor = decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength:]
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/services/shhext"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
@ -266,8 +265,8 @@ func (s *MailserverSuite) TestArchive() {
|
|||
s.NoError(err)
|
||||
|
||||
s.server.Archive(env)
|
||||
key := NewDbKey(env.Expiry-env.TTL, env.Hash())
|
||||
archivedEnvelope, err := s.server.db.Get(key.raw, nil)
|
||||
key := NewDBKey(env.Expiry-env.TTL, env.Hash())
|
||||
archivedEnvelope, err := s.server.db.Get(key.Bytes(), nil)
|
||||
s.NoError(err)
|
||||
|
||||
s.Equal(rawEnvelope, archivedEnvelope)
|
||||
|
@ -288,10 +287,10 @@ func (s *MailserverSuite) TestManageLimits() {
|
|||
func (s *MailserverSuite) TestDBKey() {
|
||||
var h common.Hash
|
||||
i := uint32(time.Now().Unix())
|
||||
k := NewDbKey(i, h)
|
||||
s.Equal(len(k.raw), common.HashLength+4, "wrong DB key length")
|
||||
s.Equal(byte(i%0x100), k.raw[3], "raw representation should be big endian")
|
||||
s.Equal(byte(i/0x1000000), k.raw[0], "big endian expected")
|
||||
k := NewDBKey(i, h)
|
||||
s.Equal(len(k.Bytes()), DBKeyLength, "wrong DB key length")
|
||||
s.Equal(byte(i%0x100), k.Bytes()[3], "raw representation should be big endian")
|
||||
s.Equal(byte(i/0x1000000), k.Bytes()[0], "big endian expected")
|
||||
}
|
||||
|
||||
func (s *MailserverSuite) TestRequestPaginationLimit() {
|
||||
|
@ -313,8 +312,8 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
|
|||
env, err := generateEnvelope(sentTime)
|
||||
s.NoError(err)
|
||||
s.server.Archive(env)
|
||||
key := NewDbKey(env.Expiry-env.TTL, env.Hash())
|
||||
archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.raw))
|
||||
key := NewDBKey(env.Expiry-env.TTL, env.Hash())
|
||||
archiveKeys = append(archiveKeys, fmt.Sprintf("%x", key.Bytes()))
|
||||
sentEnvelopes = append(sentEnvelopes, env)
|
||||
reverseSentHashes = append([]common.Hash{env.Hash()}, reverseSentHashes...)
|
||||
}
|
||||
|
@ -447,7 +446,7 @@ func (s *MailserverSuite) TestDecodeRequest() {
|
|||
s.setupServer(s.server)
|
||||
defer s.server.Close()
|
||||
|
||||
payload := shhext.MessagesRequestPayload{
|
||||
payload := MessagesRequestPayload{
|
||||
Lower: 50,
|
||||
Upper: 100,
|
||||
Bloom: []byte{0x01},
|
||||
|
@ -635,8 +634,8 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) {
|
|||
}
|
||||
|
||||
func processRequestAndCollectHashes(
|
||||
server *WMailServer, lower, upper uint32, cursor cursorType, bloom []byte, limit int,
|
||||
) ([]common.Hash, cursorType, common.Hash) {
|
||||
server *WMailServer, lower, upper uint32, cursor []byte, bloom []byte, limit int,
|
||||
) ([]common.Hash, []byte, common.Hash) {
|
||||
iter := server.createIterator(lower, upper, cursor)
|
||||
defer iter.Release()
|
||||
bundles := make(chan []*whisper.Envelope, 10)
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
package mailserver
|
||||
|
||||
// MessagesRequestPayload is a payload sent to the Mail Server.
|
||||
type MessagesRequestPayload struct {
|
||||
// Lower is a lower bound of time range for which messages are requested.
|
||||
Lower uint32
|
||||
// Upper is a lower bound of time range for which messages are requested.
|
||||
Upper uint32
|
||||
// Bloom is a bloom filter to filter envelopes.
|
||||
Bloom []byte
|
||||
// Limit is the max number of envelopes to return.
|
||||
Limit uint32
|
||||
// Cursor is used for pagination of the results.
|
||||
Cursor []byte
|
||||
// Batch set to true indicates that the client supports batched response.
|
||||
Batch bool
|
||||
}
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/status-im/status-go/mailserver"
|
||||
"github.com/status-im/status-go/services/shhext/chat"
|
||||
"github.com/status-im/status-go/services/shhext/mailservers"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
|
@ -104,22 +105,6 @@ func (r *MessagesRequest) setDefaults(now time.Time) {
|
|||
}
|
||||
}
|
||||
|
||||
// MessagesRequestPayload is a payload sent to the Mail Server.
|
||||
type MessagesRequestPayload struct {
|
||||
// Lower is a lower bound of time range for which messages are requested.
|
||||
Lower uint32
|
||||
// Upper is a lower bound of time range for which messages are requested.
|
||||
Upper uint32
|
||||
// Bloom is a bloom filter to filter envelopes.
|
||||
Bloom []byte
|
||||
// Limit is the max number of envelopes to return.
|
||||
Limit uint32
|
||||
// Cursor is used for pagination of the results.
|
||||
Cursor []byte
|
||||
// Batch set to true indicates that the client supports batched response.
|
||||
Batch bool
|
||||
}
|
||||
|
||||
// -----
|
||||
// PUBLIC API
|
||||
// -----
|
||||
|
@ -188,7 +173,7 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex
|
|||
publicKey = mailServerNode.Pubkey()
|
||||
}
|
||||
|
||||
payload, err := makePayload(r)
|
||||
payload, err := makeMessagesRequestPayload(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -473,15 +458,18 @@ func makeEnvelop(
|
|||
return message.Wrap(¶ms, now)
|
||||
}
|
||||
|
||||
// makePayload makes a specific payload for MailServer to request historic messages.
|
||||
func makePayload(r MessagesRequest) ([]byte, error) {
|
||||
expectedCursorSize := common.HashLength + 4
|
||||
// makeMessagesRequestPayload makes a specific payload for MailServer
|
||||
// to request historic messages.
|
||||
func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) {
|
||||
cursor, err := hex.DecodeString(r.Cursor)
|
||||
if err != nil || len(cursor) != expectedCursorSize {
|
||||
cursor = nil
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid cursor: %v", err)
|
||||
}
|
||||
if len(cursor) > 0 && len(cursor) != mailserver.DBKeyLength {
|
||||
return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.DBKeyLength, len(cursor))
|
||||
}
|
||||
|
||||
payload := MessagesRequestPayload{
|
||||
payload := mailserver.MessagesRequestPayload{
|
||||
Lower: r.From,
|
||||
Upper: r.To,
|
||||
Bloom: createBloomFilter(r),
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
package shhext
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/status-im/status-go/mailserver"
|
||||
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -55,6 +59,43 @@ func TestMessagesRequest_setDefaults(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMakeMessagesRequestPayload(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Name string
|
||||
Req MessagesRequest
|
||||
Err string
|
||||
}{
|
||||
{
|
||||
Name: "empty cursor",
|
||||
Req: MessagesRequest{Cursor: ""},
|
||||
Err: "",
|
||||
},
|
||||
{
|
||||
Name: "invalid cursor size",
|
||||
Req: MessagesRequest{Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03})},
|
||||
Err: fmt.Sprintf("invalid cursor size: expected %d but got 3", mailserver.DBKeyLength),
|
||||
},
|
||||
{
|
||||
Name: "valid cursor",
|
||||
Req: MessagesRequest{
|
||||
Cursor: hex.EncodeToString(mailserver.NewDBKey(123, common.Hash{}).Bytes()),
|
||||
},
|
||||
Err: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
_, err := makeMessagesRequestPayload(tc.Req)
|
||||
if tc.Err == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.EqualError(t, err, tc.Err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTopicsToBloom(t *testing.T) {
|
||||
t1 := stringToTopic("t1")
|
||||
b1 := whisper.TopicToBloom(t1)
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package signal
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
)
|
||||
|
||||
|
@ -71,7 +73,7 @@ func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash comm
|
|||
sig := MailServerResponseSignal{
|
||||
RequestID: requestID,
|
||||
LastEnvelopeHash: lastEnvelopeHash,
|
||||
Cursor: string(cursor),
|
||||
Cursor: hex.EncodeToString(cursor),
|
||||
ErrorMsg: errorMsg,
|
||||
}
|
||||
send(EventMailServerRequestCompleted, sig)
|
||||
|
|
Loading…
Reference in New Issue