diff --git a/Gopkg.lock b/Gopkg.lock index f4a7b04a0..08b783c46 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -822,12 +822,12 @@ revision = "fbcc46a78cd43fef95a110df664aab513116a850" [[projects]] - digest = "1:7f2aeb661efc22a59fff9f7e223c59844e32433c18b3567ad24d23758ecf2f64" + digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "6e5af097a1a80e2e407ff097d4dd22e747768d3b" - version = "v1.4.2" + revision = "82a7734c369137d50fcbcae86230d83db6bfc885" + version = "v1.4.4" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index 747b413f5..0d8516583 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -29,7 +29,7 @@ [[constraint]] name = "github.com/status-im/whisper" - version = "=v1.4.2" + version = "=v1.4.4" [[override]] name = "github.com/golang/protobuf" diff --git a/Makefile b/Makefile index 4c55985ee..967e3c3ad 100644 --- a/Makefile +++ b/Makefile @@ -241,7 +241,8 @@ deploy-install: go get -u github.com/c4milo/github-release gen-install: - go get -u github.com/jteeuwen/go-bindata/... + go get -u github.com/jteeuwen/go-bindata + go get -u github.com/jteeuwen/go-bindata/go-bindata go get -u github.com/golang/protobuf/protoc-gen-go mock-install: ##@other Install mocking tools diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 93e8c1f9e..2ef00a29e 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -384,6 +384,10 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque return fmt.Errorf("requests per seconds limit exceeded") } + if err := request.Validate(); err != nil { + return fmt.Errorf("request is invalid: %v", err) + } + iter := s.createIterator(request.Lower, request.Upper, request.Cursor) defer iter.Release() @@ -710,8 +714,8 @@ func (s *WMailServer) checkMsgSignature(msg *whisper.ReceivedMessage, id []byte) return nil } -// bloomFromReceivedMessage gor a given whisper.ReceivedMessage it extracts the -// used bloom filter +// bloomFromReceivedMessage for a given whisper.ReceivedMessage it extracts the +// used bloom filter. func (s *WMailServer) bloomFromReceivedMessage(msg *whisper.ReceivedMessage) ([]byte, error) { payloadSize := len(msg.Payload) diff --git a/services/shhext/api.go b/services/shhext/api.go index 48db72fc4..7ade09a5e 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -46,7 +46,7 @@ var ( // PAYLOADS // ----- -// MessagesRequest is a payload send to a MailServer to get messages. +// MessagesRequest is a RequestMessages() request payload. type MessagesRequest struct { // MailServerPeer is MailServer's enode address. MailServerPeer string `json:"mailServerPeer"` @@ -75,9 +75,6 @@ type MessagesRequest struct { // SymKeyID is an ID of a symmetric key to authenticate to MailServer. // It's derived from MailServer password. - // - // It's also possible to authenticate request with MailServerPeer - // public key. SymKeyID string `json:"symKeyID"` // Timeout is the time to live of the request specified in seconds. @@ -105,6 +102,43 @@ func (r *MessagesRequest) setDefaults(now time.Time) { } } +// SyncMessagesRequest is a SyncMessages() request payload. +type SyncMessagesRequest struct { + // MailServerPeer is MailServer's enode address. + MailServerPeer string `json:"mailServerPeer"` + + // From is a lower bound of time range (optional). + // Default is 24 hours back from now. + From uint32 `json:"from"` + + // To is a upper bound of time range (optional). + // Default is now. + To uint32 `json:"to"` + + // Limit determines the number of messages sent by the mail server + // for the current paginated request + Limit uint32 `json:"limit"` + + // Cursor is used as starting point for paginated requests + Cursor string `json:"cursor"` + + // Topics is a list of Whisper topics. + // If empty, a full bloom filter will be used. + Topics []whisper.TopicType `json:"topics"` +} + +// SyncMessagesResponse is a response from the mail server +// to which SyncMessagesRequest was sent. +type SyncMessagesResponse struct { + // Cursor from the response can be used to retrieve more messages + // for the previous request. + Cursor string `json:"cursor"` + + // Error indicates that something wrong happened when sending messages + // to the requester. + Error string `json:"error"` +} + // ----- // PUBLIC API // ----- @@ -197,6 +231,81 @@ func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hex return hash[:], nil } +// createSyncMailRequest creates SyncMailRequest. It uses a full bloom filter +// if no topics are given. +func createSyncMailRequest(r SyncMessagesRequest) (whisper.SyncMailRequest, error) { + var bloom []byte + if len(r.Topics) > 0 { + bloom = topicsToBloom(r.Topics...) + } else { + bloom = whisper.MakeFullNodeBloom() + } + + cursor, err := hex.DecodeString(r.Cursor) + if err != nil { + return whisper.SyncMailRequest{}, err + } + + return whisper.SyncMailRequest{ + Lower: r.From, + Upper: r.To, + Bloom: bloom, + Limit: r.Limit, + Cursor: cursor, + }, nil +} + +func createSyncMessagesResponse(r whisper.SyncEventResponse) SyncMessagesResponse { + return SyncMessagesResponse{ + Cursor: hex.EncodeToString(r.Cursor), + Error: r.Error, + } +} + +// SyncMessages sends a request to a given MailServerPeer to sync historic messages. +// MailServerPeers needs to be added as a trusted peer first. +func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) { + var response SyncMessagesResponse + + mailServerEnode, err := enode.ParseV4(r.MailServerPeer) + if err != nil { + return response, fmt.Errorf("invalid MailServerPeer: %v", err) + } + + request, err := createSyncMailRequest(r) + if err != nil { + return response, fmt.Errorf("failed to create a sync mail request: %v", err) + } + + if err := api.service.w.SyncMessages(mailServerEnode.ID().Bytes(), request); err != nil { + return response, fmt.Errorf("failed to send a sync request: %v", err) + } + + // Wait for the response which is received asynchronously as a p2p packet. + // This packet handler will send an event which contains the response payload. + events := make(chan whisper.EnvelopeEvent) + sub := api.service.w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + if event.Event != whisper.EventMailServerSyncFinished { + continue + } + + log.Info("received EventMailServerSyncFinished event", "data", event.Data) + + if resp, ok := event.Data.(whisper.SyncEventResponse); ok { + return createSyncMessagesResponse(resp), nil + } + return response, fmt.Errorf("did not understand the response event data") + case <-ctx.Done(): + return response, ctx.Err() + } + } +} + // GetNewFilterMessages is a prototype method with deduplication func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]*whisper.Message, error) { msgs, err := api.publicAPI.GetFilterMessages(filterID) diff --git a/services/shhext/api_test.go b/services/shhext/api_test.go index f7f2fc7fe..9d007eedf 100644 --- a/services/shhext/api_test.go +++ b/services/shhext/api_test.go @@ -1,6 +1,7 @@ package shhext import ( + "context" "encoding/hex" "fmt" "testing" @@ -136,3 +137,85 @@ func TestCreateBloomFilter(t *testing.T) { func stringToTopic(s string) whisper.TopicType { return whisper.BytesToTopic([]byte(s)) } + +func TestCreateSyncMailRequest(t *testing.T) { + testCases := []struct { + Name string + Req SyncMessagesRequest + Verify func(*testing.T, whisper.SyncMailRequest) + Error string + }{ + { + Name: "no topics", + Req: SyncMessagesRequest{}, + Verify: func(t *testing.T, r whisper.SyncMailRequest) { + require.Equal(t, whisper.MakeFullNodeBloom(), r.Bloom) + }, + }, + { + Name: "some topics", + Req: SyncMessagesRequest{ + Topics: []whisper.TopicType{{0x01, 0xff, 0xff, 0xff}}, + }, + Verify: func(t *testing.T, r whisper.SyncMailRequest) { + expectedBloom := whisper.TopicToBloom(whisper.TopicType{0x01, 0xff, 0xff, 0xff}) + require.Equal(t, expectedBloom, r.Bloom) + }, + }, + { + Name: "decode cursor", + Req: SyncMessagesRequest{ + Cursor: hex.EncodeToString([]byte{0x01, 0x02, 0x03}), + }, + Verify: func(t *testing.T, r whisper.SyncMailRequest) { + require.Equal(t, []byte{0x01, 0x02, 0x03}, r.Cursor) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + r, err := createSyncMailRequest(tc.Req) + if tc.Error != "" { + require.EqualError(t, err, tc.Error) + } + tc.Verify(t, r) + }) + } +} + +func TestSyncMessagesErrors(t *testing.T) { + validEnode := "enode://e8a7c03b58911e98bbd66accb2a55d57683f35b23bf9dfca89e5e244eb5cc3f25018b4112db507faca34fb69ffb44b362f79eda97a669a8df29c72e654416784@127.0.0.1:30404" + + testCases := []struct { + Name string + Req SyncMessagesRequest + Resp SyncMessagesResponse + Error string + }{ + { + Name: "invalid MailServerPeer", + Req: SyncMessagesRequest{MailServerPeer: "invalid-scheme://"}, + Error: `invalid MailServerPeer: invalid URL scheme, want "enode"`, + }, + { + Name: "failed to create SyncMailRequest", + Req: SyncMessagesRequest{ + MailServerPeer: validEnode, + Cursor: "a", // odd number of characters is an invalid hex representation + }, + Error: "failed to create a sync mail request: encoding/hex: odd length hex string", + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + api := PublicAPI{} + resp, err := api.SyncMessages(context.TODO(), tc.Req) + if tc.Error != "" { + require.EqualError(t, err, tc.Error) + } + require.EqualValues(t, tc.Resp, resp) + }) + } +} diff --git a/t/e2e/whisper/whisper_mailbox_test.go b/t/e2e/whisper/whisper_mailbox_test.go index 248b4bf8b..d6fb4cc04 100644 --- a/t/e2e/whisper/whisper_mailbox_test.go +++ b/t/e2e/whisper/whisper_mailbox_test.go @@ -1,6 +1,7 @@ package whisper import ( + "context" "encoding/hex" "encoding/json" "fmt" @@ -11,6 +12,8 @@ import ( "testing" "time" + "github.com/status-im/status-go/services/shhext" + "os" "github.com/ethereum/go-ethereum/common" @@ -529,15 +532,49 @@ func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() { }, time.Second*5, time.Millisecond*100) s.Require().NoError(err) - err = emptyMailboxWhisperService.SyncMessages( - mailbox.StatusNode().Server().Self().ID().Bytes(), - whisper.SyncMailRequest{ - Lower: 0, - Upper: uint32(time.Now().Unix()), - Bloom: whisper.MakeFullNodeBloom(), + emptyMailboxRPCClient := emptyMailbox.StatusNode().RPCPrivateClient() + s.Require().NotNil(emptyMailboxRPCClient) + + // Ask to sync the first batch of messages. + // We artificially set Limit to 1 in order to test pagination. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + var syncMessagesResponse shhext.SyncMessagesResponse + err = emptyMailboxRPCClient.CallContext( + ctx, + &syncMessagesResponse, + "shhext_syncMessages", + shhext.SyncMessagesRequest{ + MailServerPeer: mailbox.StatusNode().Server().Self().String(), + From: 0, + To: uint32(time.Now().Unix()), + Limit: 1, }, ) s.Require().NoError(err) + s.Require().NotEmpty(syncMessagesResponse.Cursor) + s.Require().Empty(syncMessagesResponse.Error) + + // Ask to sync the rest of messages. + ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + err = emptyMailboxRPCClient.CallContext( + ctx, + &syncMessagesResponse, + "shhext_syncMessages", + shhext.SyncMessagesRequest{ + MailServerPeer: mailbox.StatusNode().Server().Self().String(), + From: 0, + To: uint32(time.Now().Unix()), + Limit: 10, + Cursor: syncMessagesResponse.Cursor, + }, + ) + s.Require().NoError(err) + s.Require().Empty(syncMessagesResponse.Cursor) + s.Require().Empty(syncMessagesResponse.Error) // create and start a client client, stop := s.startBackend("client") @@ -584,7 +621,7 @@ func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() { // get messages messages := s.getMessagesByMessageFilterID(clientRPCClient, messageFilterID) - s.Require().NotEmpty(messages) + s.Require().Len(messages, envelopesCount) } func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) { diff --git a/vendor/github.com/status-im/whisper/whisperv6/doc.go b/vendor/github.com/status-im/whisper/whisperv6/doc.go index f911d264f..eb1da0618 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/doc.go +++ b/vendor/github.com/status-im/whisper/whisperv6/doc.go @@ -33,6 +33,8 @@ particularly the notion of singular endpoints. package whisperv6 import ( + "errors" + "fmt" "time" ) @@ -80,6 +82,8 @@ const ( DefaultTTL = 50 // seconds DefaultSyncAllowance = 10 // seconds + + MaxLimitInSyncMailRequest = 1000 ) // MailServer represents a mail server, capable of @@ -109,6 +113,19 @@ type SyncMailRequest struct { Cursor []byte } +// Validate checks request's fields if they are valid. +func (r SyncMailRequest) Validate() error { + if r.Limit > MaxLimitInSyncMailRequest { + return fmt.Errorf("invalid 'Limit' value, expected lower than %d", MaxLimitInSyncMailRequest) + } + + if r.Lower > r.Upper { + return errors.New("invalid 'Lower' value, can't be greater than 'Upper'") + } + + return nil +} + // SyncResponse is a struct representing a response sent to the peer // asking for syncing archived envelopes. type SyncResponse struct { @@ -117,10 +134,3 @@ type SyncResponse struct { Final bool // if true it means all envelopes were processed Error string } - -// IsFinal returns true if it's the final response for the request. -// It might be a successful final response (r.Final being true) -// or an error occured (r.Error being not empty). -func (r SyncResponse) IsFinal() bool { - return r.Final || r.Error != "" -} diff --git a/vendor/github.com/status-im/whisper/whisperv6/events.go b/vendor/github.com/status-im/whisper/whisperv6/events.go index 2faabe1f7..380f6d92a 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/events.go +++ b/vendor/github.com/status-im/whisper/whisperv6/events.go @@ -27,6 +27,8 @@ const ( EventMailServerRequestExpired EventType = "mailserver.request.expired" // EventMailServerEnvelopeArchived fires after an envelope has been archived EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived" + // EventMailServerSyncFinished fires when the sync of messages is finished. + EventMailServerSyncFinished EventType = "mailserver.sync.finished" ) // EnvelopeEvent used for envelopes events. @@ -37,3 +39,10 @@ type EnvelopeEvent struct { Peer enode.ID Data interface{} } + +// SyncEventResponse is a response from the Mail Server +// form which the peer received envelopes. +type SyncEventResponse struct { + Cursor []byte + Error string +} diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index 2b92d588e..7917bd982 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -463,6 +463,10 @@ func (whisper *Whisper) SyncMessages(peerID []byte, req SyncMailRequest) error { return err } + if err := req.Validate(); err != nil { + return err + } + return p2p.Send(p.ws, p2pSyncRequestCode, req) } @@ -975,10 +979,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // TODO(adam): should we limit who can send this request? if whisper.mailServer != nil { var request SyncMailRequest - if err = packet.Decode(&request); err != nil { + if err := packet.Decode(&request); err != nil { return fmt.Errorf("failed to decode p2pSyncRequestCode payload: %v", err) } + if err := request.Validate(); err != nil { + return fmt.Errorf("sync mail request was invalid: %v", err) + } + if err := whisper.mailServer.SyncMail(p, request); err != nil { log.Error("failed to sync envelopes", "peer", p.peer.ID().String()) } @@ -998,17 +1006,21 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return fmt.Errorf("failed to decode p2pSyncResponseCode payload: %v", err) } - log.Info("received sync response", "count", len(resp.Envelopes), "final", resp.Final, "err", resp.Error) + log.Info("received sync response", "count", len(resp.Envelopes), "final", resp.Final, "err", resp.Error, "cursor", resp.Cursor) for _, envelope := range resp.Envelopes { whisper.mailServer.Archive(envelope) } - if resp.Error != "" { - log.Error("failed to sync envelopes", "err", resp.Error) - } - if resp.Final { - log.Info("finished to sync envelopes successfully") + if resp.Error != "" || resp.Final { + whisper.envelopeFeed.Send(EnvelopeEvent{ + Event: EventMailServerSyncFinished, + Peer: p.peer.ID(), + Data: SyncEventResponse{ + Cursor: resp.Cursor, + Error: resp.Error, + }, + }) } } case p2pRequestCode: