add SyncMessages method to shhext api (#1309)

This commit is contained in:
Adam Babik 2018-12-14 12:21:34 +01:00 committed by GitHub
parent 2f65dd58f9
commit 74cb16c456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 297 additions and 32 deletions

6
Gopkg.lock generated
View File

@ -822,12 +822,12 @@
revision = "fbcc46a78cd43fef95a110df664aab513116a850"
[[projects]]
digest = "1:7f2aeb661efc22a59fff9f7e223c59844e32433c18b3567ad24d23758ecf2f64"
digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210"
name = "github.com/status-im/whisper"
packages = ["whisperv6"]
pruneopts = "NUT"
revision = "6e5af097a1a80e2e407ff097d4dd22e747768d3b"
version = "v1.4.2"
revision = "82a7734c369137d50fcbcae86230d83db6bfc885"
version = "v1.4.4"
[[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -29,7 +29,7 @@
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.4.2"
version = "=v1.4.4"
[[override]]
name = "github.com/golang/protobuf"

View File

@ -241,7 +241,8 @@ deploy-install:
go get -u github.com/c4milo/github-release
gen-install:
go get -u github.com/jteeuwen/go-bindata/...
go get -u github.com/jteeuwen/go-bindata
go get -u github.com/jteeuwen/go-bindata/go-bindata
go get -u github.com/golang/protobuf/protoc-gen-go
mock-install: ##@other Install mocking tools

View File

@ -384,6 +384,10 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
return fmt.Errorf("requests per seconds limit exceeded")
}
if err := request.Validate(); err != nil {
return fmt.Errorf("request is invalid: %v", err)
}
iter := s.createIterator(request.Lower, request.Upper, request.Cursor)
defer iter.Release()
@ -710,8 +714,8 @@ func (s *WMailServer) checkMsgSignature(msg *whisper.ReceivedMessage, id []byte)
return nil
}
// bloomFromReceivedMessage gor a given whisper.ReceivedMessage it extracts the
// used bloom filter
// bloomFromReceivedMessage for a given whisper.ReceivedMessage it extracts the
// used bloom filter.
func (s *WMailServer) bloomFromReceivedMessage(msg *whisper.ReceivedMessage) ([]byte, error) {
payloadSize := len(msg.Payload)

View File

@ -46,7 +46,7 @@ var (
// PAYLOADS
// -----
// MessagesRequest is a payload send to a MailServer to get messages.
// MessagesRequest is a RequestMessages() request payload.
type MessagesRequest struct {
// MailServerPeer is MailServer's enode address.
MailServerPeer string `json:"mailServerPeer"`
@ -75,9 +75,6 @@ type MessagesRequest struct {
// SymKeyID is an ID of a symmetric key to authenticate to MailServer.
// It's derived from MailServer password.
//
// It's also possible to authenticate request with MailServerPeer
// public key.
SymKeyID string `json:"symKeyID"`
// Timeout is the time to live of the request specified in seconds.
@ -105,6 +102,43 @@ func (r *MessagesRequest) setDefaults(now time.Time) {
}
}
// SyncMessagesRequest is a SyncMessages() request payload.
type SyncMessagesRequest struct {
// MailServerPeer is MailServer's enode address.
MailServerPeer string `json:"mailServerPeer"`
// From is a lower bound of time range (optional).
// Default is 24 hours back from now.
From uint32 `json:"from"`
// To is a upper bound of time range (optional).
// Default is now.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests
Cursor string `json:"cursor"`
// Topics is a list of Whisper topics.
// If empty, a full bloom filter will be used.
Topics []whisper.TopicType `json:"topics"`
}
// SyncMessagesResponse is a response from the mail server
// to which SyncMessagesRequest was sent.
type SyncMessagesResponse struct {
// Cursor from the response can be used to retrieve more messages
// for the previous request.
Cursor string `json:"cursor"`
// Error indicates that something wrong happened when sending messages
// to the requester.
Error string `json:"error"`
}
// -----
// PUBLIC API
// -----
@ -197,6 +231,81 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex
return hash[:], nil
}
// createSyncMailRequest creates SyncMailRequest. It uses a full bloom filter
// if no topics are given.
func createSyncMailRequest(r SyncMessagesRequest) (whisper.SyncMailRequest, error) {
var bloom []byte
if len(r.Topics) > 0 {
bloom = topicsToBloom(r.Topics...)
} else {
bloom = whisper.MakeFullNodeBloom()
}
cursor, err := hex.DecodeString(r.Cursor)
if err != nil {
return whisper.SyncMailRequest{}, err
}
return whisper.SyncMailRequest{
Lower: r.From,
Upper: r.To,
Bloom: bloom,
Limit: r.Limit,
Cursor: cursor,
}, nil
}
func createSyncMessagesResponse(r whisper.SyncEventResponse) SyncMessagesResponse {
return SyncMessagesResponse{
Cursor: hex.EncodeToString(r.Cursor),
Error: r.Error,
}
}
// SyncMessages sends a request to a given MailServerPeer to sync historic messages.
// MailServerPeers needs to be added as a trusted peer first.
func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) {
var response SyncMessagesResponse
mailServerEnode, err := enode.ParseV4(r.MailServerPeer)
if err != nil {
return response, fmt.Errorf("invalid MailServerPeer: %v", err)
}
request, err := createSyncMailRequest(r)
if err != nil {
return response, fmt.Errorf("failed to create a sync mail request: %v", err)
}
if err := api.service.w.SyncMessages(mailServerEnode.ID().Bytes(), request); err != nil {
return response, fmt.Errorf("failed to send a sync request: %v", err)
}
// Wait for the response which is received asynchronously as a p2p packet.
// This packet handler will send an event which contains the response payload.
events := make(chan whisper.EnvelopeEvent)
sub := api.service.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case event := <-events:
if event.Event != whisper.EventMailServerSyncFinished {
continue
}
log.Info("received EventMailServerSyncFinished event", "data", event.Data)
if resp, ok := event.Data.(whisper.SyncEventResponse); ok {
return createSyncMessagesResponse(resp), nil
}
return response, fmt.Errorf("did not understand the response event data")
case <-ctx.Done():
return response, ctx.Err()
}
}
}
// GetNewFilterMessages is a prototype method with deduplication
func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]*whisper.Message, error) {
msgs, err := api.publicAPI.GetFilterMessages(filterID)

View File

@ -1,6 +1,7 @@
package shhext
import (
"context"
"encoding/hex"
"fmt"
"testing"
@ -136,3 +137,85 @@ func TestCreateBloomFilter(t *testing.T) {
func stringToTopic(s string) whisper.TopicType {
return whisper.BytesToTopic([]byte(s))
}
func TestCreateSyncMailRequest(t *testing.T) {
testCases := []struct {
Name string
Req SyncMessagesRequest
Verify func(*testing.T, whisper.SyncMailRequest)
Error string
}{
{
Name: "no topics",
Req: SyncMessagesRequest{},
Verify: func(t *testing.T, r whisper.SyncMailRequest) {
require.Equal(t, whisper.MakeFullNodeBloom(), r.Bloom)
},
},
{
Name: "some topics",
Req: SyncMessagesRequest{
Topics: []whisper.TopicType{{0x01, 0xff, 0xff, 0xff}},
},
Verify: func(t *testing.T, r whisper.SyncMailRequest) {
expectedBloom := whisper.TopicToBloom(whisper.TopicType{0x01, 0xff, 0xff, 0xff})
require.Equal(t, expectedBloom, r.Bloom)
},
},
{
Name: "decode cursor",
Req: SyncMessagesRequest{
Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03}),
},
Verify: func(t *testing.T, r whisper.SyncMailRequest) {
require.Equal(t, []byte{0x01, 0x02, 0x03}, r.Cursor)
},
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
r, err := createSyncMailRequest(tc.Req)
if tc.Error != "" {
require.EqualError(t, err, tc.Error)
}
tc.Verify(t, r)
})
}
}
func TestSyncMessagesErrors(t *testing.T) {
validEnode := "enode://e8a7c03b58911e98bbd66accb2a55d57683f35b23bf9dfca89e5e244eb5cc3f25018b4112db507faca34fb69ffb44b362f79eda97a669a8df29c72e654416784@127.0.0.1:30404"
testCases := []struct {
Name string
Req SyncMessagesRequest
Resp SyncMessagesResponse
Error string
}{
{
Name: "invalid MailServerPeer",
Req: SyncMessagesRequest{MailServerPeer: "invalid-scheme://"},
Error: `invalid MailServerPeer: invalid URL scheme, want "enode"`,
},
{
Name: "failed to create SyncMailRequest",
Req: SyncMessagesRequest{
MailServerPeer: validEnode,
Cursor: "a", // odd number of characters is an invalid hex representation
},
Error: "failed to create a sync mail request: encoding/hex: odd length hex string",
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
api := PublicAPI{}
resp, err := api.SyncMessages(context.TODO(), tc.Req)
if tc.Error != "" {
require.EqualError(t, err, tc.Error)
}
require.EqualValues(t, tc.Resp, resp)
})
}
}

