From 913dbfca9bdcac13e4bd1a48d03a6b753938781d Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Thu, 6 Dec 2018 10:48:28 +0100 Subject: [PATCH] Support Mail Server data synchronization (#1302) These changes add a support for syncing data between two Mail Servers. It does not give an easy way to start syncing. This will be solved in the next PR. --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- cmd/node-canary/main.go | 24 +- mailserver/mailserver.go | 230 +++++++++++------- mailserver/mailserver_db_panic_test.go | 4 +- mailserver/mailserver_test.go | 62 +++-- t/e2e/whisper/whisper_mailbox_test.go | 178 ++++++++++++-- .../status-im/whisper/whisperv6/doc.go | 34 +++ .../whisper/whisperv6/mailserver_response.go | 13 +- .../status-im/whisper/whisperv6/whisper.go | 61 ++++- 10 files changed, 482 insertions(+), 132 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index c6ec200fb..0c3f60f3a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -822,12 +822,12 @@ revision = "fbcc46a78cd43fef95a110df664aab513116a850" [[projects]] - digest = "1:5c62af344925b846377386dec72e06eb3e1e15222542b3d22fe0f0da75c7f090" + digest = "1:7f2aeb661efc22a59fff9f7e223c59844e32433c18b3567ad24d23758ecf2f64" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "96d2199ed511430c642d877afe7bacaac5f37426" - version = "v1.4.1" + revision = "6e5af097a1a80e2e407ff097d4dd22e747768d3b" + version = "v1.4.2" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index 254703bb2..747b413f5 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -29,7 +29,7 @@ [[constraint]] name = "github.com/status-im/whisper" - version = "=v1.4.1" + version = "=v1.4.2" [[override]] name = "github.com/golang/protobuf" diff --git a/cmd/node-canary/main.go b/cmd/node-canary/main.go index 9df9fe6fa..d72141a46 100644 --- a/cmd/node-canary/main.go +++ b/cmd/node-canary/main.go @@ -181,6 +181,13 @@ func verifyMailserverBehavior(mailserverNode *enode.Node) { } requestID := common.BytesToHash(requestIDBytes) + // wait for mailserver request sent event + err = waitForMailServerRequestSent(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) + if err != nil { + logger.Error("Error waiting for mailserver request sent event", "error", err) + os.Exit(3) + } + // wait for mailserver response resp, err := waitForMailServerResponse(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second) if err != nil { @@ -313,6 +320,21 @@ func joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (str return keyID, topic, filterID, err } +func waitForMailServerRequestSent(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) error { + timeoutTimer := time.NewTimer(timeout) + for { + select { + case event := <-events: + if event.Hash == requestID && event.Event == whisper.EventMailServerRequestSent { + timeoutTimer.Stop() + return nil + } + case <-timeoutTimer.C: + return errors.New("timed out waiting for mailserver request sent") + } + } +} + func waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) (*whisper.MailServerResponse, error) { timeoutTimer := time.NewTimer(timeout) for { @@ -345,7 +367,7 @@ func decodeMailServerResponse(event whisper.EnvelopeEvent) (*whisper.MailServerR case whisper.EventMailServerRequestExpired: return nil, errors.New("no messages available from mailserver") default: - return nil, errors.New("unexpected event type") + return nil, fmt.Errorf("unexpected event type: %v", event.Event) } } diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 9e06c9717..6e5f44ca3 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -62,6 +62,7 @@ var ( requestValidationErrorsCounter = metrics.NewRegisteredCounter("mailserver/requestValidationErrors", nil) processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil) historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil) + syncRequestsMeter = metrics.NewRegisteredMeter("mailserver/syncRequests", nil) ) const ( @@ -242,7 +243,10 @@ func (s *WMailServer) Archive(env *whisper.Envelope) { // DeliverMail sends mail to specified whisper peer. func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) { + defer recoverLevelDBPanics("DeliverMail") + log.Info("Delivering mail", "peerID", peerIDString(peer)) + requestsMeter.Mark(1) if peer == nil { @@ -257,8 +261,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) return } - defer recoverLevelDBPanics("DeliverMail") - var ( lower, upper uint32 bloom []byte @@ -288,24 +290,56 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) } log.Debug("Processing request", - "lower", lower, "upper", upper, + "lower", lower, + "upper", upper, "bloom", bloom, "limit", limit, "cursor", cursor, - "batch", batch) + "batch", batch, + ) if batch { requestsBatchedCounter.Inc(1) } - _, lastEnvelopeHash, nextPageCursor, err := s.processRequest( - peer, - lower, upper, + iter := s.createIterator(lower, upper, cursor) + defer iter.Release() + + bundles := make(chan []*whisper.Envelope, 5) + errCh := make(chan error) + + go func() { + for bundle := range bundles { + if err := s.sendEnvelopes(peer, bundle, batch); err != nil { + errCh <- err + break + } + } + close(errCh) + }() + + start := time.Now() + nextPageCursor, lastEnvelopeHash := s.processRequestInBundles( + iter, bloom, - limit, - cursor, - batch) - if err != nil { + int(limit), + bundles, + ) + requestProcessTimer.UpdateSince(start) + + // bundles channel can be closed now because the processing finished. + close(bundles) + + // Wait for the goroutine to finish the work. It may return an error. + if err := <-errCh; err != nil { + processRequestErrorsCounter.Inc(1) + log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer)) + s.trySendHistoricMessageErrorResponse(peer, request, err) + return + } + + // Processing of the request could be finished earlier due to iterator error. + if err := iter.Error(); err != nil { processRequestErrorsCounter.Inc(1) log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer)) s.trySendHistoricMessageErrorResponse(peer, request, err) @@ -322,6 +356,80 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) } } +// SyncMail syncs mail servers between two Mail Servers. +func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailRequest) error { + log.Info("Started syncing envelopes", "peer", peerIDString(peer), "req", request) + + defer recoverLevelDBPanics("SyncMail") + + syncRequestsMeter.Mark(1) + + // Check rate limiting for a requesting peer. + if s.exceedsPeerRequests(peer.ID()) { + requestErrorsCounter.Inc(1) + log.Error("Peer exceeded request per seconds limit", "peerID", peerIDString(peer)) + return fmt.Errorf("requests per seconds limit exceeded") + } + + iter := s.createIterator(request.Lower, request.Upper, request.Cursor) + defer iter.Release() + + bundles := make(chan []*whisper.Envelope, 5) + errCh := make(chan error) + + go func() { + for bundle := range bundles { + resp := whisper.SyncResponse{Envelopes: bundle} + if err := s.w.SendSyncResponse(peer, resp); err != nil { + errCh <- fmt.Errorf("failed to send sync response: %v", err) + break + } + } + close(errCh) + }() + + start := time.Now() + nextCursor, _ := s.processRequestInBundles( + iter, + request.Bloom, + int(request.Limit), + bundles, + ) + requestProcessTimer.UpdateSince(start) + + // bundles channel can be closed now because the processing finished. + close(bundles) + + // Wait for the goroutine to finish the work. It may return an error. + if err := <-errCh; err != nil { + _ = s.w.SendSyncResponse( + peer, + whisper.SyncResponse{Error: "failed to send a response"}, + ) + return err + } + + // Processing of the request could be finished earlier due to iterator error. + if err := iter.Error(); err != nil { + _ = s.w.SendSyncResponse( + peer, + whisper.SyncResponse{Error: "failed to process all envelopes"}, + ) + return fmt.Errorf("levelDB iterator failed: %v", err) + } + + log.Info("Finished syncing envelopes", "peer", peerIDString(peer)) + + if err := s.w.SendSyncResponse(peer, whisper.SyncResponse{ + Cursor: nextCursor, + Final: true, + }); err != nil { + return fmt.Errorf("failed to send the final sync response: %v", err) + } + + return nil +} + // exceedsPeerRequests in case limit its been setup on the current server and limit // allows the query, it will store/update new query time for the current peer. func (s *WMailServer) exceedsPeerRequests(peer []byte) bool { @@ -360,45 +468,24 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) ite return i } -// processRequest processes the current request and re-sends all stored messages -// accomplishing lower and upper limits. The limit parameter determines the maximum number of -// messages to be sent back for the current request. -// The cursor parameter is used for pagination. -// After sending all the messages, a message of type p2pRequestCompleteCode is sent by the mailserver to -// the peer. -func (s *WMailServer) processRequest( - peer *whisper.Peer, - lower, upper uint32, - bloom []byte, - limit uint32, - cursor cursorType, - batch bool, -) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) { - // Recover from possible goleveldb panics - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("recovered from panic in processRequest: %v", r) - } - }() - +// processRequestInBundles processes envelopes using an iterator and passes them +// to the output channel in bundles. +func (s *WMailServer) processRequestInBundles( + iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope, +) (cursorType, common.Hash) { var ( - sentEnvelopes uint32 - sentEnvelopesSize int64 + bundle []*whisper.Envelope + bundleSize uint32 + processedEnvelopes int + processedEnvelopesSize int64 + nextCursor cursorType + lastEnvelopeHash common.Hash ) - i := s.createIterator(lower, upper, cursor) - defer i.Release() - - var ( - bundle []*whisper.Envelope - bundleSize uint32 - ) - - start := time.Now() - - for i.Prev() { + for iter.Prev() { var envelope whisper.Envelope - decodeErr := rlp.DecodeBytes(i.Value(), &envelope) + + decodeErr := rlp.DecodeBytes(iter.Value(), &envelope) if decodeErr != nil { log.Error("failed to decode RLP", "err", decodeErr) continue @@ -409,7 +496,7 @@ func (s *WMailServer) processRequest( } newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data)) - limitReached := limit != noLimits && (int(sentEnvelopes)+len(bundle)) == int(limit) + limitReached := limit != noLimits && (processedEnvelopes+len(bundle)) == limit if !limitReached && newSize < s.w.MaxMessageSize() { bundle = append(bundle, &envelope) bundleSize = newSize @@ -417,30 +504,22 @@ func (s *WMailServer) processRequest( continue } - if peer == nil { - // used for test purposes - ret = append(ret, bundle...) - } else { - err = s.sendEnvelopes(peer, bundle, batch) - if err != nil { - return - } - } + output <- bundle + bundle = bundle[:0] + bundleSize = 0 - sentEnvelopes += uint32(len(bundle)) - sentEnvelopesSize += int64(bundleSize) + processedEnvelopes += len(bundle) + processedEnvelopesSize += int64(bundleSize) if limitReached { - bundle = nil - bundleSize = 0 - // When the limit is reached, the current retrieved envelope // is not included in the response. - // The nextPageCursor is a key used as a limit in a range and + // The nextCursor is a key used as a limit in a range and // is not included in the range, hence, we need to get // the previous iterator key. - i.Next() - nextPageCursor = i.Key() + // Because we iterate backwards, we use Next(). + iter.Next() + nextCursor = iter.Key() break } else { // Reset bundle information and add the last read envelope @@ -452,28 +531,15 @@ func (s *WMailServer) processRequest( lastEnvelopeHash = envelope.Hash() } - // Send any outstanding envelopes. + // There might be some outstanding elements in the bundle. if len(bundle) > 0 && bundleSize > 0 { - if peer == nil { - ret = append(ret, bundle...) - } else { - err = s.sendEnvelopes(peer, bundle, batch) - if err != nil { - return - } - } + output <- bundle } - requestProcessTimer.UpdateSince(start) - sentEnvelopesMeter.Mark(int64(sentEnvelopes)) - sentEnvelopesSizeMeter.Mark(sentEnvelopesSize) + sentEnvelopesMeter.Mark(int64(processedEnvelopes)) + sentEnvelopesSizeMeter.Mark(processedEnvelopesSize) - err = i.Error() - if err != nil { - err = fmt.Errorf("levelDB iterator error: %v", err) - } - - return + return nextCursor, lastEnvelopeHash } func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error { diff --git a/mailserver/mailserver_db_panic_test.go b/mailserver/mailserver_db_panic_test.go index e316d2bc4..61ade9888 100644 --- a/mailserver/mailserver_db_panic_test.go +++ b/mailserver/mailserver_db_panic_test.go @@ -54,9 +54,7 @@ func (s *MailServerDBPanicSuite) TestArchive() { func (s *MailServerDBPanicSuite) TestDeliverMail() { defer s.testPanicRecover("DeliverMail") - _, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil, false) - s.Error(err) - s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error()) + s.server.DeliverMail(&whisper.Peer{}, &whisper.Envelope{}) } func (s *MailServerDBPanicSuite) testPanicRecover(method string) { diff --git a/mailserver/mailserver_test.go b/mailserver/mailserver_test.go index 74a8a497d..036798727 100644 --- a/mailserver/mailserver_test.go +++ b/mailserver/mailserver_test.go @@ -330,31 +330,26 @@ func (s *MailserverSuite) TestRequestPaginationLimit() { s.Nil(cursor) s.Equal(params.limit, limit) - envelopes, _, cursor, err := s.server.processRequest(nil, lower, upper, bloom, limit, nil, false) - s.NoError(err) - for _, env := range envelopes { - receivedHashes = append(receivedHashes, env.Hash()) - } + receivedHashes, cursor, _ = processRequestAndCollectHashes( + s.server, lower, upper, cursor, bloom, int(limit), + ) // 10 envelopes sent s.Equal(count, uint32(len(sentEnvelopes))) // 6 envelopes received - s.Equal(limit, uint32(len(receivedHashes))) + s.Equal(int(limit), len(receivedHashes)) // the 6 envelopes received should be in descending order s.Equal(reverseSentHashes[:limit], receivedHashes) // cursor should be the key of the last envelope of the last page s.Equal(archiveKeys[count-limit], fmt.Sprintf("%x", cursor)) // second page - receivedHashes = []common.Hash{} - envelopes, _, cursor, err = s.server.processRequest(nil, lower, upper, bloom, limit, cursor, false) - s.NoError(err) - for _, env := range envelopes { - receivedHashes = append(receivedHashes, env.Hash()) - } + receivedHashes, cursor, _ = processRequestAndCollectHashes( + s.server, lower, upper, cursor, bloom, int(limit), + ) // 4 envelopes received - s.Equal(count-limit, uint32(len(receivedHashes))) + s.Equal(int(count-limit), len(receivedHashes)) // cursor is nil because there are no other pages s.Nil(cursor) } @@ -476,16 +471,15 @@ func (s *MailserverSuite) TestDecodeRequest() { } func (s *MailserverSuite) messageExists(envelope *whisper.Envelope, low, upp uint32, bloom []byte, limit uint32) bool { - var exist bool - mail, _, _, err := s.server.processRequest(nil, low, upp, bloom, limit, nil, false) - s.NoError(err) - for _, msg := range mail { - if msg.Hash() == envelope.Hash() { - exist = true - break + receivedHashes, _, _ := processRequestAndCollectHashes( + s.server, low, upp, nil, bloom, int(limit), + ) + for _, hash := range receivedHashes { + if hash == envelope.Hash() { + return true } } - return exist + return false } func (s *MailserverSuite) TestBloomFromReceivedMessage() { @@ -640,6 +634,32 @@ func generateEnvelope(sentTime time.Time) (*whisper.Envelope, error) { return generateEnvelopeWithKeys(sentTime, h[:], nil) } +func processRequestAndCollectHashes( + server *WMailServer, lower, upper uint32, cursor cursorType, bloom []byte, limit int, +) ([]common.Hash, cursorType, common.Hash) { + iter := server.createIterator(lower, upper, cursor) + defer iter.Release() + bundles := make(chan []*whisper.Envelope, 10) + done := make(chan struct{}) + + var hashes []common.Hash + go func() { + for bundle := range bundles { + for _, env := range bundle { + hashes = append(hashes, env.Hash()) + } + } + close(done) + }() + + cursor, lastHash := server.processRequestInBundles(iter, bloom, limit, bundles) + close(bundles) + + <-done + + return hashes, cursor, lastHash +} + // mockPeerWithID is a struct that conforms to peerWithID interface. type mockPeerWithID struct { id []byte diff --git a/t/e2e/whisper/whisper_mailbox_test.go b/t/e2e/whisper/whisper_mailbox_test.go index f85ff8c28..c918085d3 100644 --- a/t/e2e/whisper/whisper_mailbox_test.go +++ b/t/e2e/whisper/whisper_mailbox_test.go @@ -19,6 +19,8 @@ import ( "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/p2p" "github.com/status-im/status-go/api" + "github.com/status-im/status-go/mailserver" + "github.com/status-im/status-go/params" "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/t/helpers" . "github.com/status-im/status-go/t/utils" @@ -39,7 +41,7 @@ func TestWhisperMailboxTestSuite(t *testing.T) { func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { var err error // Start mailbox and status node. - mailboxBackend, stop := s.startMailboxBackend() + mailboxBackend, stop := s.startMailboxBackend("") defer stop() s.Require().True(mailboxBackend.IsNodeRunning()) mailboxNode := mailboxBackend.StatusNode().GethNode() @@ -150,7 +152,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { var err error // Start mailbox, alice, bob, charlie node. - mailboxBackend, stop := s.startMailboxBackend() + mailboxBackend, stop := s.startMailboxBackend("") defer stop() aliceBackend, stop := s.startBackend("alice") @@ -333,7 +335,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() { // Start mailbox - mailbox, stop := s.startMailboxBackend() + mailbox, stop := s.startMailboxBackend("") defer stop() s.Require().True(mailbox.IsNodeRunning()) mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode @@ -456,6 +458,133 @@ func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() { s.Equal(sentEnvelopesHashes, allReceivedHashes) } +// TestSyncBetweenTwoMailServers tests syncing state between two mail servers +// using `shhext_requestMessages`. +func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() { + var ( + chatName = "some-public-chat" + ) + + // Start mailbox but with mail server disabled. + // MailServer will be registered below manually. + mailbox, stop := s.startMailboxBackendWithCallback("", func(c *params.NodeConfig) { + c.WhisperConfig.EnableMailServer = false + }) + defer stop() + s.Require().True(mailbox.IsNodeRunning()) + mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode + + // Whisper services + mailboxWhisperService, err := mailbox.StatusNode().WhisperService() + s.Require().NoError(err) + + // symmetric key for public chats + symKeyID, err := mailboxWhisperService.AddSymKeyFromPassword(chatName) + s.Require().NoError(err) + publicChatSymKey, err := mailboxWhisperService.GetSymKey(symKeyID) + s.Require().NoError(err) + + var mailServer mailserver.WMailServer + err = mailServer.Init(mailboxWhisperService, &mailbox.StatusNode().Config().WhisperConfig) + s.Require().NoError(err) + mailboxWhisperService.RegisterServer(&mailServer) + + // envelopes to archive + envelopesCount := 5 + topic := s.createPublicChatTopic(chatName) + for i := 0; i < envelopesCount; i++ { + params := whisper.MessageParams{ + Topic: topic, + WorkTime: 10, + PoW: 2.0, + Payload: []byte("some-payload"), + KeySym: publicChatSymKey, + } + sentMessage, err := whisper.NewSentMessage(¶ms) + s.Require().NoError(err) + envelope, err := sentMessage.Wrap(¶ms, time.Now()) + s.Require().NoError(err) + + mailServer.Archive(envelope) + } + + // Start a second mail server which would like to sync the state. + emptyMailbox, stopEmptyMailbox := s.startMailboxBackend("empty") + defer stopEmptyMailbox() + s.Require().True(emptyMailbox.IsNodeRunning()) + + emptyMailboxEnode := emptyMailbox.StatusNode().Server().Self().String() + + errCh := helpers.WaitForPeerAsync(emptyMailbox.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second) + s.Require().NoError(emptyMailbox.StatusNode().AddPeer(mailboxEnode)) + s.Require().NoError(<-errCh) + + emptyMailboxWhisperService, err := emptyMailbox.StatusNode().WhisperService() + s.Require().NoError(err) + + err = emptyMailboxWhisperService.AllowP2PMessagesFromPeer( + mailbox.StatusNode().Server().Self().ID().Bytes(), + ) + s.Require().NoError(err) + + err = emptyMailboxWhisperService.SyncMessages( + mailbox.StatusNode().Server().Self().ID().Bytes(), + whisper.SyncMailRequest{ + Lower: 0, + Upper: uint32(time.Now().Unix()), + Bloom: whisper.MakeFullNodeBloom(), + }, + ) + s.Require().NoError(err) + + // create and start a client + client, stop := s.startBackend("client") + defer stop() + s.Require().True(client.IsNodeRunning()) + clientWhisperService, err := client.StatusNode().WhisperService() + s.Require().NoError(err) + clientRPCClient := client.StatusNode().RPCPrivateClient() + + // create filter for messages + messagesKeyID, err := clientWhisperService.AddSymKeyFromPassword(chatName) + s.Require().NoError(err) + messageFilterID := s.createGroupChatMessageFilter(clientRPCClient, messagesKeyID, topic.String()) + + // add mailbox password + mailServerKeyID, err := clientWhisperService.AddSymKeyFromPassword(mailboxPassword) + s.Require().NoError(err) + + // add the second mail server as a peer + errCh = helpers.WaitForPeerAsync( + client.StatusNode().Server(), + emptyMailboxEnode, + p2p.PeerEventTypeAdd, + 5*time.Second) + s.Require().NoError(client.StatusNode().AddPeer(emptyMailboxEnode)) + s.Require().NoError(<-errCh) + + // request for messages + mailboxResponseWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub := clientWhisperService.SubscribeEnvelopeEvents(mailboxResponseWatcher) + defer sub.Unsubscribe() + + requestID := s.requestHistoricMessagesFromLast12Hours( + clientWhisperService, + clientRPCClient, + emptyMailboxEnode, + mailServerKeyID, + topic.String(), + 0, + "", + ) + response := s.waitForMailServerResponse(mailboxResponseWatcher, requestID) + s.Require().NotEqual(common.Hash{}.Bytes(), response.LastEnvelopeHash.Bytes()) + + // get messages + messages := s.getMessagesByMessageFilterID(clientRPCClient, messageFilterID) + s.Require().NotEmpty(messages) +} + func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) { check := make(map[string]struct{}) for _, hash := range hashes { @@ -558,11 +687,23 @@ func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, fun } // Start mailbox node. -func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) { - mailboxBackend := api.NewStatusBackend() +func (s *WhisperMailboxSuite) startMailboxBackend(name string) (*api.StatusBackend, func()) { + return s.startMailboxBackendWithCallback(name, nil) +} + +func (s *WhisperMailboxSuite) startMailboxBackendWithCallback( + name string, + callback func(*params.NodeConfig), +) (*api.StatusBackend, func()) { + if name == "" { + name = "mailserver" + } + mailboxConfig, err := MakeTestNodeConfig(GetNetworkID()) s.Require().NoError(err) - datadir := filepath.Join(RootDir, ".ethereumtest/mailbox/mailserver") + + mailboxBackend := api.NewStatusBackend() + datadir := filepath.Join(RootDir, ".ethereumtest/mailbox", name) mailboxConfig.LightEthConfig.Enabled = false mailboxConfig.WhisperConfig.Enabled = true @@ -572,6 +713,10 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) mailboxConfig.WhisperConfig.DataDir = filepath.Join(datadir, "data") mailboxConfig.DataDir = datadir + if callback != nil { + callback(mailboxConfig) + } + s.Require().False(mailboxBackend.IsNodeRunning()) s.Require().NoError(mailboxBackend.StartNode(mailboxConfig)) s.Require().True(mailboxBackend.IsNodeRunning()) @@ -579,7 +724,7 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) s.True(mailboxBackend.IsNodeRunning()) s.NoError(mailboxBackend.StopNode()) s.False(mailboxBackend.IsNodeRunning()) - err = os.RemoveAll(datadir) + err := os.RemoveAll(datadir) s.Require().NoError(err) } } @@ -608,7 +753,7 @@ func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, s resp := rpcCli.CallRaw(`{ "jsonrpc": "2.0", "method": "shh_newMessageFilter", "params": [ - {"symKeyID": "` + symkeyID + `", "topics": [ "` + topic + `"], "allowP2P":true} + {"symKeyID": "` + symkeyID + `", "topics": ["` + topic + `"], "allowP2P": true} ], "id": 1 }`) @@ -745,17 +890,20 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli return common.Hash{} } +func (s *WhisperMailboxSuite) createPublicChatTopic(name string) whisper.TopicType { + h := sha3.NewKeccak256() + _, err := h.Write([]byte(name)) + if err != nil { + s.Fail("error generating topic", "failed gerating topic from chat name, %+v", err) + } + return whisper.BytesToTopic(h.Sum(nil)) +} + func (s *WhisperMailboxSuite) joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (string, whisper.TopicType, string) { keyID, err := w.AddSymKeyFromPassword(name) s.Require().NoError(err) - h := sha3.NewKeccak256() - _, err = h.Write([]byte(name)) - if err != nil { - s.Fail("error generating topic", "failed gerating topic from chat name, %+v", err) - } - fullTopic := h.Sum(nil) - topic := whisper.BytesToTopic(fullTopic) + topic := s.createPublicChatTopic(name) filterID := s.createGroupChatMessageFilter(rpcClient, keyID, topic.String()) diff --git a/vendor/github.com/status-im/whisper/whisperv6/doc.go b/vendor/github.com/status-im/whisper/whisperv6/doc.go index c8a9c508e..f911d264f 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/doc.go +++ b/vendor/github.com/status-im/whisper/whisperv6/doc.go @@ -48,6 +48,8 @@ const ( powRequirementCode = 2 // PoW requirement bloomFilterExCode = 3 // bloom filter exchange batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received + p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers + p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) @@ -89,4 +91,36 @@ const ( type MailServer interface { Archive(env *Envelope) DeliverMail(whisperPeer *Peer, request *Envelope) + SyncMail(*Peer, SyncMailRequest) error +} + +// SyncMailRequest contains details which envelopes should be synced +// between Mail Servers. +type SyncMailRequest struct { + // Lower is a lower bound of time range for which messages are requested. + Lower uint32 + // Upper is a lower bound of time range for which messages are requested. + Upper uint32 + // Bloom is a bloom filter to filter envelopes. + Bloom []byte + // Limit is the max number of envelopes to return. + Limit uint32 + // Cursor is used for pagination of the results. + Cursor []byte +} + +// SyncResponse is a struct representing a response sent to the peer +// asking for syncing archived envelopes. +type SyncResponse struct { + Envelopes []*Envelope + Cursor []byte + Final bool // if true it means all envelopes were processed + Error string +} + +// IsFinal returns true if it's the final response for the request. +// It might be a successful final response (r.Final being true) +// or an error occured (r.Error being not empty). +func (r SyncResponse) IsFinal() bool { + return r.Final || r.Error != "" } diff --git a/vendor/github.com/status-im/whisper/whisperv6/mailserver_response.go b/vendor/github.com/status-im/whisper/whisperv6/mailserver_response.go index 5ec072105..e7710a215 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/mailserver_response.go +++ b/vendor/github.com/status-im/whisper/whisperv6/mailserver_response.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/enode" ) const ( @@ -40,22 +41,22 @@ func CreateMailServerRequestFailedPayload(requestID common.Hash, err error) []by // * request completed successfully // * request failed // If the payload is unknown/unparseable, it returns `nil` -func CreateMailServerEvent(payload []byte) (*EnvelopeEvent, error) { +func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) { if len(payload) < common.HashLength { return nil, invalidResponseSizeError(len(payload)) } - event, err := tryCreateMailServerRequestFailedEvent(payload) + event, err := tryCreateMailServerRequestFailedEvent(nodeID, payload) if err != nil || event != nil { return event, err } - return tryCreateMailServerRequestCompletedEvent(payload) + return tryCreateMailServerRequestCompletedEvent(nodeID, payload) } -func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, error) { +func tryCreateMailServerRequestFailedEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) { if len(payload) < common.HashLength+len(mailServerFailedPayloadPrefix) { return nil, nil } @@ -75,6 +76,7 @@ func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, erro errorMsg = string(remainder) event := EnvelopeEvent{ + Peer: nodeID, Hash: requestID, Event: EventMailServerRequestCompleted, Data: &MailServerResponse{ @@ -86,7 +88,7 @@ func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, erro } -func tryCreateMailServerRequestCompletedEvent(payload []byte) (*EnvelopeEvent, error) { +func tryCreateMailServerRequestCompletedEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) { // check if payload is // - requestID or // - requestID + lastEnvelopeHash or @@ -115,6 +117,7 @@ func tryCreateMailServerRequestCompletedEvent(payload []byte) (*EnvelopeEvent, e } event := EnvelopeEvent{ + Peer: nodeID, Hash: requestID, Event: EventMailServerRequestCompleted, Data: &MailServerResponse{ diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index c8bed7564..2b92d588e 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -452,6 +452,25 @@ func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) } +// SyncMessages can be sent between two Mail Servers and syncs envelopes between them. +func (whisper *Whisper) SyncMessages(peerID []byte, req SyncMailRequest) error { + if whisper.mailServer == nil { + return errors.New("can not sync messages if Mail Server is not configured") + } + + p, err := whisper.getPeer(peerID) + if err != nil { + return err + } + + return p2p.Send(p.ws, p2pSyncRequestCode, req) +} + +// SendSyncResponse sends a response to a Mail Server with a slice of envelopes. +func (whisper *Whisper) SendSyncResponse(p *Peer, data SyncResponse) error { + return p2p.Send(p.ws, p2pSyncResponseCode, data) +} + // SendP2PMessage sends a peer-to-peer message to a specific peer. func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error { p, err := whisper.getPeer(peerID) @@ -952,6 +971,46 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return fmt.Errorf("invalid direct message: %v", err) } } + case p2pSyncRequestCode: + // TODO(adam): should we limit who can send this request? + if whisper.mailServer != nil { + var request SyncMailRequest + if err = packet.Decode(&request); err != nil { + return fmt.Errorf("failed to decode p2pSyncRequestCode payload: %v", err) + } + + if err := whisper.mailServer.SyncMail(p, request); err != nil { + log.Error("failed to sync envelopes", "peer", p.peer.ID().String()) + } + } else { + log.Debug("requested to sync messages but mail servers is not registered", "peer", p.peer.ID().String()) + } + case p2pSyncResponseCode: + // TODO(adam): currently, there is no feedback when a sync response + // is received. An idea to fix this: + // 1. Sending a request contains an ID, + // 2. Each sync reponse contains this ID, + // 3. There is a way to call whisper.SyncMessages() and wait for the response.Final to be received for that particular request ID. + // 4. If Cursor is not empty, another p2pSyncRequestCode should be sent. + if p.trusted && whisper.mailServer != nil { + var resp SyncResponse + if err = packet.Decode(&resp); err != nil { + return fmt.Errorf("failed to decode p2pSyncResponseCode payload: %v", err) + } + + log.Info("received sync response", "count", len(resp.Envelopes), "final", resp.Final, "err", resp.Error) + + for _, envelope := range resp.Envelopes { + whisper.mailServer.Archive(envelope) + } + + if resp.Error != "" { + log.Error("failed to sync envelopes", "err", resp.Error) + } + if resp.Final { + log.Info("finished to sync envelopes successfully") + } + } case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. if whisper.mailServer != nil { @@ -971,7 +1030,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return errors.New("invalid request response message") } - event, err := CreateMailServerEvent(payload) + event, err := CreateMailServerEvent(p.peer.ID(), payload) if err != nil { log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err)