Support for historic messages from MailServer (#503)
Add `shh_requestMessages` RPC method. It sends a message to MailServer that can return cached, possibly expired, Whisper message.
This commit is contained in:
parent
596b7ea2e1
commit
9559ff074a
|
@ -0,0 +1,220 @@
|
|||
package whisper
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
"github.com/ethereum/go-ethereum/whisper/whisperv5"
|
||||
"github.com/status-im/status-go/e2e"
|
||||
"github.com/status-im/status-go/geth/api"
|
||||
. "github.com/status-im/status-go/testing"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type WhisperMailboxSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestWhisperMailboxTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(WhisperMailboxSuite))
|
||||
}
|
||||
|
||||
func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
|
||||
//arrange
|
||||
mailboxBackend, stop := s.startMailboxBackend()
|
||||
defer stop()
|
||||
mailboxNode, err := mailboxBackend.NodeManager().Node()
|
||||
s.Require().NoError(err)
|
||||
mailboxEnode := mailboxNode.Server().NodeInfo().Enode
|
||||
|
||||
sender, stop := s.startBackend()
|
||||
defer stop()
|
||||
node, err := sender.NodeManager().Node()
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.Require().NotEqual(mailboxEnode, node.Server().NodeInfo().Enode)
|
||||
|
||||
err = sender.NodeManager().AddPeer(mailboxEnode)
|
||||
s.Require().NoError(err)
|
||||
//wait async processes on adding peer
|
||||
time.Sleep(time.Second)
|
||||
|
||||
w, err := sender.NodeManager().WhisperService()
|
||||
s.Require().NoError(err)
|
||||
|
||||
//Mark mailbox node trusted
|
||||
mailboxPeer, err := extractIdFromEnode(mailboxNode.Server().NodeInfo().Enode)
|
||||
s.Require().NoError(err)
|
||||
err = w.AllowP2PMessagesFromPeer(mailboxPeer)
|
||||
s.Require().NoError(err)
|
||||
|
||||
//Generate mailbox symkey
|
||||
password := "asdfasdf"
|
||||
MailServerKeyID, err := w.AddSymKeyFromPassword(password)
|
||||
s.Require().NoError(err)
|
||||
|
||||
rpcClient := sender.NodeManager().RPCClient()
|
||||
s.Require().NotNil(rpcClient)
|
||||
|
||||
//create topic
|
||||
topic := whisperv5.BytesToTopic([]byte("topic name"))
|
||||
|
||||
//Add key pair to whisper
|
||||
keyID, err := w.NewKeyPair()
|
||||
s.Require().NoError(err)
|
||||
key, err := w.GetPrivateKey(keyID)
|
||||
s.Require().NoError(err)
|
||||
pubkey := hexutil.Bytes(crypto.FromECDSAPub(&key.PublicKey))
|
||||
|
||||
//Create message filter
|
||||
resp := rpcClient.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "shh_newMessageFilter", "params": [
|
||||
{"privateKeyID": "` + keyID + `", "topics": [ "` + topic.String() + `"], "allowP2P":true}
|
||||
],
|
||||
"id": 1
|
||||
}`)
|
||||
msgFilterResp := newMessagesFilterResponse{}
|
||||
err = json.Unmarshal([]byte(resp), &msgFilterResp)
|
||||
messageFilterID := msgFilterResp.Result
|
||||
s.Require().NoError(err)
|
||||
s.Require().NotEqual("", messageFilterID)
|
||||
|
||||
//Threre are no messages at filter
|
||||
resp = rpcClient.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "shh_getFilterMessages",
|
||||
"params": ["` + messageFilterID + `"],
|
||||
"id": 1}`)
|
||||
messages := getFilterMessagesResponse{}
|
||||
err = json.Unmarshal([]byte(resp), &messages)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(0, len(messages.Result))
|
||||
|
||||
//Post message
|
||||
rpcClient.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "shh_post",
|
||||
"params": [
|
||||
{
|
||||
"pubKey": "` + pubkey.String() + `",
|
||||
"topic": "` + topic.String() + `",
|
||||
"payload": "0x73656e74206265666f72652066696c7465722077617320616374697665202873796d6d657472696329",
|
||||
"powTarget": 0.001,
|
||||
"powTime": 2
|
||||
}
|
||||
],
|
||||
"id": 1}`)
|
||||
|
||||
//Threre are no messages, because it's sender filter
|
||||
resp = rpcClient.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "shh_getFilterMessages",
|
||||
"params": ["` + messageFilterID + `"],
|
||||
"id": 1}`)
|
||||
err = json.Unmarshal([]byte(resp), &messages)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(0, len(messages.Result))
|
||||
|
||||
//act
|
||||
|
||||
//Request messages from mailbox
|
||||
rpcClient.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "shh_requestMessages",
|
||||
"params": [{
|
||||
"peer":"` + string(mailboxPeer) + `",
|
||||
"topic":"` + topic.String() + `",
|
||||
"symKeyID":"` + MailServerKeyID + `",
|
||||
"from":0,
|
||||
"to":` + strconv.FormatInt(time.Now().UnixNano(), 10) + `
|
||||
}]
|
||||
}`)
|
||||
|
||||
//wait to receive message
|
||||
time.Sleep(time.Second)
|
||||
//And we receive message
|
||||
resp = rpcClient.CallRaw(`{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "shh_getFilterMessages",
|
||||
"params": ["` + messageFilterID + `"],
|
||||
"id": 1}`)
|
||||
|
||||
err = json.Unmarshal([]byte(resp), &messages)
|
||||
//assert
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(1, len(messages.Result))
|
||||
}
|
||||
|
||||
func (s *WhisperMailboxSuite) startBackend() (*api.StatusBackend, func()) {
|
||||
//Start sender node
|
||||
backend := api.NewStatusBackend()
|
||||
nodeConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
|
||||
s.Require().NoError(err)
|
||||
s.Require().False(backend.IsNodeRunning())
|
||||
nodeStarted, err := backend.StartNode(nodeConfig)
|
||||
s.Require().NoError(err)
|
||||
<-nodeStarted // wait till node is started
|
||||
s.Require().True(backend.IsNodeRunning())
|
||||
|
||||
return backend, func() {
|
||||
s.True(backend.IsNodeRunning())
|
||||
backendStopped, err := backend.StopNode()
|
||||
s.NoError(err)
|
||||
<-backendStopped
|
||||
s.False(backend.IsNodeRunning())
|
||||
}
|
||||
|
||||
}
|
||||
func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) {
|
||||
//Start mailbox node
|
||||
mailboxBackend := api.NewStatusBackend()
|
||||
mailboxConfig, err := e2e.MakeTestNodeConfig(GetNetworkID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
mailboxConfig.LightEthConfig.Enabled = false
|
||||
mailboxConfig.WhisperConfig.Enabled = true
|
||||
mailboxConfig.KeyStoreDir = "../../.ethereumtest/mailbox/"
|
||||
mailboxConfig.WhisperConfig.EnableMailServer = true
|
||||
mailboxConfig.WhisperConfig.IdentityFile = "../../static/keys/wnodekey"
|
||||
mailboxConfig.WhisperConfig.PasswordFile = "../../static/keys/wnodepassword"
|
||||
mailboxConfig.WhisperConfig.DataDir = "../../.ethereumtest/mailbox/w2"
|
||||
mailboxConfig.DataDir = "../../.ethereumtest/mailbox/"
|
||||
|
||||
mailboxNodeStarted, err := mailboxBackend.StartNode(mailboxConfig)
|
||||
s.Require().NoError(err)
|
||||
<-mailboxNodeStarted // wait till node is started
|
||||
s.Require().True(mailboxBackend.IsNodeRunning())
|
||||
return mailboxBackend, func() {
|
||||
s.True(mailboxBackend.IsNodeRunning())
|
||||
backendStopped, err := mailboxBackend.StopNode()
|
||||
s.NoError(err)
|
||||
<-backendStopped
|
||||
s.False(mailboxBackend.IsNodeRunning())
|
||||
}
|
||||
}
|
||||
|
||||
type getFilterMessagesResponse struct {
|
||||
Result []map[string]interface{}
|
||||
Err interface{}
|
||||
}
|
||||
|
||||
type newMessagesFilterResponse struct {
|
||||
Result string
|
||||
Err interface{}
|
||||
}
|
||||
|
||||
func extractIdFromEnode(s string) ([]byte, error) {
|
||||
n, err := discover.ParseNode(s)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to parse enode: %s", err)
|
||||
}
|
||||
return n.ID[:], nil
|
||||
}
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
"github.com/status-im/status-go/geth/whisper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -243,9 +244,13 @@ func (m *StatusBackend) registerHandlers() error {
|
|||
return node.ErrRPCClient
|
||||
}
|
||||
|
||||
handler, err := whisper.RequestHistoricMessagesHandler(m.nodeManager)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpcClient.RegisterHandler("shh_requestMessages", handler)
|
||||
rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler())
|
||||
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler)
|
||||
|
||||
m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler())
|
||||
log.Info("Registered handler", "fn", "TransactionQueueHandler")
|
||||
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
package whisper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/whisper/whisperv5"
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/rpc"
|
||||
)
|
||||
|
||||
var (
|
||||
//ErrInvalidNumberOfArgs - error invalid aruments in request
|
||||
ErrInvalidNumberOfArgs = fmt.Errorf("invalid number of arguments, expected 1")
|
||||
//ErrInvalidArgs - error invalid request format
|
||||
ErrInvalidArgs = fmt.Errorf("invalid args")
|
||||
//ErrTopicNotExist - error topic field doesn't exist in request
|
||||
ErrTopicNotExist = fmt.Errorf("topic value does not exist")
|
||||
//ErrTopicNotString - error topic is not string type
|
||||
ErrTopicNotString = fmt.Errorf("topic value is not string")
|
||||
//ErrMailboxSymkeyIDNotExist - error symKeyID field doesn't exist in request
|
||||
ErrMailboxSymkeyIDNotExist = fmt.Errorf("symKeyID does not exist")
|
||||
//ErrMailboxSymkeyIDNotString - error symKeyID is not string type
|
||||
ErrMailboxSymkeyIDNotString = fmt.Errorf("symKeyID is not string")
|
||||
//ErrPeerNotExist - error peer field doesn't exist in request
|
||||
ErrPeerNotExist = fmt.Errorf("peer does not exist")
|
||||
//ErrPeerNotString - error peer is not string type
|
||||
ErrPeerNotString = fmt.Errorf("peer is not string")
|
||||
)
|
||||
|
||||
const defaultWorkTime = 5
|
||||
|
||||
//RequestHistoricMessagesHandler returns an RPC handler which sends a p2p request for historic messages.
|
||||
func RequestHistoricMessagesHandler(nodeManager common.NodeManager) (rpc.Handler, error) {
|
||||
whisper, err := nodeManager.WhisperService()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
node, err := nodeManager.Node()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return func(ctx context.Context, args ...interface{}) (interface{}, error) {
|
||||
r, err := parseArgs(args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
symkey, err := whisper.GetSymKey(r.SymkeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.PoW = whisper.MinPow()
|
||||
env, err := makeEnvelop(r, symkey, node.Server().PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = whisper.RequestHistoricMessages(r.Peer, env)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
type historicMessagesRequest struct {
|
||||
Peer []byte //mailbox peer
|
||||
TimeLow uint32 //resend messages from
|
||||
TimeUp uint32 //resend messages to
|
||||
Topic whisperv5.TopicType //resend messages by topic
|
||||
SymkeyID string //Mailbox symmetric key id
|
||||
PoW float64 //whisper proof of work
|
||||
}
|
||||
|
||||
func parseArgs(args ...interface{}) (historicMessagesRequest, error) {
|
||||
var (
|
||||
r = historicMessagesRequest{
|
||||
TimeLow: uint32(time.Now().Add(-24 * time.Hour).Unix()),
|
||||
TimeUp: uint32(time.Now().Unix()),
|
||||
}
|
||||
)
|
||||
|
||||
if len(args) != 1 {
|
||||
return historicMessagesRequest{}, ErrInvalidNumberOfArgs
|
||||
}
|
||||
|
||||
historicMessagesArgs, ok := args[0].(map[string]interface{})
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrInvalidArgs
|
||||
}
|
||||
|
||||
if t, ok := historicMessagesArgs["from"]; ok {
|
||||
if parsed, ok := t.(uint32); ok {
|
||||
r.TimeLow = parsed
|
||||
}
|
||||
}
|
||||
if t, ok := historicMessagesArgs["to"]; ok {
|
||||
if parsed, ok := t.(uint32); ok {
|
||||
r.TimeUp = parsed
|
||||
}
|
||||
}
|
||||
topicInterfaceValue, ok := historicMessagesArgs["topic"]
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrTopicNotExist
|
||||
}
|
||||
|
||||
topicStringValue, ok := topicInterfaceValue.(string)
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrTopicNotString
|
||||
}
|
||||
|
||||
if err := r.Topic.UnmarshalText([]byte(topicStringValue)); err != nil {
|
||||
return historicMessagesRequest{}, nil
|
||||
}
|
||||
|
||||
symkeyIDInterfaceValue, ok := historicMessagesArgs["symKeyID"]
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrMailboxSymkeyIDNotExist
|
||||
}
|
||||
r.SymkeyID, ok = symkeyIDInterfaceValue.(string)
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrMailboxSymkeyIDNotString
|
||||
}
|
||||
|
||||
peerInterfaceValue, ok := historicMessagesArgs["peer"]
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrPeerNotExist
|
||||
}
|
||||
r.Peer, ok = peerInterfaceValue.([]byte)
|
||||
if !ok {
|
||||
return historicMessagesRequest{}, ErrPeerNotString
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
//makeEnvelop make envelop for request histtoric messages. symmetric key to authenticate to MailServer node and pk is the current node ID.
|
||||
func makeEnvelop(r historicMessagesRequest, symkey []byte, pk *ecdsa.PrivateKey) (*whisperv5.Envelope, error) {
|
||||
var params whisperv5.MessageParams
|
||||
params.PoW = r.PoW
|
||||
params.Payload = makePayloadData(r)
|
||||
params.KeySym = symkey
|
||||
params.WorkTime = defaultWorkTime
|
||||
params.Src = pk
|
||||
|
||||
message, err := whisperv5.NewSentMessage(¶ms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return message.Wrap(¶ms)
|
||||
}
|
||||
|
||||
//makePayloadData make specific payload for mailserver
|
||||
func makePayloadData(r historicMessagesRequest) []byte {
|
||||
data := make([]byte, 8+whisperv5.TopicLength)
|
||||
binary.BigEndian.PutUint32(data, r.TimeLow)
|
||||
binary.BigEndian.PutUint32(data[4:], r.TimeUp)
|
||||
copy(data[8:], r.Topic[:])
|
||||
return data
|
||||
}
|
Loading…
Reference in New Issue