diff --git a/.travis.yml b/.travis.yml index dd4f3322d..ba2ef987c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,7 +28,7 @@ jobs: # ACCOUNT_PASSWORD environment variable anyway. if: ((type != push) OR (branch = "develop")) AND (fork = false) script: - # Sync the chain first. It will time out after 30 minutes. Used network: Rinkeby. + # Sync the chain first. It will time out after 45 minutes. Used network: Rinkeby. - make statusgo - ./build/bin/statusd -datadir=.ethereumtest/Rinkeby -les -networkid=4 -sync-and-exit=45 -log=WARN -standalone=false -discovery=false -ntp=false - make test-e2e networkid=4 diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 97db56b07..c2df3c562 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -217,7 +217,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl var err error var zero common.Hash kl := NewDbKey(lower, zero) - ku := NewDbKey(upper, zero) + ku := NewDbKey(upper+1, zero) // LevelDB is exclusive, while the Whisper API is inclusive i := s.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil) defer i.Release() diff --git a/t/e2e/whisper/whisper_mailbox_test.go b/t/e2e/whisper/whisper_mailbox_test.go index 42fbcd64f..0b4fb20ab 100644 --- a/t/e2e/whisper/whisper_mailbox_test.go +++ b/t/e2e/whisper/whisper_mailbox_test.go @@ -51,7 +51,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { err = sender.StatusNode().AddPeer(mailboxEnode) s.Require().NoError(err) // Wait async processes on adding peer. - time.Sleep(time.Second) + time.Sleep(500 * time.Millisecond) senderWhisperService, err := sender.StatusNode().WhisperService() s.Require().NoError(err) @@ -72,6 +72,14 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { rpcClient := sender.StatusNode().RPCClient() s.Require().NotNil(rpcClient) + mailboxWhisperService, _ := mailboxBackend.StatusNode().WhisperService() + s.Require().NotNil(mailboxWhisperService) + mailboxTracer := newTracer() + mailboxWhisperService.RegisterEnvelopeTracer(mailboxTracer) + + tracer := newTracer() + senderWhisperService.RegisterEnvelopeTracer(tracer) + // Create topic. topic := whisper.BytesToTopic([]byte("topic name")) @@ -87,14 +95,13 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { // There are no messages at filter. messages := s.getMessagesByMessageFilterID(rpcClient, messageFilterID) - s.Require().Equal(0, len(messages)) + s.Require().Empty(messages) // Post message matching with filter (key and topic). s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!"))) // Get message to make sure that it will come from the mailbox later. - time.Sleep(1 * time.Second) - messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, mailboxTracer) s.Require().Equal(1, len(messages)) // Act. @@ -106,15 +113,13 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { result := s.requestHistoricMessages(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String()) requestID := common.BytesToHash(result) - // Wait to receive message. - time.Sleep(time.Second) // And we receive message, it comes from mailbox. - messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, tracer) s.Require().Equal(1, len(messages)) // Check that there are no messages. messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) - s.Require().Equal(0, len(messages)) + s.Require().Empty(messages) select { case e := <-events: @@ -153,7 +158,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { err = charlieBackend.StatusNode().AddPeer(mailboxEnode) s.Require().NoError(err) // Wait async processes on adding peer. - time.Sleep(time.Second) + time.Sleep(500 * time.Millisecond) // Get whisper service. aliceWhisperService, err := aliceBackend.StatusNode().WhisperService() @@ -167,6 +172,13 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { bobRPCClient := bobBackend.StatusNode().RPCClient() charlieRPCClient := charlieBackend.StatusNode().RPCClient() + aliceTracer := newTracer() + aliceWhisperService.RegisterEnvelopeTracer(aliceTracer) + bobTracer := newTracer() + bobWhisperService.RegisterEnvelopeTracer(bobTracer) + charlieTracer := newTracer() + charlieWhisperService.RegisterEnvelopeTracer(charlieTracer) + // Bob and charlie add the mailserver key. password := mailboxPassword bobMailServerKeyID, err := bobWhisperService.AddSymKeyFromPassword(password) @@ -216,12 +228,9 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { s.postMessageToPrivate(aliceRPCClient, bobPubkey.String(), bobAliceKeySendTopic.String(), payloadStr) s.postMessageToPrivate(aliceRPCClient, charliePubkey.String(), charlieAliceKeySendTopic.String(), payloadStr) - // Wait to receive. - time.Sleep(5 * time.Second) - // Bob receive group chat data and add it to his node. // Bob get group chat details. - messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID) + messages := s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobMessageFilterID, bobTracer) s.Require().Equal(1, len(messages)) bobGroupChatData := groupChatParams{} err = bobGroupChatData.Decode(messages[0]["payload"].(string)) @@ -237,7 +246,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { // Charlie receive group chat data and add it to his node. // Charlie get group chat details. - messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieMessageFilterID, charlieTracer) s.Require().Equal(1, len(messages)) charlieGroupChatData := groupChatParams{} err = charlieGroupChatData.Decode(messages[0]["payload"].(string)) @@ -254,38 +263,34 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { // Alice send message to group chat. helloWorldMessage := hexutil.Encode([]byte("Hello world!")) s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage) - // It need to receive envelopes by bob and charlie nodes. - time.Sleep(5 * time.Second) // Bob receive group chat message. - messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) // Charlie receive group chat message. - messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) // Check that we don't receive messages each one time. messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) - s.Require().Equal(0, len(messages)) + s.Require().Empty(messages) messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) - s.Require().Equal(0, len(messages)) + s.Require().Empty(messages) // Request each one messages from mailbox using enode. s.requestHistoricMessages(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String()) s.requestHistoricMessages(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String()) - // Wait to receive p2p messages. - time.Sleep(5 * time.Second) // Bob receive p2p message from group chat filter. - messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) // Charlie receive p2p message from group chat filter. - messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) + messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) } @@ -467,6 +472,29 @@ func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, m return messages.Result } +func (s *WhisperMailboxSuite) getMessagesByMessageFilterIDWithTracer(rpcCli *rpc.Client, messageFilterID string, tracer *envelopeTracer) (messages []map[string]interface{}) { + select { + case <-tracer.envelopChan: + case <-time.After(5 * time.Second): + s.Fail("Timed out waiting for new messages after 5 seconds") + } + + // Attempt to retrieve messages up to 3 times, 1 second apart + // TODO: There is a lag between the time when the envelope is traced by EventTracer + // and when it is decoded and actually available. Ideally this would be event-driven as well + // I.e. instead of signing up for EnvelopeTracer, we'd sign up for an event happening later + // which tells us that a call to shh_getFilterMessages will return some messages + for i := 0; i < 3; i++ { + messages = s.getMessagesByMessageFilterID(rpcCli, messageFilterID) + if len(messages) > 0 { + break + } + time.Sleep(1 * time.Second) + } + + return +} + // addSymKey added symkey to node and return symkeyID. func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string { resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey", @@ -483,7 +511,9 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin // requestHistoricMessages asks a mailnode to resend messages. func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) []byte { - from := w.GetCurrentTime().Add(-12 * time.Hour) + currentTime := w.GetCurrentTime() + from := currentTime.Add(-12 * time.Hour) + to := currentTime resp := rpcCli.CallRaw(`{ "jsonrpc": "2.0", "id": 2, @@ -493,7 +523,7 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli "topic":"` + topic + `", "symKeyID":"` + mailServerKeyID + `", "from":` + strconv.FormatInt(from.Unix(), 10) + `, - "to":` + strconv.FormatInt(w.GetCurrentTime().Unix(), 10) + ` + "to":` + strconv.FormatInt(to.Unix(), 10) + ` }] }`) reqMessagesResp := baseRPCResponse{} @@ -527,3 +557,19 @@ type baseRPCResponse struct { Result interface{} Error interface{} } + +// envelopeTracer traces incoming envelopes. We leverage it to know when a peer has received an envelope +// so we rely less on timeouts for the tests +type envelopeTracer struct { + envelopChan chan *whisper.EnvelopeMeta +} + +func newTracer() *envelopeTracer { + return &envelopeTracer{make(chan *whisper.EnvelopeMeta, 1)} +} + +// Trace is called for every incoming envelope. +func (t *envelopeTracer) Trace(envelope *whisper.EnvelopeMeta) { + // Do not block notifier + go func() { t.envelopChan <- envelope }() +}