Use `EnvelopeTracer` as an event source to avoid relying on timeouts in tests. Fixes #1021
Fix bug in mailserver implementation which was using upper bound `to` parameter as an exclusive, rather than inclusive parameter, as stated in the documentation: https://github.com/status-im/status-go/wiki/Additional-JSON-RPC-API
This commit is contained in:
parent
fa390a52ae
commit
fac5576988
|
@ -28,7 +28,7 @@ jobs:
|
||||||
# ACCOUNT_PASSWORD environment variable anyway.
|
# ACCOUNT_PASSWORD environment variable anyway.
|
||||||
if: ((type != push) OR (branch = "develop")) AND (fork = false)
|
if: ((type != push) OR (branch = "develop")) AND (fork = false)
|
||||||
script:
|
script:
|
||||||
# Sync the chain first. It will time out after 30 minutes. Used network: Rinkeby.
|
# Sync the chain first. It will time out after 45 minutes. Used network: Rinkeby.
|
||||||
- make statusgo
|
- make statusgo
|
||||||
- ./build/bin/statusd -datadir=.ethereumtest/Rinkeby -les -networkid=4 -sync-and-exit=45 -log=WARN -standalone=false -discovery=false -ntp=false
|
- ./build/bin/statusd -datadir=.ethereumtest/Rinkeby -les -networkid=4 -sync-and-exit=45 -log=WARN -standalone=false -discovery=false -ntp=false
|
||||||
- make test-e2e networkid=4
|
- make test-e2e networkid=4
|
||||||
|
|
|
@ -217,7 +217,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
|
||||||
var err error
|
var err error
|
||||||
var zero common.Hash
|
var zero common.Hash
|
||||||
kl := NewDbKey(lower, zero)
|
kl := NewDbKey(lower, zero)
|
||||||
ku := NewDbKey(upper, zero)
|
ku := NewDbKey(upper+1, zero) // LevelDB is exclusive, while the Whisper API is inclusive
|
||||||
i := s.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
|
i := s.db.NewIterator(&util.Range{Start: kl.raw, Limit: ku.raw}, nil)
|
||||||
defer i.Release()
|
defer i.Release()
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
||||||
err = sender.StatusNode().AddPeer(mailboxEnode)
|
err = sender.StatusNode().AddPeer(mailboxEnode)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
// Wait async processes on adding peer.
|
// Wait async processes on adding peer.
|
||||||
time.Sleep(time.Second)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
senderWhisperService, err := sender.StatusNode().WhisperService()
|
senderWhisperService, err := sender.StatusNode().WhisperService()
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
@ -72,6 +72,14 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
||||||
rpcClient := sender.StatusNode().RPCClient()
|
rpcClient := sender.StatusNode().RPCClient()
|
||||||
s.Require().NotNil(rpcClient)
|
s.Require().NotNil(rpcClient)
|
||||||
|
|
||||||
|
mailboxWhisperService, _ := mailboxBackend.StatusNode().WhisperService()
|
||||||
|
s.Require().NotNil(mailboxWhisperService)
|
||||||
|
mailboxTracer := newTracer()
|
||||||
|
mailboxWhisperService.RegisterEnvelopeTracer(mailboxTracer)
|
||||||
|
|
||||||
|
tracer := newTracer()
|
||||||
|
senderWhisperService.RegisterEnvelopeTracer(tracer)
|
||||||
|
|
||||||
// Create topic.
|
// Create topic.
|
||||||
topic := whisper.BytesToTopic([]byte("topic name"))
|
topic := whisper.BytesToTopic([]byte("topic name"))
|
||||||
|
|
||||||
|
@ -87,14 +95,13 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
||||||
|
|
||||||
// There are no messages at filter.
|
// There are no messages at filter.
|
||||||
messages := s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
messages := s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
||||||
s.Require().Equal(0, len(messages))
|
s.Require().Empty(messages)
|
||||||
|
|
||||||
// Post message matching with filter (key and topic).
|
// Post message matching with filter (key and topic).
|
||||||
s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!")))
|
s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!")))
|
||||||
|
|
||||||
// Get message to make sure that it will come from the mailbox later.
|
// Get message to make sure that it will come from the mailbox later.
|
||||||
time.Sleep(1 * time.Second)
|
messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, mailboxTracer)
|
||||||
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
|
|
||||||
// Act.
|
// Act.
|
||||||
|
@ -106,15 +113,13 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
||||||
result := s.requestHistoricMessages(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String())
|
result := s.requestHistoricMessages(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String())
|
||||||
requestID := common.BytesToHash(result)
|
requestID := common.BytesToHash(result)
|
||||||
|
|
||||||
// Wait to receive message.
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
// And we receive message, it comes from mailbox.
|
// And we receive message, it comes from mailbox.
|
||||||
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
messages = s.getMessagesByMessageFilterIDWithTracer(rpcClient, messageFilterID, tracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
|
|
||||||
// Check that there are no messages.
|
// Check that there are no messages.
|
||||||
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
||||||
s.Require().Equal(0, len(messages))
|
s.Require().Empty(messages)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case e := <-events:
|
case e := <-events:
|
||||||
|
@ -153,7 +158,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
|
||||||
err = charlieBackend.StatusNode().AddPeer(mailboxEnode)
|
err = charlieBackend.StatusNode().AddPeer(mailboxEnode)
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
// Wait async processes on adding peer.
|
// Wait async processes on adding peer.
|
||||||
time.Sleep(time.Second)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
// Get whisper service.
|
// Get whisper service.
|
||||||
aliceWhisperService, err := aliceBackend.StatusNode().WhisperService()
|
aliceWhisperService, err := aliceBackend.StatusNode().WhisperService()
|
||||||
|
@ -167,6 +172,13 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
|
||||||
bobRPCClient := bobBackend.StatusNode().RPCClient()
|
bobRPCClient := bobBackend.StatusNode().RPCClient()
|
||||||
charlieRPCClient := charlieBackend.StatusNode().RPCClient()
|
charlieRPCClient := charlieBackend.StatusNode().RPCClient()
|
||||||
|
|
||||||
|
aliceTracer := newTracer()
|
||||||
|
aliceWhisperService.RegisterEnvelopeTracer(aliceTracer)
|
||||||
|
bobTracer := newTracer()
|
||||||
|
bobWhisperService.RegisterEnvelopeTracer(bobTracer)
|
||||||
|
charlieTracer := newTracer()
|
||||||
|
charlieWhisperService.RegisterEnvelopeTracer(charlieTracer)
|
||||||
|
|
||||||
// Bob and charlie add the mailserver key.
|
// Bob and charlie add the mailserver key.
|
||||||
password := mailboxPassword
|
password := mailboxPassword
|
||||||
bobMailServerKeyID, err := bobWhisperService.AddSymKeyFromPassword(password)
|
bobMailServerKeyID, err := bobWhisperService.AddSymKeyFromPassword(password)
|
||||||
|
@ -216,12 +228,9 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
|
||||||
s.postMessageToPrivate(aliceRPCClient, bobPubkey.String(), bobAliceKeySendTopic.String(), payloadStr)
|
s.postMessageToPrivate(aliceRPCClient, bobPubkey.String(), bobAliceKeySendTopic.String(), payloadStr)
|
||||||
s.postMessageToPrivate(aliceRPCClient, charliePubkey.String(), charlieAliceKeySendTopic.String(), payloadStr)
|
s.postMessageToPrivate(aliceRPCClient, charliePubkey.String(), charlieAliceKeySendTopic.String(), payloadStr)
|
||||||
|
|
||||||
// Wait to receive.
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
|
|
||||||
// Bob receive group chat data and add it to his node.
|
// Bob receive group chat data and add it to his node.
|
||||||
// Bob get group chat details.
|
// Bob get group chat details.
|
||||||
messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID)
|
messages := s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobMessageFilterID, bobTracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
bobGroupChatData := groupChatParams{}
|
bobGroupChatData := groupChatParams{}
|
||||||
err = bobGroupChatData.Decode(messages[0]["payload"].(string))
|
err = bobGroupChatData.Decode(messages[0]["payload"].(string))
|
||||||
|
@ -237,7 +246,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
|
||||||
|
|
||||||
// Charlie receive group chat data and add it to his node.
|
// Charlie receive group chat data and add it to his node.
|
||||||
// Charlie get group chat details.
|
// Charlie get group chat details.
|
||||||
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID)
|
messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieMessageFilterID, charlieTracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
charlieGroupChatData := groupChatParams{}
|
charlieGroupChatData := groupChatParams{}
|
||||||
err = charlieGroupChatData.Decode(messages[0]["payload"].(string))
|
err = charlieGroupChatData.Decode(messages[0]["payload"].(string))
|
||||||
|
@ -254,38 +263,34 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
|
||||||
// Alice send message to group chat.
|
// Alice send message to group chat.
|
||||||
helloWorldMessage := hexutil.Encode([]byte("Hello world!"))
|
helloWorldMessage := hexutil.Encode([]byte("Hello world!"))
|
||||||
s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage)
|
s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage)
|
||||||
// It need to receive envelopes by bob and charlie nodes.
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
|
|
||||||
// Bob receive group chat message.
|
// Bob receive group chat message.
|
||||||
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
||||||
|
|
||||||
// Charlie receive group chat message.
|
// Charlie receive group chat message.
|
||||||
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
||||||
|
|
||||||
// Check that we don't receive messages each one time.
|
// Check that we don't receive messages each one time.
|
||||||
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
||||||
s.Require().Equal(0, len(messages))
|
s.Require().Empty(messages)
|
||||||
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
||||||
s.Require().Equal(0, len(messages))
|
s.Require().Empty(messages)
|
||||||
|
|
||||||
// Request each one messages from mailbox using enode.
|
// Request each one messages from mailbox using enode.
|
||||||
s.requestHistoricMessages(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String())
|
s.requestHistoricMessages(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String())
|
||||||
s.requestHistoricMessages(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String())
|
s.requestHistoricMessages(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String())
|
||||||
// Wait to receive p2p messages.
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
|
|
||||||
// Bob receive p2p message from group chat filter.
|
// Bob receive p2p message from group chat filter.
|
||||||
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
messages = s.getMessagesByMessageFilterIDWithTracer(bobRPCClient, bobGroupChatMessageFilterID, bobTracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
||||||
|
|
||||||
// Charlie receive p2p message from group chat filter.
|
// Charlie receive p2p message from group chat filter.
|
||||||
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
messages = s.getMessagesByMessageFilterIDWithTracer(charlieRPCClient, charlieGroupChatMessageFilterID, charlieTracer)
|
||||||
s.Require().Equal(1, len(messages))
|
s.Require().Equal(1, len(messages))
|
||||||
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
||||||
}
|
}
|
||||||
|
@ -467,6 +472,29 @@ func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, m
|
||||||
return messages.Result
|
return messages.Result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *WhisperMailboxSuite) getMessagesByMessageFilterIDWithTracer(rpcCli *rpc.Client, messageFilterID string, tracer *envelopeTracer) (messages []map[string]interface{}) {
|
||||||
|
select {
|
||||||
|
case <-tracer.envelopChan:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
s.Fail("Timed out waiting for new messages after 5 seconds")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to retrieve messages up to 3 times, 1 second apart
|
||||||
|
// TODO: There is a lag between the time when the envelope is traced by EventTracer
|
||||||
|
// and when it is decoded and actually available. Ideally this would be event-driven as well
|
||||||
|
// I.e. instead of signing up for EnvelopeTracer, we'd sign up for an event happening later
|
||||||
|
// which tells us that a call to shh_getFilterMessages will return some messages
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
messages = s.getMessagesByMessageFilterID(rpcCli, messageFilterID)
|
||||||
|
if len(messages) > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// addSymKey added symkey to node and return symkeyID.
|
// addSymKey added symkey to node and return symkeyID.
|
||||||
func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string {
|
func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string {
|
||||||
resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey",
|
resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey",
|
||||||
|
@ -483,7 +511,9 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin
|
||||||
|
|
||||||
// requestHistoricMessages asks a mailnode to resend messages.
|
// requestHistoricMessages asks a mailnode to resend messages.
|
||||||
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) []byte {
|
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) []byte {
|
||||||
from := w.GetCurrentTime().Add(-12 * time.Hour)
|
currentTime := w.GetCurrentTime()
|
||||||
|
from := currentTime.Add(-12 * time.Hour)
|
||||||
|
to := currentTime
|
||||||
resp := rpcCli.CallRaw(`{
|
resp := rpcCli.CallRaw(`{
|
||||||
"jsonrpc": "2.0",
|
"jsonrpc": "2.0",
|
||||||
"id": 2,
|
"id": 2,
|
||||||
|
@ -493,7 +523,7 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli
|
||||||
"topic":"` + topic + `",
|
"topic":"` + topic + `",
|
||||||
"symKeyID":"` + mailServerKeyID + `",
|
"symKeyID":"` + mailServerKeyID + `",
|
||||||
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
|
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
|
||||||
"to":` + strconv.FormatInt(w.GetCurrentTime().Unix(), 10) + `
|
"to":` + strconv.FormatInt(to.Unix(), 10) + `
|
||||||
}]
|
}]
|
||||||
}`)
|
}`)
|
||||||
reqMessagesResp := baseRPCResponse{}
|
reqMessagesResp := baseRPCResponse{}
|
||||||
|
@ -527,3 +557,19 @@ type baseRPCResponse struct {
|
||||||
Result interface{}
|
Result interface{}
|
||||||
Error interface{}
|
Error interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// envelopeTracer traces incoming envelopes. We leverage it to know when a peer has received an envelope
|
||||||
|
// so we rely less on timeouts for the tests
|
||||||
|
type envelopeTracer struct {
|
||||||
|
envelopChan chan *whisper.EnvelopeMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTracer() *envelopeTracer {
|
||||||
|
return &envelopeTracer{make(chan *whisper.EnvelopeMeta, 1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trace is called for every incoming envelope.
|
||||||
|
func (t *envelopeTracer) Trace(envelope *whisper.EnvelopeMeta) {
|
||||||
|
// Do not block notifier
|
||||||
|
go func() { t.envelopChan <- envelope }()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue