mirror of
https://github.com/status-im/status-go.git
synced 2025-01-13 08:05:40 +00:00
965 lines
34 KiB
Go
965 lines
34 KiB
Go
package whisper
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/status-im/status-go/services/shhext"
|
|
|
|
"os"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/status-im/status-go/api"
|
|
"github.com/status-im/status-go/mailserver"
|
|
"github.com/status-im/status-go/params"
|
|
"github.com/status-im/status-go/rpc"
|
|
"github.com/status-im/status-go/t/helpers"
|
|
"github.com/status-im/status-go/t/utils"
|
|
whisper "github.com/status-im/whisper/whisperv6"
|
|
"github.com/stretchr/testify/suite"
|
|
"golang.org/x/crypto/sha3"
|
|
)
|
|
|
|
const mailboxPassword = "status-offline-inbox"
|
|
|
|
type WhisperMailboxSuite struct {
|
|
suite.Suite
|
|
}
|
|
|
|
func TestWhisperMailboxTestSuite(t *testing.T) {
|
|
suite.Run(t, new(WhisperMailboxSuite))
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
|
var err error
|
|
// Start mailbox and status node.
|
|
mailboxBackend, stop := s.startMailboxBackend("")
|
|
defer stop()
|
|
s.Require().True(mailboxBackend.IsNodeRunning())
|
|
mailboxNode := mailboxBackend.StatusNode().GethNode()
|
|
mailboxEnode := mailboxNode.Server().NodeInfo().Enode
|
|
|
|
sender, stop := s.startBackend("sender")
|
|
defer stop()
|
|
s.Require().True(sender.IsNodeRunning())
|
|
node := sender.StatusNode().GethNode()
|
|
|
|
s.Require().NotEqual(mailboxEnode, node.Server().NodeInfo().Enode)
|
|
|
|
errCh := helpers.WaitForPeerAsync(sender.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, time.Second)
|
|
s.Require().NoError(sender.StatusNode().AddPeer(mailboxEnode))
|
|
s.Require().NoError(<-errCh)
|
|
|
|
senderWhisperService, err := sender.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
|
|
// This sleep still is required because there is still a race condition
|
|
// when adding a peer to p2p.Server and to Whisper service.
|
|
// It may happen that the event about the added peer
|
|
// was emitted by p2p.Server but Whisper service is not aware of it yet.
|
|
time.Sleep(time.Second)
|
|
|
|
// Mark mailbox node trusted.
|
|
err = senderWhisperService.AllowP2PMessagesFromPeer(mailboxNode.Server().Self().ID().Bytes())
|
|
s.Require().NoError(err)
|
|
|
|
// Generate mailbox symkey.
|
|
password := mailboxPassword
|
|
MailServerKeyID, err := senderWhisperService.AddSymKeyFromPassword(password)
|
|
s.Require().NoError(err)
|
|
|
|
rpcClient := sender.StatusNode().RPCPrivateClient()
|
|
s.Require().NotNil(rpcClient)
|
|
|
|
mailboxWhisperService, err := mailboxBackend.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
s.Require().NotNil(mailboxWhisperService)
|
|
|
|
// watch envelopes to be archived on mailserver
|
|
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// watch envelopes to be available for filters in the client
|
|
envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub = senderWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// watch mailserver responses in the client
|
|
mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub = senderWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// Create topic.
|
|
topic := whisper.BytesToTopic([]byte("topic name"))
|
|
|
|
// Add key pair to whisper.
|
|
keyID, err := senderWhisperService.NewKeyPair()
|
|
s.Require().NoError(err)
|
|
key, err := senderWhisperService.GetPrivateKey(keyID)
|
|
s.Require().NoError(err)
|
|
pubkey := hexutil.Bytes(crypto.FromECDSAPub(&key.PublicKey))
|
|
|
|
// Create message filter.
|
|
messageFilterID := s.createPrivateChatMessageFilter(rpcClient, keyID, topic.String())
|
|
|
|
// There are no messages at filter.
|
|
messages := s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
|
s.Require().Empty(messages)
|
|
|
|
// Post message matching with filter (key and topic).
|
|
messageHash := s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!")))
|
|
|
|
// Get message to make sure that it will come from the mailbox later.
|
|
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{messageHash}, whisper.EventEnvelopeAvailable)
|
|
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
|
|
// Act.
|
|
|
|
// wait for mailserver to archive all the envelopes
|
|
s.waitForEnvelopeEvents(envelopeArchivedWatcher, []string{messageHash}, whisper.EventMailServerEnvelopeArchived)
|
|
|
|
// Request messages (including the previous one, expired) from mailbox.
|
|
requestID := s.requestHistoricMessagesFromLast12Hours(senderWhisperService, rpcClient, mailboxNode.Server().Self().String(), MailServerKeyID, topic.String(), 0, "")
|
|
|
|
// wait for mail server response
|
|
resp := s.waitForMailServerResponse(mailServerResponseWatcher, requestID)
|
|
s.Require().Equal(messageHash, resp.LastEnvelopeHash.String())
|
|
s.Require().Empty(resp.Cursor)
|
|
|
|
// wait for last envelope sent by the mailserver to be available for filters
|
|
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable)
|
|
|
|
// And we receive message, it comes from mailbox.
|
|
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
|
|
// Check that there are no messages.
|
|
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
|
|
s.Require().Empty(messages)
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
|
|
var err error
|
|
|
|
// Start mailbox, alice, bob, charlie node.
|
|
mailboxBackend, stop := s.startMailboxBackend("")
|
|
defer stop()
|
|
|
|
aliceBackend, stop := s.startBackend("alice")
|
|
defer stop()
|
|
|
|
bobBackend, stop := s.startBackend("bob")
|
|
defer stop()
|
|
|
|
charlieBackend, stop := s.startBackend("charlie")
|
|
defer stop()
|
|
|
|
// Add mailbox to static peers.
|
|
s.Require().True(mailboxBackend.IsNodeRunning())
|
|
mailboxNode := mailboxBackend.StatusNode().GethNode()
|
|
mailboxEnode := mailboxNode.Server().NodeInfo().Enode
|
|
|
|
aliceErrCh := helpers.WaitForPeerAsync(aliceBackend.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second)
|
|
bobErrCh := helpers.WaitForPeerAsync(bobBackend.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second)
|
|
charlieErrCh := helpers.WaitForPeerAsync(charlieBackend.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second)
|
|
|
|
s.Require().NoError(aliceBackend.StatusNode().AddPeer(mailboxEnode))
|
|
s.Require().NoError(bobBackend.StatusNode().AddPeer(mailboxEnode))
|
|
s.Require().NoError(charlieBackend.StatusNode().AddPeer(mailboxEnode))
|
|
|
|
s.Require().NoError(<-aliceErrCh)
|
|
s.Require().NoError(<-bobErrCh)
|
|
s.Require().NoError(<-charlieErrCh)
|
|
|
|
// Get whisper service.
|
|
mailboxWhisperService, err := mailboxBackend.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
aliceWhisperService, err := aliceBackend.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
bobWhisperService, err := bobBackend.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
charlieWhisperService, err := charlieBackend.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
// Get rpc client.
|
|
aliceRPCClient := aliceBackend.StatusNode().RPCPrivateClient()
|
|
bobRPCClient := bobBackend.StatusNode().RPCPrivateClient()
|
|
charlieRPCClient := charlieBackend.StatusNode().RPCPrivateClient()
|
|
|
|
// watchers
|
|
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
bobEnvelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub = bobWhisperService.SubscribeEnvelopeEvents(bobEnvelopeAvailableWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
charlieEnvelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub = charlieWhisperService.SubscribeEnvelopeEvents(charlieEnvelopeAvailableWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// Bob and charlie add the mailserver key.
|
|
password := mailboxPassword
|
|
bobMailServerKeyID, err := bobWhisperService.AddSymKeyFromPassword(password)
|
|
s.Require().NoError(err)
|
|
charlieMailServerKeyID, err := charlieWhisperService.AddSymKeyFromPassword(password)
|
|
s.Require().NoError(err)
|
|
|
|
// Generate a group chat symkey and topic.
|
|
groupChatKeyID, err := aliceWhisperService.GenerateSymKey()
|
|
s.Require().NoError(err)
|
|
groupChatKey, err := aliceWhisperService.GetSymKey(groupChatKeyID)
|
|
s.Require().NoError(err)
|
|
// Generate a group chat topic.
|
|
groupChatTopic := whisper.BytesToTopic([]byte("groupChatTopic"))
|
|
// sender must be subscribed to message topic it sends
|
|
s.NotNil(s.createGroupChatMessageFilter(aliceRPCClient, groupChatKeyID, groupChatTopic.String()))
|
|
groupChatPayload := newGroupChatParams(groupChatKey, groupChatTopic)
|
|
payloadStr, err := groupChatPayload.Encode()
|
|
s.Require().NoError(err)
|
|
|
|
// Add Bob and Charlie's key pairs to receive the symmetric key for the group chat from Alice.
|
|
bobKeyID, err := bobWhisperService.NewKeyPair()
|
|
s.Require().NoError(err)
|
|
bobKey, err := bobWhisperService.GetPrivateKey(bobKeyID)
|
|
s.Require().NoError(err)
|
|
bobPubkey := hexutil.Bytes(crypto.FromECDSAPub(&bobKey.PublicKey))
|
|
bobAliceKeySendTopic := whisper.BytesToTopic([]byte("bobAliceKeySendTopic "))
|
|
|
|
charlieKeyID, err := charlieWhisperService.NewKeyPair()
|
|
s.Require().NoError(err)
|
|
charlieKey, err := charlieWhisperService.GetPrivateKey(charlieKeyID)
|
|
s.Require().NoError(err)
|
|
charliePubkey := hexutil.Bytes(crypto.FromECDSAPub(&charlieKey.PublicKey))
|
|
charlieAliceKeySendTopic := whisper.BytesToTopic([]byte("charlieAliceKeySendTopic "))
|
|
|
|
// Alice must add peers topics into her own bloom filter.
|
|
aliceKeyID, err := aliceWhisperService.NewKeyPair()
|
|
s.Require().NoError(err)
|
|
s.createPrivateChatMessageFilter(aliceRPCClient, aliceKeyID, bobAliceKeySendTopic.String())
|
|
s.createPrivateChatMessageFilter(aliceRPCClient, aliceKeyID, charlieAliceKeySendTopic.String())
|
|
|
|
// Bob and charlie create message filter.
|
|
bobMessageFilterID := s.createPrivateChatMessageFilter(bobRPCClient, bobKeyID, bobAliceKeySendTopic.String())
|
|
charlieMessageFilterID := s.createPrivateChatMessageFilter(charlieRPCClient, charlieKeyID, charlieAliceKeySendTopic.String())
|
|
|
|
// Alice send message with symkey and topic to Bob and Charlie.
|
|
aliceToBobMessageHash := s.postMessageToPrivate(aliceRPCClient, bobPubkey.String(), bobAliceKeySendTopic.String(), payloadStr)
|
|
aliceToCharlieMessageHash := s.postMessageToPrivate(aliceRPCClient, charliePubkey.String(), charlieAliceKeySendTopic.String(), payloadStr)
|
|
|
|
// Bob receive group chat data and add it to his node.
|
|
// Bob get group chat details.
|
|
s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{aliceToBobMessageHash}, whisper.EventEnvelopeAvailable)
|
|
messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
bobGroupChatData := groupChatParams{}
|
|
err = bobGroupChatData.Decode(messages[0]["payload"].(string))
|
|
s.Require().NoError(err)
|
|
s.EqualValues(groupChatPayload, bobGroupChatData)
|
|
|
|
// Bob add symkey to his node.
|
|
bobGroupChatSymkeyID := s.addSymKey(bobRPCClient, bobGroupChatData.Key)
|
|
s.Require().NotEmpty(bobGroupChatSymkeyID)
|
|
|
|
// Bob create message filter to node by group chat topic.
|
|
bobGroupChatMessageFilterID := s.createGroupChatMessageFilter(bobRPCClient, bobGroupChatSymkeyID, bobGroupChatData.Topic)
|
|
|
|
// Charlie receive group chat data and add it to his node.
|
|
// Charlie get group chat details.
|
|
s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{aliceToCharlieMessageHash}, whisper.EventEnvelopeAvailable)
|
|
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
charlieGroupChatData := groupChatParams{}
|
|
err = charlieGroupChatData.Decode(messages[0]["payload"].(string))
|
|
s.Require().NoError(err)
|
|
s.EqualValues(groupChatPayload, charlieGroupChatData)
|
|
|
|
// Charlie add symkey to his node.
|
|
charlieGroupChatSymkeyID := s.addSymKey(charlieRPCClient, charlieGroupChatData.Key)
|
|
s.Require().NotEmpty(charlieGroupChatSymkeyID)
|
|
|
|
// Charlie create message filter to node by group chat topic.
|
|
charlieGroupChatMessageFilterID := s.createGroupChatMessageFilter(charlieRPCClient, charlieGroupChatSymkeyID, charlieGroupChatData.Topic)
|
|
|
|
// Alice send message to group chat.
|
|
helloWorldMessage := hexutil.Encode([]byte("Hello world!"))
|
|
groupChatMessageHash := s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage)
|
|
|
|
// Bob receive group chat message.
|
|
s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
|
|
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
|
|
|
// Charlie receive group chat message.
|
|
s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
|
|
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
|
|
|
// Check that we don't receive messages each one time.
|
|
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
|
s.Require().Empty(messages)
|
|
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
|
s.Require().Empty(messages)
|
|
|
|
// be sure that message has been archived
|
|
s.waitForEnvelopeEvents(envelopeArchivedWatcher, []string{groupChatMessageHash}, whisper.EventMailServerEnvelopeArchived)
|
|
|
|
// Request each one messages from mailbox using enode.
|
|
s.requestHistoricMessagesFromLast12Hours(bobWhisperService, bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String(), 0, "")
|
|
s.requestHistoricMessagesFromLast12Hours(charlieWhisperService, charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String(), 0, "")
|
|
|
|
// Bob receive p2p message from group chat filter.
|
|
s.waitForEnvelopeEvents(bobEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
|
|
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
|
|
|
// Charlie receive p2p message from group chat filter.
|
|
s.waitForEnvelopeEvents(charlieEnvelopeAvailableWatcher, []string{groupChatMessageHash}, whisper.EventEnvelopeAvailable)
|
|
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
|
|
s.Require().Equal(1, len(messages))
|
|
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) TestRequestMessagesWithPagination() {
|
|
// Start mailbox
|
|
mailbox, stop := s.startMailboxBackend("")
|
|
defer stop()
|
|
s.Require().True(mailbox.IsNodeRunning())
|
|
mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode
|
|
|
|
// Start client
|
|
client, stop := s.startBackend("client")
|
|
defer stop()
|
|
s.Require().True(client.IsNodeRunning())
|
|
clientRPCClient := client.StatusNode().RPCPrivateClient()
|
|
|
|
// Add mailbox to client's peers
|
|
errCh := helpers.WaitForPeerAsync(client.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second)
|
|
s.Require().NoError(client.StatusNode().AddPeer(mailboxEnode))
|
|
s.Require().NoError(<-errCh)
|
|
|
|
// Whisper services
|
|
mailboxWhisperService, err := mailbox.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
clientWhisperService, err := client.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
// mailserver sym key
|
|
mailServerKeyID, err := clientWhisperService.AddSymKeyFromPassword(mailboxPassword)
|
|
s.Require().NoError(err)
|
|
|
|
// public chat
|
|
var (
|
|
keyID string
|
|
topic whisper.TopicType
|
|
filterID string
|
|
)
|
|
publicChatName := "test public chat"
|
|
keyID, topic, filterID = s.joinPublicChat(clientWhisperService, clientRPCClient, publicChatName)
|
|
|
|
// envelopes to be sent
|
|
envelopesCount := 5
|
|
sentEnvelopesHashes := make([]string, 0)
|
|
|
|
// watch envelopes to be archived on mailserver
|
|
envelopeArchivedWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub := mailboxWhisperService.SubscribeEnvelopeEvents(envelopeArchivedWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// watch envelopes to be available for filters in the client
|
|
envelopeAvailableWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub = clientWhisperService.SubscribeEnvelopeEvents(envelopeAvailableWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// watch mailserver responses in the client
|
|
mailServerResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub = clientWhisperService.SubscribeEnvelopeEvents(mailServerResponseWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
// send envelopes
|
|
for i := 0; i < envelopesCount; i++ {
|
|
hash := s.postMessageToGroup(clientRPCClient, keyID, topic.String(), "")
|
|
sentEnvelopesHashes = append(sentEnvelopesHashes, hash)
|
|
}
|
|
|
|
// get messages from filter before requesting them to mailserver
|
|
lastEnvelopeHash := sentEnvelopesHashes[len(sentEnvelopesHashes)-1]
|
|
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{lastEnvelopeHash}, whisper.EventEnvelopeAvailable)
|
|
messages := s.getMessagesByMessageFilterID(clientRPCClient, filterID)
|
|
s.Equal(envelopesCount, len(messages))
|
|
|
|
messages = s.getMessagesByMessageFilterID(clientRPCClient, filterID)
|
|
s.Equal(0, len(messages))
|
|
|
|
limit := 3
|
|
|
|
getMessages := func() []string {
|
|
envelopes := s.getMessagesByMessageFilterID(clientRPCClient, filterID)
|
|
hashes := make([]string, 0)
|
|
for _, e := range envelopes {
|
|
hashes = append(hashes, e["hash"].(string))
|
|
}
|
|
|
|
return hashes
|
|
}
|
|
|
|
requestMessages := func(cursor string) common.Hash {
|
|
return s.requestHistoricMessagesFromLast12Hours(clientWhisperService, clientRPCClient, mailboxEnode, mailServerKeyID, topic.String(), limit, cursor)
|
|
}
|
|
|
|
// wait for mailserver to archive all the envelopes
|
|
s.waitForEnvelopeEvents(envelopeArchivedWatcher, sentEnvelopesHashes, whisper.EventMailServerEnvelopeArchived)
|
|
|
|
// first page
|
|
// send request
|
|
requestID := requestMessages("")
|
|
// wait for mail server response
|
|
resp := s.waitForMailServerResponse(mailServerResponseWatcher, requestID)
|
|
s.NotEmpty(resp.LastEnvelopeHash)
|
|
s.NotEmpty(resp.Cursor)
|
|
// wait for last envelope sent by the mailserver to be available for filters
|
|
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable)
|
|
// get messages
|
|
firstPageHashes := getMessages()
|
|
s.Equal(3, len(firstPageHashes))
|
|
|
|
// second page
|
|
// send request
|
|
requestID = requestMessages(fmt.Sprintf("%x", resp.Cursor))
|
|
// wait for mail server response
|
|
resp = s.waitForMailServerResponse(mailServerResponseWatcher, requestID)
|
|
s.NotEmpty(resp.LastEnvelopeHash)
|
|
// all messages have been sent, no more pages available
|
|
s.Empty(resp.Cursor)
|
|
// wait for last envelope sent by the mailserver to be available for filters
|
|
s.waitForEnvelopeEvents(envelopeAvailableWatcher, []string{resp.LastEnvelopeHash.String()}, whisper.EventEnvelopeAvailable)
|
|
// get messages
|
|
secondPageHashes := getMessages()
|
|
s.Equal(2, len(secondPageHashes))
|
|
|
|
allReceivedHashes := append(firstPageHashes, secondPageHashes...)
|
|
s.Equal(envelopesCount, len(allReceivedHashes))
|
|
|
|
// check that all the envelopes have been received
|
|
sort.Strings(sentEnvelopesHashes)
|
|
sort.Strings(allReceivedHashes)
|
|
s.Equal(sentEnvelopesHashes, allReceivedHashes)
|
|
}
|
|
|
|
// TestSyncBetweenTwoMailServers tests syncing state between two mail servers
|
|
// using `shhext_requestMessages`.
|
|
func (s *WhisperMailboxSuite) TestSyncBetweenTwoMailServers() {
|
|
var (
|
|
chatName = "some-public-chat"
|
|
)
|
|
|
|
// Start mailbox but with mail server disabled.
|
|
// MailServer will be registered below manually.
|
|
mailbox, stop := s.startMailboxBackendWithCallback("", func(c *params.NodeConfig) {
|
|
c.WhisperConfig.EnableMailServer = false
|
|
})
|
|
defer stop()
|
|
s.Require().True(mailbox.IsNodeRunning())
|
|
mailboxEnode := mailbox.StatusNode().GethNode().Server().NodeInfo().Enode
|
|
|
|
// Whisper services
|
|
mailboxWhisperService, err := mailbox.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
|
|
// symmetric key for public chats
|
|
symKeyID, err := mailboxWhisperService.AddSymKeyFromPassword(chatName)
|
|
s.Require().NoError(err)
|
|
publicChatSymKey, err := mailboxWhisperService.GetSymKey(symKeyID)
|
|
s.Require().NoError(err)
|
|
|
|
var mailServer mailserver.WMailServer
|
|
err = mailServer.Init(mailboxWhisperService, &mailbox.StatusNode().Config().WhisperConfig)
|
|
s.Require().NoError(err)
|
|
mailboxWhisperService.RegisterServer(&mailServer)
|
|
|
|
// envelopes to archive
|
|
envelopesCount := 5
|
|
topic := s.createPublicChatTopic(chatName)
|
|
for i := 0; i < envelopesCount; i++ {
|
|
params := whisper.MessageParams{
|
|
Topic: topic,
|
|
WorkTime: 10,
|
|
PoW: 2.0,
|
|
Payload: []byte("some-payload"),
|
|
KeySym: publicChatSymKey,
|
|
}
|
|
sentMessage, err := whisper.NewSentMessage(¶ms)
|
|
s.Require().NoError(err)
|
|
envelope, err := sentMessage.Wrap(¶ms, time.Now())
|
|
s.Require().NoError(err)
|
|
|
|
mailServer.Archive(envelope)
|
|
}
|
|
|
|
// Start a second mail server which would like to sync the state.
|
|
emptyMailbox, stopEmptyMailbox := s.startMailboxBackend("empty")
|
|
defer stopEmptyMailbox()
|
|
s.Require().True(emptyMailbox.IsNodeRunning())
|
|
|
|
emptyMailboxEnode := emptyMailbox.StatusNode().Server().Self().String()
|
|
|
|
errCh := helpers.WaitForPeerAsync(emptyMailbox.StatusNode().Server(), mailboxEnode, p2p.PeerEventTypeAdd, 5*time.Second)
|
|
s.Require().NoError(emptyMailbox.StatusNode().AddPeer(mailboxEnode))
|
|
s.Require().NoError(<-errCh)
|
|
|
|
emptyMailboxWhisperService, err := emptyMailbox.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
|
|
err = utils.Eventually(func() error {
|
|
return emptyMailboxWhisperService.AllowP2PMessagesFromPeer(
|
|
mailbox.StatusNode().Server().Self().ID().Bytes(),
|
|
)
|
|
}, time.Second*5, time.Millisecond*100)
|
|
s.Require().NoError(err)
|
|
|
|
emptyMailboxRPCClient := emptyMailbox.StatusNode().RPCPrivateClient()
|
|
s.Require().NotNil(emptyMailboxRPCClient)
|
|
|
|
// Ask to sync the first batch of messages.
|
|
// We artificially set Limit to 1 in order to test pagination.
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
|
|
var syncMessagesResponse shhext.SyncMessagesResponse
|
|
err = emptyMailboxRPCClient.CallContext(
|
|
ctx,
|
|
&syncMessagesResponse,
|
|
"shhext_syncMessages",
|
|
shhext.SyncMessagesRequest{
|
|
MailServerPeer: mailbox.StatusNode().Server().Self().String(),
|
|
From: 0,
|
|
To: uint32(time.Now().Unix()),
|
|
Limit: 1,
|
|
},
|
|
)
|
|
s.Require().NoError(err)
|
|
s.Require().NotEmpty(syncMessagesResponse.Cursor)
|
|
s.Require().Empty(syncMessagesResponse.Error)
|
|
|
|
// Ask to sync the rest of messages.
|
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
|
|
err = emptyMailboxRPCClient.CallContext(
|
|
ctx,
|
|
&syncMessagesResponse,
|
|
"shhext_syncMessages",
|
|
shhext.SyncMessagesRequest{
|
|
MailServerPeer: mailbox.StatusNode().Server().Self().String(),
|
|
From: 0,
|
|
To: uint32(time.Now().Unix()),
|
|
Limit: 10,
|
|
Cursor: syncMessagesResponse.Cursor,
|
|
},
|
|
)
|
|
s.Require().NoError(err)
|
|
s.Require().Empty(syncMessagesResponse.Cursor)
|
|
s.Require().Empty(syncMessagesResponse.Error)
|
|
|
|
// create and start a client
|
|
client, stop := s.startBackend("client")
|
|
defer stop()
|
|
s.Require().True(client.IsNodeRunning())
|
|
clientWhisperService, err := client.StatusNode().WhisperService()
|
|
s.Require().NoError(err)
|
|
clientRPCClient := client.StatusNode().RPCPrivateClient()
|
|
|
|
// create filter for messages
|
|
messagesKeyID, err := clientWhisperService.AddSymKeyFromPassword(chatName)
|
|
s.Require().NoError(err)
|
|
messageFilterID := s.createGroupChatMessageFilter(clientRPCClient, messagesKeyID, topic.String())
|
|
|
|
// add mailbox password
|
|
mailServerKeyID, err := clientWhisperService.AddSymKeyFromPassword(mailboxPassword)
|
|
s.Require().NoError(err)
|
|
|
|
// add the second mail server as a peer
|
|
errCh = helpers.WaitForPeerAsync(
|
|
client.StatusNode().Server(),
|
|
emptyMailboxEnode,
|
|
p2p.PeerEventTypeAdd,
|
|
5*time.Second)
|
|
s.Require().NoError(client.StatusNode().AddPeer(emptyMailboxEnode))
|
|
s.Require().NoError(<-errCh)
|
|
|
|
// request for messages
|
|
mailboxResponseWatcher := make(chan whisper.EnvelopeEvent, 1024)
|
|
sub := clientWhisperService.SubscribeEnvelopeEvents(mailboxResponseWatcher)
|
|
defer sub.Unsubscribe()
|
|
|
|
requestID := s.requestHistoricMessagesFromLast12Hours(
|
|
clientWhisperService,
|
|
clientRPCClient,
|
|
emptyMailboxEnode,
|
|
mailServerKeyID,
|
|
topic.String(),
|
|
0,
|
|
"",
|
|
)
|
|
response := s.waitForMailServerResponse(mailboxResponseWatcher, requestID)
|
|
s.Require().NotEqual(common.Hash{}.Bytes(), response.LastEnvelopeHash.Bytes())
|
|
|
|
// get messages
|
|
messages := s.getMessagesByMessageFilterID(clientRPCClient, messageFilterID)
|
|
s.Require().Len(messages, envelopesCount)
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) waitForEnvelopeEvents(events chan whisper.EnvelopeEvent, hashes []string, event whisper.EventType) {
|
|
check := make(map[string]struct{})
|
|
for _, hash := range hashes {
|
|
check[hash] = struct{}{}
|
|
}
|
|
|
|
timeout := time.NewTimer(time.Second * 5)
|
|
for {
|
|
select {
|
|
case e := <-events:
|
|
if e.Event == event {
|
|
delete(check, e.Hash.String())
|
|
if len(check) == 0 {
|
|
timeout.Stop()
|
|
return
|
|
}
|
|
}
|
|
case <-timeout.C:
|
|
s.FailNow("timed out while waiting for event on envelopes", "event: %s", event)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID common.Hash) *whisper.MailServerResponse {
|
|
timeout := time.NewTimer(time.Second * 5)
|
|
for {
|
|
select {
|
|
case event := <-events:
|
|
if event.Event == whisper.EventMailServerRequestCompleted && event.Hash == requestID {
|
|
timeout.Stop()
|
|
resp, ok := event.Data.(*whisper.MailServerResponse)
|
|
if !ok {
|
|
s.FailNow("mailserver response error", "expected whisper.MailServerResponse, got: %+v", resp)
|
|
}
|
|
|
|
return resp
|
|
}
|
|
case <-timeout.C:
|
|
s.FailNow("timed out while waiting for mailserver response")
|
|
}
|
|
}
|
|
}
|
|
|
|
func newGroupChatParams(symkey []byte, topic whisper.TopicType) groupChatParams {
|
|
groupChatKeyStr := hexutil.Bytes(symkey).String()
|
|
return groupChatParams{
|
|
Key: groupChatKeyStr,
|
|
Topic: topic.String(),
|
|
}
|
|
}
|
|
|
|
type groupChatParams struct {
|
|
Key string
|
|
Topic string
|
|
}
|
|
|
|
func (d *groupChatParams) Decode(i string) error {
|
|
b, err := hexutil.Decode(i)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return json.Unmarshal(b, &d)
|
|
}
|
|
|
|
func (d *groupChatParams) Encode() (string, error) {
|
|
payload, err := json.Marshal(d)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return hexutil.Bytes(payload).String(), nil
|
|
}
|
|
|
|
// Start status node.
|
|
func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, func()) {
|
|
datadir := filepath.Join(utils.RootDir, ".ethereumtest/mailbox", name)
|
|
backend := api.NewStatusBackend()
|
|
nodeConfig, err := utils.MakeTestNodeConfig(utils.GetNetworkID())
|
|
nodeConfig.DataDir = datadir
|
|
s.Require().NoError(err)
|
|
s.Require().False(backend.IsNodeRunning())
|
|
|
|
nodeConfig.WhisperConfig.LightClient = true
|
|
|
|
if addr, err := utils.GetRemoteURL(); err == nil {
|
|
nodeConfig.UpstreamConfig.Enabled = true
|
|
nodeConfig.UpstreamConfig.URL = addr
|
|
}
|
|
|
|
s.Require().NoError(backend.StartNode(nodeConfig))
|
|
s.Require().True(backend.IsNodeRunning())
|
|
|
|
return backend, func() {
|
|
s.True(backend.IsNodeRunning())
|
|
s.NoError(backend.StopNode())
|
|
s.False(backend.IsNodeRunning())
|
|
err = os.RemoveAll(datadir)
|
|
s.Require().NoError(err)
|
|
}
|
|
|
|
}
|
|
|
|
// Start mailbox node.
|
|
func (s *WhisperMailboxSuite) startMailboxBackend(name string) (*api.StatusBackend, func()) {
|
|
return s.startMailboxBackendWithCallback(name, nil)
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) startMailboxBackendWithCallback(
|
|
name string,
|
|
callback func(*params.NodeConfig),
|
|
) (*api.StatusBackend, func()) {
|
|
if name == "" {
|
|
name = "mailserver"
|
|
}
|
|
|
|
mailboxConfig, err := utils.MakeTestNodeConfig(utils.GetNetworkID())
|
|
s.Require().NoError(err)
|
|
|
|
mailboxBackend := api.NewStatusBackend()
|
|
datadir := filepath.Join(utils.RootDir, ".ethereumtest/mailbox", name)
|
|
|
|
mailboxConfig.LightEthConfig.Enabled = false
|
|
mailboxConfig.WhisperConfig.Enabled = true
|
|
mailboxConfig.KeyStoreDir = datadir
|
|
mailboxConfig.WhisperConfig.EnableMailServer = true
|
|
mailboxConfig.WhisperConfig.MailServerPassword = "status-offline-inbox"
|
|
mailboxConfig.WhisperConfig.DataDir = filepath.Join(datadir, "data")
|
|
mailboxConfig.DataDir = datadir
|
|
|
|
if callback != nil {
|
|
callback(mailboxConfig)
|
|
}
|
|
|
|
s.Require().False(mailboxBackend.IsNodeRunning())
|
|
s.Require().NoError(mailboxBackend.StartNode(mailboxConfig))
|
|
s.Require().True(mailboxBackend.IsNodeRunning())
|
|
return mailboxBackend, func() {
|
|
s.True(mailboxBackend.IsNodeRunning())
|
|
s.NoError(mailboxBackend.StopNode())
|
|
s.False(mailboxBackend.IsNodeRunning())
|
|
err := os.RemoveAll(datadir)
|
|
s.Require().NoError(err)
|
|
}
|
|
}
|
|
|
|
// createPrivateChatMessageFilter create message filter with asymmetric encryption.
|
|
func (s *WhisperMailboxSuite) createPrivateChatMessageFilter(rpcCli *rpc.Client, privateKeyID string, topic string) string {
|
|
resp := rpcCli.CallRaw(`{
|
|
"jsonrpc": "2.0",
|
|
"method": "shh_newMessageFilter", "params": [
|
|
{"privateKeyID": "` + privateKeyID + `", "topics": [ "` + topic + `"], "allowP2P":true}
|
|
],
|
|
"id": 1
|
|
}`)
|
|
|
|
msgFilterResp := returnedIDResponse{}
|
|
err := json.Unmarshal([]byte(resp), &msgFilterResp)
|
|
messageFilterID := msgFilterResp.Result
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(msgFilterResp.Error)
|
|
s.Require().NotEqual("", messageFilterID, resp)
|
|
return messageFilterID
|
|
}
|
|
|
|
// createGroupChatMessageFilter create message filter with symmetric encryption.
|
|
func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, symkeyID string, topic string) string {
|
|
resp := rpcCli.CallRaw(`{
|
|
"jsonrpc": "2.0",
|
|
"method": "shh_newMessageFilter", "params": [
|
|
{"symKeyID": "` + symkeyID + `", "topics": ["` + topic + `"], "allowP2P": true}
|
|
],
|
|
"id": 1
|
|
}`)
|
|
|
|
msgFilterResp := returnedIDResponse{}
|
|
err := json.Unmarshal([]byte(resp), &msgFilterResp)
|
|
messageFilterID := msgFilterResp.Result
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(msgFilterResp.Error)
|
|
s.Require().NotEqual("", messageFilterID, resp)
|
|
return messageFilterID
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) postMessageToPrivate(rpcCli *rpc.Client, recipientPubkey string, topic string, payload string) string {
|
|
resp := rpcCli.CallRaw(`{
|
|
"jsonrpc": "2.0",
|
|
"method": "shh_post",
|
|
"params": [
|
|
{
|
|
"pubKey": "` + recipientPubkey + `",
|
|
"topic": "` + topic + `",
|
|
"payload": "` + payload + `",
|
|
"powTarget": 0.001,
|
|
"powTime": 2
|
|
}
|
|
],
|
|
"id": 1}`)
|
|
postResp := baseRPCResponse{}
|
|
err := json.Unmarshal([]byte(resp), &postResp)
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(postResp.Error)
|
|
|
|
return postResp.Result.(string)
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) postMessageToGroup(rpcCli *rpc.Client, groupChatKeyID string, topic string, payload string) string {
|
|
resp := rpcCli.CallRaw(`{
|
|
"jsonrpc": "2.0",
|
|
"method": "shh_post",
|
|
"params": [
|
|
{
|
|
"symKeyID": "` + groupChatKeyID + `",
|
|
"topic": "` + topic + `",
|
|
"payload": "` + payload + `",
|
|
"powTarget": 0.001,
|
|
"powTime": 2
|
|
}
|
|
],
|
|
"id": 1}`)
|
|
postResp := baseRPCResponse{}
|
|
err := json.Unmarshal([]byte(resp), &postResp)
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(postResp.Error)
|
|
|
|
hash, ok := postResp.Result.(string)
|
|
if !ok {
|
|
s.FailNow("error decoding result", "expected string, got: %+v", postResp.Result)
|
|
}
|
|
|
|
if !strings.HasPrefix(hash, "0x") {
|
|
s.FailNow("hash format error", "expected hex string, got: %s", hash)
|
|
}
|
|
|
|
return hash
|
|
}
|
|
|
|
// getMessagesByMessageFilterID gets received messages by messageFilterID.
|
|
func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, messageFilterID string) []map[string]interface{} {
|
|
resp := rpcCli.CallRaw(`{
|
|
"jsonrpc": "2.0",
|
|
"method": "shh_getFilterMessages",
|
|
"params": ["` + messageFilterID + `"],
|
|
"id": 1}`)
|
|
messages := getFilterMessagesResponse{}
|
|
err := json.Unmarshal([]byte(resp), &messages)
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(messages.Error)
|
|
return messages.Result
|
|
}
|
|
|
|
// addSymKey added symkey to node and return symkeyID.
|
|
func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string {
|
|
resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey",
|
|
"params":["` + symkey + `"],
|
|
"id":1}`)
|
|
symkeyAddResp := returnedIDResponse{}
|
|
err := json.Unmarshal([]byte(resp), &symkeyAddResp)
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(symkeyAddResp.Error)
|
|
symkeyID := symkeyAddResp.Result
|
|
s.Require().NotEmpty(symkeyID)
|
|
return symkeyID
|
|
}
|
|
|
|
// requestHistoricMessagesFromLast12Hours asks a mailnode to resend messages from last 12 hours.
|
|
func (s *WhisperMailboxSuite) requestHistoricMessagesFromLast12Hours(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string, limit int, cursor string) common.Hash {
|
|
currentTime := w.GetCurrentTime()
|
|
from := currentTime.Add(-12 * time.Hour)
|
|
to := currentTime
|
|
return s.requestHistoricMessages(w, rpcCli, mailboxEnode, mailServerKeyID, topic, from, to, limit, cursor)
|
|
}
|
|
|
|
// requestHistoricMessages asks a mailnode to resend messages.
|
|
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string, from, to time.Time, limit int, cursor string) common.Hash {
|
|
resp := rpcCli.CallRaw(`{
|
|
"jsonrpc": "2.0",
|
|
"id": 2,
|
|
"method": "shhext_requestMessages",
|
|
"params": [{
|
|
"mailServerPeer":"` + mailboxEnode + `",
|
|
"topic":"` + topic + `",
|
|
"symKeyID":"` + mailServerKeyID + `",
|
|
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
|
|
"to":` + strconv.FormatInt(to.Unix(), 10) + `,
|
|
"limit": ` + fmt.Sprintf("%d", limit) + `,
|
|
"cursor": "` + cursor + `"
|
|
}]
|
|
}`)
|
|
reqMessagesResp := baseRPCResponse{}
|
|
err := json.Unmarshal([]byte(resp), &reqMessagesResp)
|
|
s.Require().NoError(err)
|
|
s.Require().Nil(reqMessagesResp.Error)
|
|
|
|
switch hash := reqMessagesResp.Result.(type) {
|
|
case string:
|
|
s.Require().True(strings.HasPrefix(hash, "0x"))
|
|
b, err := hex.DecodeString(hash[2:])
|
|
s.Require().NoError(err)
|
|
return common.BytesToHash(b)
|
|
default:
|
|
s.Failf("failed reading shh_newMessageFilter result", "expected a hash, got: %+v", reqMessagesResp.Result)
|
|
}
|
|
|
|
return common.Hash{}
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) createPublicChatTopic(name string) whisper.TopicType {
|
|
h := sha3.NewLegacyKeccak256()
|
|
_, err := h.Write([]byte(name))
|
|
if err != nil {
|
|
s.Fail("error generating topic", "failed gerating topic from chat name, %+v", err)
|
|
}
|
|
return whisper.BytesToTopic(h.Sum(nil))
|
|
}
|
|
|
|
func (s *WhisperMailboxSuite) joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (string, whisper.TopicType, string) {
|
|
keyID, err := w.AddSymKeyFromPassword(name)
|
|
s.Require().NoError(err)
|
|
|
|
topic := s.createPublicChatTopic(name)
|
|
|
|
filterID := s.createGroupChatMessageFilter(rpcClient, keyID, topic.String())
|
|
|
|
return keyID, topic, filterID
|
|
}
|
|
|
|
type getFilterMessagesResponse struct {
|
|
Result []map[string]interface{}
|
|
Error interface{}
|
|
}
|
|
|
|
type returnedIDResponse struct {
|
|
Result string
|
|
Error interface{}
|
|
}
|
|
type baseRPCResponse struct {
|
|
Result interface{}
|
|
Error interface{}
|
|
}
|