diff --git a/t/e2e/whisper/whisper_mailbox_test.go b/t/e2e/whisper/whisper_mailbox_test.go index d22e7907b..d5cf5a0f3 100644 --- a/t/e2e/whisper/whisper_mailbox_test.go +++ b/t/e2e/whisper/whisper_mailbox_test.go @@ -53,10 +53,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { s.Require().NotEqual(mailboxEnode, node.Server().NodeInfo().Enode) - err = sender.StatusNode().AddPeer(mailboxEnode) - s.Require().NoError(err) - // Wait async processes on adding peer. - time.Sleep(500 * time.Millisecond) + s.addPeerSync(sender.StatusNode(), mailboxBackend.StatusNode()) senderWhisperService, err := sender.StatusNode().WhisperService() s.Require().NoError(err) @@ -80,11 +77,21 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { mailboxWhisperService, err := mailboxBackend.StatusNode().WhisperService() s.Require().NoError(err) s.Require().NotNil(mailboxWhisperService) - mailboxTracer := newTracer() - mailboxWhisperService.RegisterEnvelopeTracer(mailboxTracer) - tracer := newTracer() - senderWhisperService.RegisterEnvelopeTracer(tracer) + // watch envelopes to be archived on mailserver + envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher) + defer sub.Unsubscribe() + + // watch envelopes to be available for filters in the client + envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub = senderWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher) + defer sub.Unsubscribe() + + // watch mailserver responses in the client + mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub = senderWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher) + defer sub.Unsubscribe() // Create topic. topic := whisper.BytesToTopic([]byte("topic name")) @@ -107,32 +114,33 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { messageHash := s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!"))) // Get message to make sure that it will come from the mailbox later. - messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, mailboxTracer, messageHash) + s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{messageHash}, whisper.EventEnvelopeAvailable) + messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) s.Require().Equal(1, len(messages)) // Act. - events := make(chan whisper.EnvelopeEvent) - senderWhisperService.SubscribeEnvelopeEvents(events) + // wait for mailserver to archive all the envelopes + s.waitForEnvelopeEvents(envelopeArchivedWatcher, []string{messageHash}, whisper.EventMailServerEnvelopeArchived) // Request messages (including the previous one, expired) from mailbox. requestID := s.requestHistoricMessagesFromLast12Hours(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String(), 0, "") + // wait for mail server response + resp := s.waitForMailServerResponse(mailServerResponseWatcher, requestID) + s.Equal(messageHash, resp.LastEnvelopeHash.String()) + s.Empty(resp.Cursor) + + // wait for last envelope sent by the mailserver to be available for filters + s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable) + // And we receive message, it comes from mailbox. - messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, tracer, messageHash) + messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) s.Require().Equal(1, len(messages)) // Check that there are no messages. messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) s.Require().Empty(messages) - - select { - case e := <-events: - s.Equal(whisper.EventMailServerRequestCompleted, e.Event) - s.Equal(requestID, e.Hash) - case <-time.After(time.Second): - s.Fail("timed out while waiting for request completed event") - } } func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { @@ -156,16 +164,13 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { mailboxNode := mailboxBackend.StatusNode().GethNode() mailboxEnode := mailboxNode.Server().NodeInfo().Enode - err = aliceBackend.StatusNode().AddPeer(mailboxEnode) - s.Require().NoError(err) - err = bobBackend.StatusNode().AddPeer(mailboxEnode) - s.Require().NoError(err) - err = charlieBackend.StatusNode().AddPeer(mailboxEnode) - s.Require().NoError(err) - // Wait async processes on adding peer. - time.Sleep(500 * time.Millisecond) + s.addPeerSync(aliceBackend.StatusNode(), mailboxBackend.StatusNode()) + s.addPeerSync(bobBackend.StatusNode(), mailboxBackend.StatusNode()) + s.addPeerSync(charlieBackend.StatusNode(), mailboxBackend.StatusNode()) // Get whisper service. + mailboxWhisperService, err := mailboxBackend.StatusNode().WhisperService() + s.Require().NoError(err) aliceWhisperService, err := aliceBackend.StatusNode().WhisperService() s.Require().NoError(err) bobWhisperService, err := bobBackend.StatusNode().WhisperService() @@ -177,12 +182,18 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { bobRPCClient := bobBackend.StatusNode().RPCClient() charlieRPCClient := charlieBackend.StatusNode().RPCClient() - aliceTracer := newTracer() - aliceWhisperService.RegisterEnvelopeTracer(aliceTracer) - bobTracer := newTracer() - bobWhisperService.RegisterEnvelopeTracer(bobTracer) - charlieTracer := newTracer() - charlieWhisperService.RegisterEnvelopeTracer(charlieTracer) + // watchers + envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher) + defer sub.Unsubscribe() + + bobEnvelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub = bobWhisperService.SubscribeEnvelopeEvents(bobEnvelopeAvailableWatcher) + defer sub.Unsubscribe() + + charlieEnvelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024) + sub = charlieWhisperService.SubscribeEnvelopeEvents(charlieEnvelopeAvailableWatcher) + defer sub.Unsubscribe() // Bob and charlie add the mailserver key. password := mailboxPassword @@ -235,7 +246,8 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { // Bob receive group chat data and add it to his node. // Bob get group chat details. - messages := s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobMessageFilterID, bobTracer, aliceToBobMessageHash) + s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{aliceToBobMessageHash}, whisper.EventEnvelopeAvailable) + messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID) s.Require().Equal(1, len(messages)) bobGroupChatData := groupChatParams{} err = bobGroupChatData.Decode(messages[0]["payload"].(string)) @@ -251,7 +263,8 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { // Charlie receive group chat data and add it to his node. // Charlie get group chat details. - messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieMessageFilterID, charlieTracer, aliceToCharlieMessageHash) + s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{aliceToCharlieMessageHash}, whisper.EventEnvelopeAvailable) + messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID) s.Require().Equal(1, len(messages)) charlieGroupChatData := groupChatParams{} err = charlieGroupChatData.Decode(messages[0]["payload"].(string)) @@ -270,12 +283,14 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { groupChatMessageHash := s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage) // Bob receive group chat message. - messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer, groupChatMessageHash) + s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable) + messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) // Charlie receive group chat message. - messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer, groupChatMessageHash) + s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable) + messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) @@ -285,17 +300,22 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) s.Require().Empty(messages) + // be sure that message has been archived + s.waitForEnvelopeEvents(envelopeArchivedWatcher, []string{groupChatMessageHash}, whisper.EventMailServerEnvelopeArchived) + // Request each one messages from mailbox using enode. s.requestHistoricMessagesFromLast12Hours(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String(), 0, "") s.requestHistoricMessagesFromLast12Hours(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String(), 0, "") // Bob receive p2p message from group chat filter. - messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer, groupChatMessageHash) + s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable) + messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) // Charlie receive p2p message from group chat filter. - messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer, groupChatMessageHash) + s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable) + messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) s.Require().Equal(1, len(messages)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) } @@ -314,7 +334,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() { clientRPCClient := client.StatusNode().RPCClient() // Add mailbox to clients's peers - s.addPeerAndWait(client.StatusNode(), mailbox.StatusNode()) + s.addPeerSync(client.StatusNode(), mailbox.StatusNode()) // Whisper services mailboxWhisperService, err := mailbox.StatusNode().WhisperService() @@ -340,15 +360,18 @@ func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() { // watch envelopes to be archived on mailserver envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024) - mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher) + sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher) + defer sub.Unsubscribe() // watch envelopes to be available for filters in the client envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024) - clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher) + sub = clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher) + defer sub.Unsubscribe() // watch mailserver responses in the client mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024) - clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher) + sub = clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher) + defer sub.Unsubscribe() // send envelopes for i := 0; i < envelopesCount; i++ { @@ -463,7 +486,7 @@ func (s *WhisperMailboxSuite) waitForMailServerResponse(events chan whisper.Enve } } -func (s *WhisperMailboxSuite) addPeerAndWait(node, other *node.StatusNode) { +func (s *WhisperMailboxSuite) addPeerSync(node, other *node.StatusNode) { nodeInfo := node.GethNode().Server().NodeInfo() nodeID := nodeInfo.ID nodeEnode := nodeInfo.Enode @@ -612,13 +635,13 @@ func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, s return messageFilterID } -func (s *WhisperMailboxSuite) postMessageToPrivate(rpcCli *rpc.Client, bobPubkey string, topic string, payload string) string { +func (s *WhisperMailboxSuite) postMessageToPrivate(rpcCli *rpc.Client, recipientPubkey string, topic string, payload string) string { resp := rpcCli.CallRaw(`{ "jsonrpc": "2.0", "method": "shh_post", "params": [ { - "pubKey": "` + bobPubkey + `", + "pubKey": "` + recipientPubkey + `", "topic": "` + topic + `", "payload": "` + payload + `", "powTarget": 0.001, @@ -679,30 +702,6 @@ func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, m return messages.Result } -func (s *WhisperMailboxSuite) getMessagesByMessageFilterIDWithTracer(rpcCli *rpc.Client, messageFilterID string, tracer *envelopeTracer, messageHash string) (messages []map[string]interface{}) { - select { - case envelope := <-tracer.envelopChan: - s.Require().Equal(envelope.Hash, messageHash) - case <-time.After(5 * time.Second): - s.Fail("Timed out waiting for new messages after 5 seconds") - } - - // Attempt to retrieve messages up to 3 times, 1 second apart - // TODO: There is a lag between the time when the envelope is traced by EventTracer - // and when it is decoded and actually available. Ideally this would be event-driven as well - // I.e. instead of signing up for EnvelopeTracer, we'd sign up for an event happening later - // which tells us that a call to shh_getFilterMessages will return some messages - for i := 0; i < 3; i++ { - messages = s.getMessagesByMessageFilterID(rpcCli, messageFilterID) - if len(messages) > 0 { - break - } - time.Sleep(1 * time.Second) - } - - return -} - // addSymKey added symkey to node and return symkeyID. func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string { resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey", @@ -789,19 +788,3 @@ type baseRPCResponse struct { Result interface{} Error interface{} } - -// envelopeTracer traces incoming envelopes. We leverage it to know when a peer has received an envelope -// so we rely less on timeouts for the tests -type envelopeTracer struct { - envelopChan chan *whisper.EnvelopeMeta -} - -func newTracer() *envelopeTracer { - return &envelopeTracer{make(chan *whisper.EnvelopeMeta, 1)} -} - -// Trace is called for every incoming envelope. -func (t *envelopeTracer) Trace(envelope *whisper.EnvelopeMeta) { - // Do not block notifier - go func() { t.envelopChan <- envelope }() -}