use envelopeFeed to wait for envelopes to be available in tests (#1079)

* remove sleeps in WhisperMailboxSuite

* use envelopeFeed to wait for envelopes to be available in group chat test

* remove unsed envelopeTracer in whisper e2e tests

* unsubscribe from envelopeFeed

* use s.addPeerSync instead of WaitForPeerAsync
This commit is contained in:
Andrea Franz 2018-07-04 13:07:40 +02:00 committed by GitHub
parent 38a60135b2
commit 6d1bcff446
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 70 additions and 87 deletions

View File

@ -53,10 +53,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
s.Require().NotEqual(mailboxEnode, node.Server().NodeInfo().Enode)
err = sender.StatusNode().AddPeer(mailboxEnode)
s.Require().NoError(err)
// Wait async processes on adding peer.
time.Sleep(500 * time.Millisecond)
s.addPeerSync(sender.StatusNode(), mailboxBackend.StatusNode())
senderWhisperService, err := sender.StatusNode().WhisperService()
s.Require().NoError(err)
@ -80,11 +77,21 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
mailboxWhisperService, err := mailboxBackend.StatusNode().WhisperService()
s.Require().NoError(err)
s.Require().NotNil(mailboxWhisperService)
mailboxTracer := newTracer()
mailboxWhisperService.RegisterEnvelopeTracer(mailboxTracer)
tracer := newTracer()
senderWhisperService.RegisterEnvelopeTracer(tracer)
// watch envelopes to be archived on mailserver
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
defer sub.Unsubscribe()
// watch envelopes to be available for filters in the client
envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub = senderWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher)
defer sub.Unsubscribe()
// watch mailserver responses in the client
mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub = senderWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher)
defer sub.Unsubscribe()
// Create topic.
topic := whisper.BytesToTopic([]byte("topic name"))
@ -107,32 +114,33 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
messageHash := s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!")))
// Get message to make sure that it will come from the mailbox later.
messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, mailboxTracer, messageHash)
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{messageHash}, whisper.EventEnvelopeAvailable)
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(1, len(messages))
// Act.
events := make(chan whisper.EnvelopeEvent)
senderWhisperService.SubscribeEnvelopeEvents(events)
// wait for mailserver to archive all the envelopes
s.waitForEnvelopeEvents(envelopeArchivedWatcher, []string{messageHash}, whisper.EventMailServerEnvelopeArchived)
// Request messages (including the previous one, expired) from mailbox.
requestID := s.requestHistoricMessagesFromLast12Hours(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String(), 0, "")
// wait for mail server response
resp := s.waitForMailServerResponse(mailServerResponseWatcher, requestID)
s.Equal(messageHash, resp.LastEnvelopeHash.String())
s.Empty(resp.Cursor)
// wait for last envelope sent by the mailserver to be available for filters
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable)
// And we receive message, it comes from mailbox.
messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, tracer, messageHash)
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(1, len(messages))
// Check that there are no messages.
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Empty(messages)
select {
case e := <-events:
s.Equal(whisper.EventMailServerRequestCompleted, e.Event)
s.Equal(requestID, e.Hash)
case <-time.After(time.Second):
s.Fail("timed out while waiting for request completed event")
}
}
func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
@ -156,16 +164,13 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
mailboxNode := mailboxBackend.StatusNode().GethNode()
mailboxEnode := mailboxNode.Server().NodeInfo().Enode
err = aliceBackend.StatusNode().AddPeer(mailboxEnode)
s.Require().NoError(err)
err = bobBackend.StatusNode().AddPeer(mailboxEnode)
s.Require().NoError(err)
err = charlieBackend.StatusNode().AddPeer(mailboxEnode)
s.Require().NoError(err)
// Wait async processes on adding peer.
time.Sleep(500 * time.Millisecond)
s.addPeerSync(aliceBackend.StatusNode(), mailboxBackend.StatusNode())
s.addPeerSync(bobBackend.StatusNode(), mailboxBackend.StatusNode())
s.addPeerSync(charlieBackend.StatusNode(), mailboxBackend.StatusNode())
// Get whisper service.
mailboxWhisperService, err := mailboxBackend.StatusNode().WhisperService()
s.Require().NoError(err)
aliceWhisperService, err := aliceBackend.StatusNode().WhisperService()
s.Require().NoError(err)
bobWhisperService, err := bobBackend.StatusNode().WhisperService()
@ -177,12 +182,18 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
bobRPCClient := bobBackend.StatusNode().RPCClient()
charlieRPCClient := charlieBackend.StatusNode().RPCClient()
aliceTracer := newTracer()
aliceWhisperService.RegisterEnvelopeTracer(aliceTracer)
bobTracer := newTracer()
bobWhisperService.RegisterEnvelopeTracer(bobTracer)
charlieTracer := newTracer()
charlieWhisperService.RegisterEnvelopeTracer(charlieTracer)
// watchers
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
defer sub.Unsubscribe()
bobEnvelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub = bobWhisperService.SubscribeEnvelopeEvents(bobEnvelopeAvailableWatcher)
defer sub.Unsubscribe()
charlieEnvelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
sub = charlieWhisperService.SubscribeEnvelopeEvents(charlieEnvelopeAvailableWatcher)
defer sub.Unsubscribe()
// Bob and charlie add the mailserver key.
password := mailboxPassword
@ -235,7 +246,8 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
// Bob receive group chat data and add it to his node.
// Bob get group chat details.
messages := s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobMessageFilterID, bobTracer, aliceToBobMessageHash)
s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{aliceToBobMessageHash}, whisper.EventEnvelopeAvailable)
messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID)
s.Require().Equal(1, len(messages))
bobGroupChatData := groupChatParams{}
err = bobGroupChatData.Decode(messages[0]["payload"].(string))
@ -251,7 +263,8 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
// Charlie receive group chat data and add it to his node.
// Charlie get group chat details.
messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieMessageFilterID, charlieTracer, aliceToCharlieMessageHash)
s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{aliceToCharlieMessageHash}, whisper.EventEnvelopeAvailable)
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID)
s.Require().Equal(1, len(messages))
charlieGroupChatData := groupChatParams{}
err = charlieGroupChatData.Decode(messages[0]["payload"].(string))
@ -270,12 +283,14 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
groupChatMessageHash := s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage)
// Bob receive group chat message.
messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer, groupChatMessageHash)
s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
// Charlie receive group chat message.
messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer, groupChatMessageHash)
s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
@ -285,17 +300,22 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
s.Require().Empty(messages)
// be sure that message has been archived
s.waitForEnvelopeEvents(envelopeArchivedWatcher, []string{groupChatMessageHash}, whisper.EventMailServerEnvelopeArchived)
// Request each one messages from mailbox using enode.
s.requestHistoricMessagesFromLast12Hours(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String(), 0, "")
s.requestHistoricMessagesFromLast12Hours(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String(), 0, "")
// Bob receive p2p message from group chat filter.
messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer, groupChatMessageHash)
s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
// Charlie receive p2p message from group chat filter.
messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer, groupChatMessageHash)
s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
}
@ -314,7 +334,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() {
clientRPCClient := client.StatusNode().RPCClient()
// Add mailbox to clients's peers
s.addPeerAndWait(client.StatusNode(), mailbox.StatusNode())
s.addPeerSync(client.StatusNode(), mailbox.StatusNode())
// Whisper services
mailboxWhisperService, err := mailbox.StatusNode().WhisperService()
@ -340,15 +360,18 @@ func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() {
// watch envelopes to be archived on mailserver
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
defer sub.Unsubscribe()
// watch envelopes to be available for filters in the client
envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher)
sub = clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher)
defer sub.Unsubscribe()
// watch mailserver responses in the client
mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher)
sub = clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher)
defer sub.Unsubscribe()
// send envelopes
for i := 0; i < envelopesCount; i++ {
@ -463,7 +486,7 @@ func (s *WhisperMailboxSuite) waitForMailServerResponse(events chan whisper.Enve
}
}
func (s *WhisperMailboxSuite) addPeerAndWait(node, other *node.StatusNode) {
func (s *WhisperMailboxSuite) addPeerSync(node, other *node.StatusNode) {
nodeInfo := node.GethNode().Server().NodeInfo()
nodeID := nodeInfo.ID
nodeEnode := nodeInfo.Enode
@ -612,13 +635,13 @@ func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, s
return messageFilterID
}
func (s *WhisperMailboxSuite) postMessageToPrivate(rpcCli *rpc.Client, bobPubkey string, topic string, payload string) string {
func (s *WhisperMailboxSuite) postMessageToPrivate(rpcCli *rpc.Client, recipientPubkey string, topic string, payload string) string {
resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0",
"method": "shh_post",
"params": [
{
"pubKey": "` + bobPubkey + `",
"pubKey": "` + recipientPubkey + `",
"topic": "` + topic + `",
"payload": "` + payload + `",
"powTarget": 0.001,
@ -679,30 +702,6 @@ func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, m
return messages.Result
}
func (s *WhisperMailboxSuite) getMessagesByMessageFilterIDWithTracer(rpcCli *rpc.Client, messageFilterID string, tracer *envelopeTracer, messageHash string) (messages []map[string]interface{}) {
select {
case envelope := <-tracer.envelopChan:
s.Require().Equal(envelope.Hash, messageHash)
case <-time.After(5 * time.Second):
s.Fail("Timed out waiting for new messages after 5 seconds")
}
// Attempt to retrieve messages up to 3 times, 1 second apart
// TODO: There is a lag between the time when the envelope is traced by EventTracer
// and when it is decoded and actually available. Ideally this would be event-driven as well
// I.e. instead of signing up for EnvelopeTracer, we'd sign up for an event happening later
// which tells us that a call to shh_getFilterMessages will return some messages
for i := 0; i < 3; i++ {
messages = s.getMessagesByMessageFilterID(rpcCli, messageFilterID)
if len(messages) > 0 {
break
}
time.Sleep(1 * time.Second)
}
return
}
// addSymKey added symkey to node and return symkeyID.
func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string {
resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey",
@ -789,19 +788,3 @@ type baseRPCResponse struct {
Result interface{}
Error interface{}
}
// envelopeTracer traces incoming envelopes. We leverage it to know when a peer has received an envelope
// so we rely less on timeouts for the tests
type envelopeTracer struct {
envelopChan chan *whisper.EnvelopeMeta
}
func newTracer() *envelopeTracer {
return &envelopeTracer{make(chan *whisper.EnvelopeMeta, 1)}
}
// Trace is called for every incoming envelope.
func (t *envelopeTracer) Trace(envelope *whisper.EnvelopeMeta) {
// Do not block notifier
go func() { t.envelopChan <- envelope }()
}