View File

@ -1,6 +1,7 @@
package whisper
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
@ -11,6 +12,8 @@ import (
"testing"
"time"
"github.com/status-im/status-go/services/shhext"
"os"
"github.com/ethereum/go-ethereum/common"
@ -529,15 +532,49 @@ func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() {
}, time.Second*5, time.Millisecond*100)
s.Require().NoError(err)
err = emptyMailboxWhisperService.SyncMessages(
mailbox.StatusNode().Server().Self().ID().Bytes(),
whisper.SyncMailRequest{
Lower: 0,
Upper: uint32(time.Now().Unix()),
Bloom: whisper.MakeFullNodeBloom(),
emptyMailboxRPCClient := emptyMailbox.StatusNode().RPCPrivateClient()
s.Require().NotNil(emptyMailboxRPCClient)
// Ask to sync the first batch of messages.
// We artificially set Limit to 1 in order to test pagination.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
var syncMessagesResponse shhext.SyncMessagesResponse
err = emptyMailboxRPCClient.CallContext(
ctx,
&syncMessagesResponse,
"shhext_syncMessages",
shhext.SyncMessagesRequest{
MailServerPeer: mailbox.StatusNode().Server().Self().String(),
From: 0,
To: uint32(time.Now().Unix()),
Limit: 1,
},
)
s.Require().NoError(err)
s.Require().NotEmpty(syncMessagesResponse.Cursor)
s.Require().Empty(syncMessagesResponse.Error)
// Ask to sync the rest of messages.
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err = emptyMailboxRPCClient.CallContext(
ctx,
&syncMessagesResponse,
"shhext_syncMessages",
shhext.SyncMessagesRequest{
MailServerPeer: mailbox.StatusNode().Server().Self().String(),
From: 0,
To: uint32(time.Now().Unix()),
Limit: 10,
Cursor: syncMessagesResponse.Cursor,
},
)
s.Require().NoError(err)
s.Require().Empty(syncMessagesResponse.Cursor)
s.Require().Empty(syncMessagesResponse.Error)
// create and start a client
client, stop := s.startBackend("client")
@ -584,7 +621,7 @@ func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() {
// get messages
messages := s.getMessagesByMessageFilterID(clientRPCClient, messageFilterID)
s.Require().NotEmpty(messages)
s.Require().Len(messages, envelopesCount)
}
func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) {

View File

@ -33,6 +33,8 @@ particularly the notion of singular endpoints.
package whisperv6
import (
"errors"
"fmt"
"time"
)
@ -80,6 +82,8 @@ const (
DefaultTTL = 50 // seconds
DefaultSyncAllowance = 10 // seconds
MaxLimitInSyncMailRequest = 1000
)
// MailServer represents a mail server, capable of
@ -109,6 +113,19 @@ type SyncMailRequest struct {
Cursor []byte
}
// Validate checks request's fields if they are valid.
func (r SyncMailRequest) Validate() error {
if r.Limit > MaxLimitInSyncMailRequest {
return fmt.Errorf("invalid 'Limit' value, expected lower than %d", MaxLimitInSyncMailRequest)
}
if r.Lower > r.Upper {
return errors.New("invalid 'Lower' value, can't be greater than 'Upper'")
}
return nil
}
// SyncResponse is a struct representing a response sent to the peer
// asking for syncing archived envelopes.
type SyncResponse struct {
@ -117,10 +134,3 @@ type SyncResponse struct {
Final bool // if true it means all envelopes were processed
Error string
}
// IsFinal returns true if it's the final response for the request.
// It might be a successful final response (r.Final being true)
// or an error occured (r.Error being not empty).
func (r SyncResponse) IsFinal() bool {
return r.Final || r.Error != ""
}

View File

@ -27,6 +27,8 @@ const (
EventMailServerRequestExpired EventType = "mailserver.request.expired"
// EventMailServerEnvelopeArchived fires after an envelope has been archived
EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"
// EventMailServerSyncFinished fires when the sync of messages is finished.
EventMailServerSyncFinished EventType = "mailserver.sync.finished"
)
// EnvelopeEvent used for envelopes events.
@ -37,3 +39,10 @@ type EnvelopeEvent struct {
Peer enode.ID
Data interface{}
}
// SyncEventResponse is a response from the Mail Server
// form which the peer received envelopes.
type SyncEventResponse struct {
Cursor []byte
Error string
}

View File

@ -463,6 +463,10 @@ func (whisper *Whisper) SyncMessages(peerID []byte, req SyncMailRequest) error {
return err
}
if err := req.Validate(); err != nil {
return err
}
return p2p.Send(p.ws, p2pSyncRequestCode, req)
}
@ -975,10 +979,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// TODO(adam): should we limit who can send this request?
if whisper.mailServer != nil {
var request SyncMailRequest
if err = packet.Decode(&request); err != nil {
if err := packet.Decode(&request); err != nil {
return fmt.Errorf("failed to decode p2pSyncRequestCode payload: %v", err)
}
if err := request.Validate(); err != nil {
return fmt.Errorf("sync mail request was invalid: %v", err)
}
if err := whisper.mailServer.SyncMail(p, request); err != nil {
log.Error("failed to sync envelopes", "peer", p.peer.ID().String())
}
@ -998,17 +1006,21 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return fmt.Errorf("failed to decode p2pSyncResponseCode payload: %v", err)
}
log.Info("received sync response", "count", len(resp.Envelopes), "final", resp.Final, "err", resp.Error)
log.Info("received sync response", "count", len(resp.Envelopes), "final", resp.Final, "err", resp.Error, "cursor", resp.Cursor)
for _, envelope := range resp.Envelopes {
whisper.mailServer.Archive(envelope)
}
if resp.Error != "" {
log.Error("failed to sync envelopes", "err", resp.Error)
}
if resp.Final {
log.Info("finished to sync envelopes successfully")
if resp.Error != "" || resp.Final {
whisper.envelopeFeed.Send(EnvelopeEvent{
Event: EventMailServerSyncFinished,
Peer: p.peer.ID(),
Data: SyncEventResponse{
Cursor: resp.Cursor,
Error: resp.Error,
},
})
}
}
case p2pRequestCode: