Merge pull request #767 from status-im/issue/allow-messages-without-subscription-#766

[#766]: Allow messages without subscription
This commit is contained in:
Evgeny Danilenko 2018-03-27 19:13:10 +03:00 committed by GitHub
commit ce45237d64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 155 additions and 66 deletions

View File

@ -0,0 +1,54 @@
diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go
index a98760b9..06736e55 100644
--- a/whisper/whisperv6/doc.go
+++ b/whisper/whisperv6/doc.go
@@ -104,6 +104,9 @@ const (
peerSource envelopeSource = iota
// p2pSource indicates that envelop was received from a trusted peer.
p2pSource
+
+ forwarded = true
+ inHouse = false
)
// EnvelopeMeta keeps metadata of received envelopes.
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index a9e12d4a..8bd991a9 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -658,7 +658,7 @@ func (whisper *Whisper) Unsubscribe(id string) error {
// Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles.
func (whisper *Whisper) Send(envelope *Envelope) error {
- ok, err := whisper.add(envelope)
+ ok, err := whisper.add(envelope, inHouse)
if err != nil {
return err
}
@@ -745,7 +745,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
trouble := false
for _, env := range envelopes {
whisper.traceEnvelope(env, !whisper.isEnvelopeCached(env.Hash()), peerSource, p)
- cached, err := whisper.add(env)
+ cached, err := whisper.add(env, forwarded)
if err != nil {
trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
@@ -819,7 +819,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped.
-func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
+func (whisper *Whisper) add(envelope *Envelope, isForwarded bool) (bool, error) {
now := uint32(time.Now().Unix())
sent := envelope.Expiry - envelope.TTL
@@ -852,7 +852,7 @@ func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
}
}
- if !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
+ if isForwarded && !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
// maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by BloomFilterTolerance()
// for a short period of peer synchronization.

View File

@ -36,6 +36,8 @@ Instructions for creating a patch from the command line:
- [`0014-whisperv6-notifications.patch`](./0014-whisperv6-notifications.patch) — adds Whisper v6 notifications (need to be reviewed and documented) - [`0014-whisperv6-notifications.patch`](./0014-whisperv6-notifications.patch) — adds Whisper v6 notifications (need to be reviewed and documented)
- [`0015-whisperv6-envelopes-tracing.patch`](./0015-whisperv6-envelopes-tracing.patch) — adds Whisper v6 envelope tracing (need to be reviewed and documented) - [`0015-whisperv6-envelopes-tracing.patch`](./0015-whisperv6-envelopes-tracing.patch) — adds Whisper v6 envelope tracing (need to be reviewed and documented)
- [`0018-geth-181-whisperv6-peer-race-cond-fix.patch`](./0018-geth-181-whisperv6-peer-race-cond-fix.patch) — Fixes race condition in Whisper v6. This has been merged upstream and this patch will need to be removed for 1.8.2. - [`0018-geth-181-whisperv6-peer-race-cond-fix.patch`](./0018-geth-181-whisperv6-peer-race-cond-fix.patch) — Fixes race condition in Whisper v6. This has been merged upstream and this patch will need to be removed for 1.8.2.
- [`0019-whisperv6-send-self-messages-without-subscribe.patch`](./0019-whisperv6-send-self-messages-without-subscribe.patch) — Allows user to send own messages without the subscription to it's topic
# Updating # Updating

View File

@ -28,7 +28,7 @@ func TestWhisperMailboxTestSuite(t *testing.T) {
} }
func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
//Start mailbox and status node // Start mailbox and status node.
mailboxBackend, stop := s.startMailboxBackend() mailboxBackend, stop := s.startMailboxBackend()
defer stop() defer stop()
mailboxNode, err := mailboxBackend.NodeManager().Node() mailboxNode, err := mailboxBackend.NodeManager().Node()
@ -44,13 +44,13 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
err = sender.NodeManager().AddPeer(mailboxEnode) err = sender.NodeManager().AddPeer(mailboxEnode)
s.Require().NoError(err) s.Require().NoError(err)
//wait async processes on adding peer // Wait async processes on adding peer.
time.Sleep(time.Second) time.Sleep(time.Second)
senderWhisperService, err := sender.NodeManager().WhisperService() senderWhisperService, err := sender.NodeManager().WhisperService()
s.Require().NoError(err) s.Require().NoError(err)
//Mark mailbox node trusted // Mark mailbox node trusted.
parsedNode, err := discover.ParseNode(mailboxNode.Server().NodeInfo().Enode) parsedNode, err := discover.ParseNode(mailboxNode.Server().NodeInfo().Enode)
s.Require().NoError(err) s.Require().NoError(err)
mailboxPeer := parsedNode.ID[:] mailboxPeer := parsedNode.ID[:]
@ -58,7 +58,7 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
err = senderWhisperService.AllowP2PMessagesFromPeer(mailboxPeer) err = senderWhisperService.AllowP2PMessagesFromPeer(mailboxPeer)
s.Require().NoError(err) s.Require().NoError(err)
//Generate mailbox symkey // Generate mailbox symkey.
password := "status-offline-inbox" password := "status-offline-inbox"
MailServerKeyID, err := senderWhisperService.AddSymKeyFromPassword(password) MailServerKeyID, err := senderWhisperService.AddSymKeyFromPassword(password)
s.Require().NoError(err) s.Require().NoError(err)
@ -66,34 +66,34 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
rpcClient := sender.NodeManager().RPCClient() rpcClient := sender.NodeManager().RPCClient()
s.Require().NotNil(rpcClient) s.Require().NotNil(rpcClient)
//create topic // Create topic.
topic := whisper.BytesToTopic([]byte("topic name")) topic := whisper.BytesToTopic([]byte("topic name"))
//Add key pair to whisper // Add key pair to whisper.
keyID, err := senderWhisperService.NewKeyPair() keyID, err := senderWhisperService.NewKeyPair()
s.Require().NoError(err) s.Require().NoError(err)
key, err := senderWhisperService.GetPrivateKey(keyID) key, err := senderWhisperService.GetPrivateKey(keyID)
s.Require().NoError(err) s.Require().NoError(err)
pubkey := hexutil.Bytes(crypto.FromECDSAPub(&key.PublicKey)) pubkey := hexutil.Bytes(crypto.FromECDSAPub(&key.PublicKey))
//Create message filter // Create message filter.
messageFilterID := s.createPrivateChatMessageFilter(rpcClient, keyID, topic.String()) messageFilterID := s.createPrivateChatMessageFilter(rpcClient, keyID, topic.String())
//There are no messages at filter // There are no messages at filter.
messages := s.getMessagesByMessageFilterID(rpcClient, messageFilterID) messages := s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(0, len(messages)) s.Require().Equal(0, len(messages))
//Post message matching with filter (key and topic) // Post message matching with filter (key and topic).
s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!"))) s.postMessageToPrivate(rpcClient, pubkey.String(), topic.String(), hexutil.Encode([]byte("Hello world!")))
//Get message to make sure that it will come from the mailbox later // Get message to make sure that it will come from the mailbox later.
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
//act // Act.
//Request messages (including the previous one, expired) from mailbox // Request messages (including the previous one, expired) from mailbox.
reqMessagesBody := `{ reqMessagesBody := `{
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 1, "id": 1,
@ -110,21 +110,21 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
reqMessagesResp := baseRPCResponse{} reqMessagesResp := baseRPCResponse{}
err = json.Unmarshal([]byte(resp), &reqMessagesResp) err = json.Unmarshal([]byte(resp), &reqMessagesResp)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(reqMessagesResp.Err) s.Require().Nil(reqMessagesResp.Error)
//wait to receive message // Wait to receive message.
time.Sleep(time.Second) time.Sleep(time.Second)
//And we receive message, it comes from mailbox // And we receive message, it comes from mailbox.
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
//check that there are no messages // Check that there are no messages.
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(0, len(messages)) s.Require().Equal(0, len(messages))
} }
func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
//Start mailbox, alice, bob, charlie node // Start mailbox, alice, bob, charlie node.
mailboxBackend, stop := s.startMailboxBackend() mailboxBackend, stop := s.startMailboxBackend()
defer stop() defer stop()
@ -137,7 +137,7 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
charlieBackend, stop := s.startBackend("charlie") charlieBackend, stop := s.startBackend("charlie")
defer stop() defer stop()
//add mailbox to static peers // Add mailbox to static peers.
mailboxNode, err := mailboxBackend.NodeManager().Node() mailboxNode, err := mailboxBackend.NodeManager().Node()
s.Require().NoError(err) s.Require().NoError(err)
mailboxEnode := mailboxNode.Server().NodeInfo().Enode mailboxEnode := mailboxNode.Server().NodeInfo().Enode
@ -148,40 +148,40 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
s.Require().NoError(err) s.Require().NoError(err)
err = charlieBackend.NodeManager().AddPeer(mailboxEnode) err = charlieBackend.NodeManager().AddPeer(mailboxEnode)
s.Require().NoError(err) s.Require().NoError(err)
//wait async processes on adding peer // Wait async processes on adding peer.
time.Sleep(time.Second) time.Sleep(time.Second)
//get whisper service // Get whisper service.
aliceWhisperService, err := aliceBackend.NodeManager().WhisperService() aliceWhisperService, err := aliceBackend.NodeManager().WhisperService()
s.Require().NoError(err) s.Require().NoError(err)
bobWhisperService, err := bobBackend.NodeManager().WhisperService() bobWhisperService, err := bobBackend.NodeManager().WhisperService()
s.Require().NoError(err) s.Require().NoError(err)
charlieWhisperService, err := charlieBackend.NodeManager().WhisperService() charlieWhisperService, err := charlieBackend.NodeManager().WhisperService()
s.Require().NoError(err) s.Require().NoError(err)
//get rpc client // Get rpc client.
aliceRPCClient := aliceBackend.NodeManager().RPCClient() aliceRPCClient := aliceBackend.NodeManager().RPCClient()
bobRPCClient := bobBackend.NodeManager().RPCClient() bobRPCClient := bobBackend.NodeManager().RPCClient()
charlieRPCClient := charlieBackend.NodeManager().RPCClient() charlieRPCClient := charlieBackend.NodeManager().RPCClient()
//bob and charlie add mailserver key // Bob and charlie add the mailserver key.
password := "status-offline-inbox" password := "status-offline-inbox"
bobMailServerKeyID, err := bobWhisperService.AddSymKeyFromPassword(password) bobMailServerKeyID, err := bobWhisperService.AddSymKeyFromPassword(password)
s.Require().NoError(err) s.Require().NoError(err)
charlieMailServerKeyID, err := charlieWhisperService.AddSymKeyFromPassword(password) charlieMailServerKeyID, err := charlieWhisperService.AddSymKeyFromPassword(password)
s.Require().NoError(err) s.Require().NoError(err)
//generate group chat symkey and topic // Generate a group chat symkey and topic.
groupChatKeyID, err := aliceWhisperService.GenerateSymKey() groupChatKeyID, err := aliceWhisperService.GenerateSymKey()
s.Require().NoError(err) s.Require().NoError(err)
groupChatKey, err := aliceWhisperService.GetSymKey(groupChatKeyID) groupChatKey, err := aliceWhisperService.GetSymKey(groupChatKeyID)
s.Require().NoError(err) s.Require().NoError(err)
//generate group chat topic // Generate a group chat topic.
groupChatTopic := whisper.BytesToTopic([]byte("groupChatTopic")) groupChatTopic := whisper.BytesToTopic([]byte("groupChatTopic"))
groupChatPayload := newGroupChatParams(groupChatKey, groupChatTopic) groupChatPayload := newGroupChatParams(groupChatKey, groupChatTopic)
payloadStr, err := groupChatPayload.Encode() payloadStr, err := groupChatPayload.Encode()
s.Require().NoError(err) s.Require().NoError(err)
//Add bob and charlie create key pairs to receive symmetric key for group chat from alice // Add Bob and Charlie's key pairs to receive the symmetric key for the group chat from Alice.
bobKeyID, err := bobWhisperService.NewKeyPair() bobKeyID, err := bobWhisperService.NewKeyPair()
s.Require().NoError(err) s.Require().NoError(err)
bobKey, err := bobWhisperService.GetPrivateKey(bobKeyID) bobKey, err := bobWhisperService.GetPrivateKey(bobKeyID)
@ -196,19 +196,19 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
charliePubkey := hexutil.Bytes(crypto.FromECDSAPub(&charlieKey.PublicKey)) charliePubkey := hexutil.Bytes(crypto.FromECDSAPub(&charlieKey.PublicKey))
charlieAliceKeySendTopic := whisper.BytesToTopic([]byte("charlieAliceKeySendTopic ")) charlieAliceKeySendTopic := whisper.BytesToTopic([]byte("charlieAliceKeySendTopic "))
//bob and charlie create message filter // Bob and charlie create message filter.
bobMessageFilterID := s.createPrivateChatMessageFilter(bobRPCClient, bobKeyID, bobAliceKeySendTopic.String()) bobMessageFilterID := s.createPrivateChatMessageFilter(bobRPCClient, bobKeyID, bobAliceKeySendTopic.String())
charlieMessageFilterID := s.createPrivateChatMessageFilter(charlieRPCClient, charlieKeyID, charlieAliceKeySendTopic.String()) charlieMessageFilterID := s.createPrivateChatMessageFilter(charlieRPCClient, charlieKeyID, charlieAliceKeySendTopic.String())
//Alice send message with symkey and topic to bob and charlie // Alice send message with symkey and topic to Bob and Charlie.
s.postMessageToPrivate(aliceRPCClient, bobPubkey.String(), bobAliceKeySendTopic.String(), payloadStr) s.postMessageToPrivate(aliceRPCClient, bobPubkey.String(), bobAliceKeySendTopic.String(), payloadStr)
s.postMessageToPrivate(aliceRPCClient, charliePubkey.String(), charlieAliceKeySendTopic.String(), payloadStr) s.postMessageToPrivate(aliceRPCClient, charliePubkey.String(), charlieAliceKeySendTopic.String(), payloadStr)
//wait to receive // Wait to receive.
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
//bob receive group chat data and add it to his node // Bob receive group chat data and add it to his node.
//1. bob get group chat details // Bob get group chat details.
messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID) messages := s.getMessagesByMessageFilterID(bobRPCClient, bobMessageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
bobGroupChatData := groupChatParams{} bobGroupChatData := groupChatParams{}
@ -216,15 +216,15 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
s.Require().NoError(err) s.Require().NoError(err)
s.EqualValues(groupChatPayload, bobGroupChatData) s.EqualValues(groupChatPayload, bobGroupChatData)
//2. bob add symkey to his node // Bob add symkey to his node.
bobGroupChatSymkeyID := s.addSymKey(bobRPCClient, bobGroupChatData.Key) bobGroupChatSymkeyID := s.addSymKey(bobRPCClient, bobGroupChatData.Key)
s.Require().NotEmpty(bobGroupChatSymkeyID) s.Require().NotEmpty(bobGroupChatSymkeyID)
//3. bob create message filter to node by group chat topic // Bob create message filter to node by group chat topic.
bobGroupChatMessageFilterID := s.createGroupChatMessageFilter(bobRPCClient, bobGroupChatSymkeyID, bobGroupChatData.Topic) bobGroupChatMessageFilterID := s.createGroupChatMessageFilter(bobRPCClient, bobGroupChatSymkeyID, bobGroupChatData.Topic)
//charlie receive group chat data and add it to his node // Charlie receive group chat data and add it to his node.
//1. charlie get group chat details // Charlie get group chat details.
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID) messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieMessageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
charlieGroupChatData := groupChatParams{} charlieGroupChatData := groupChatParams{}
@ -232,50 +232,78 @@ func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
s.Require().NoError(err) s.Require().NoError(err)
s.EqualValues(groupChatPayload, charlieGroupChatData) s.EqualValues(groupChatPayload, charlieGroupChatData)
//2. charlie add symkey to his node // Charlie add symkey to his node.
charlieGroupChatSymkeyID := s.addSymKey(charlieRPCClient, charlieGroupChatData.Key) charlieGroupChatSymkeyID := s.addSymKey(charlieRPCClient, charlieGroupChatData.Key)
s.Require().NotEmpty(charlieGroupChatSymkeyID) s.Require().NotEmpty(charlieGroupChatSymkeyID)
//3. charlie create message filter to node by group chat topic // Charlie create message filter to node by group chat topic.
charlieGroupChatMessageFilterID := s.createGroupChatMessageFilter(charlieRPCClient, charlieGroupChatSymkeyID, charlieGroupChatData.Topic) charlieGroupChatMessageFilterID := s.createGroupChatMessageFilter(charlieRPCClient, charlieGroupChatSymkeyID, charlieGroupChatData.Topic)
//alice send message to group chat // Alice send message to group chat.
helloWorldMessage := hexutil.Encode([]byte("Hello world!")) helloWorldMessage := hexutil.Encode([]byte("Hello world!"))
s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage) s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage)
time.Sleep(5 * time.Second) //it need to receive envelopes by bob and charlie nodes // It need to receive envelopes by bob and charlie nodes.
time.Sleep(5 * time.Second)
//bob receive group chat message // Bob receive group chat message.
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
//charlie receive group chat message // Charlie receive group chat message.
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
//check that we don't receive messages each one time // Check that we don't receive messages each one time.
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
s.Require().Equal(0, len(messages)) s.Require().Equal(0, len(messages))
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
s.Require().Equal(0, len(messages)) s.Require().Equal(0, len(messages))
//Request each one messages from mailbox using enode // Request each one messages from mailbox using enode.
s.requestHistoricMessages(bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String()) s.requestHistoricMessages(bobRPCClient, mailboxEnode, bobMailServerKeyID, groupChatTopic.String())
s.requestHistoricMessages(charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String()) s.requestHistoricMessages(charlieRPCClient, mailboxEnode, charlieMailServerKeyID, groupChatTopic.String())
time.Sleep(5 * time.Second) //wait to receive p2p messages // Wait to receive p2p messages.
time.Sleep(5 * time.Second)
//bob receive p2p message from grop chat filter // Bob receive p2p message from group chat filter.
messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID) messages = s.getMessagesByMessageFilterID(bobRPCClient, bobGroupChatMessageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
//charlie receive p2p message from grop chat filter // Charlie receive p2p message from group chat filter.
messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID) messages = s.getMessagesByMessageFilterID(charlieRPCClient, charlieGroupChatMessageFilterID)
s.Require().Equal(1, len(messages)) s.Require().Equal(1, len(messages))
s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string)) s.Require().Equal(helloWorldMessage, messages[0]["payload"].(string))
} }
func (s *WhisperMailboxSuite) TestSendMessageWithoutSubscription() {
aliceBackend, stop := s.startBackend("alice")
defer stop()
// We need to wait >= whisper.DefaultSyncAllowance seconds to update Bloom filter tolerated.
time.Sleep((whisper.DefaultSyncAllowance + 1) * time.Second)
// Get whisper service.
aliceWhisperService, err := aliceBackend.NodeManager().WhisperService()
s.Require().NoError(err)
// Get rpc client.
aliceRPCClient := aliceBackend.NodeManager().RPCClient()
// Generate group chat symkey and topic.
groupChatKeyID, err := aliceWhisperService.GenerateSymKey()
s.Require().NoError(err)
// Generate group chat topic.
groupChatTopic := whisper.BytesToTopic([]byte("groupChatTopic"))
// Alice send message to group chat.
helloWorldMessage := hexutil.Encode([]byte("Hello world!"))
s.postMessageToGroup(aliceRPCClient, groupChatKeyID, groupChatTopic.String(), helloWorldMessage)
}
func newGroupChatParams(symkey []byte, topic whisper.TopicType) groupChatParams { func newGroupChatParams(symkey []byte, topic whisper.TopicType) groupChatParams {
groupChatKeyStr := hexutil.Bytes(symkey).String() groupChatKeyStr := hexutil.Bytes(symkey).String()
return groupChatParams{ return groupChatParams{
@ -305,7 +333,7 @@ func (d *groupChatParams) Encode() (string, error) {
return hexutil.Bytes(payload).String(), nil return hexutil.Bytes(payload).String(), nil
} }
//Start status node // Start status node.
func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, func()) { func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, func()) {
datadir := filepath.Join(RootDir, ".ethereumtest/mailbox", name) datadir := filepath.Join(RootDir, ".ethereumtest/mailbox", name)
backend := api.NewStatusBackend() backend := api.NewStatusBackend()
@ -314,6 +342,8 @@ func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, fun
s.Require().NoError(err) s.Require().NoError(err)
s.Require().False(backend.IsNodeRunning()) s.Require().False(backend.IsNodeRunning())
nodeConfig.WhisperConfig.LightClient = true
if addr, err := GetRemoteURL(); err == nil { if addr, err := GetRemoteURL(); err == nil {
nodeConfig.UpstreamConfig.Enabled = true nodeConfig.UpstreamConfig.Enabled = true
nodeConfig.UpstreamConfig.URL = addr nodeConfig.UpstreamConfig.URL = addr
@ -332,7 +362,7 @@ func (s *WhisperMailboxSuite) startBackend(name string) (*api.StatusBackend, fun
} }
//Start mailbox node // Start mailbox node.
func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) { func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func()) {
mailboxBackend := api.NewStatusBackend() mailboxBackend := api.NewStatusBackend()
mailboxConfig, err := MakeTestNodeConfig(GetNetworkID()) mailboxConfig, err := MakeTestNodeConfig(GetNetworkID())
@ -359,7 +389,7 @@ func (s *WhisperMailboxSuite) startMailboxBackend() (*api.StatusBackend, func())
} }
} }
//createPrivateChatMessageFilter create message filter with asymmetric encryption // createPrivateChatMessageFilter create message filter with asymmetric encryption.
func (s *WhisperMailboxSuite) createPrivateChatMessageFilter(rpcCli *rpc.Client, privateKeyID string, topic string) string { func (s *WhisperMailboxSuite) createPrivateChatMessageFilter(rpcCli *rpc.Client, privateKeyID string, topic string) string {
resp := rpcCli.CallRaw(`{ resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -373,12 +403,12 @@ func (s *WhisperMailboxSuite) createPrivateChatMessageFilter(rpcCli *rpc.Client,
err := json.Unmarshal([]byte(resp), &msgFilterResp) err := json.Unmarshal([]byte(resp), &msgFilterResp)
messageFilterID := msgFilterResp.Result messageFilterID := msgFilterResp.Result
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(msgFilterResp.Err) s.Require().Nil(msgFilterResp.Error)
s.Require().NotEqual("", messageFilterID, resp) s.Require().NotEqual("", messageFilterID, resp)
return messageFilterID return messageFilterID
} }
//createGroupChatMessageFilter create message filter with symmetric encryption // createGroupChatMessageFilter create message filter with symmetric encryption.
func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, symkeyID string, topic string) string { func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, symkeyID string, topic string) string {
resp := rpcCli.CallRaw(`{ resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -392,7 +422,7 @@ func (s *WhisperMailboxSuite) createGroupChatMessageFilter(rpcCli *rpc.Client, s
err := json.Unmarshal([]byte(resp), &msgFilterResp) err := json.Unmarshal([]byte(resp), &msgFilterResp)
messageFilterID := msgFilterResp.Result messageFilterID := msgFilterResp.Result
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(msgFilterResp.Err) s.Require().Nil(msgFilterResp.Error)
s.Require().NotEqual("", messageFilterID, resp) s.Require().NotEqual("", messageFilterID, resp)
return messageFilterID return messageFilterID
} }
@ -414,7 +444,7 @@ func (s *WhisperMailboxSuite) postMessageToPrivate(rpcCli *rpc.Client, bobPubkey
postResp := baseRPCResponse{} postResp := baseRPCResponse{}
err := json.Unmarshal([]byte(resp), &postResp) err := json.Unmarshal([]byte(resp), &postResp)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(postResp.Err) s.Require().Nil(postResp.Error)
} }
func (s *WhisperMailboxSuite) postMessageToGroup(rpcCli *rpc.Client, groupChatKeyID string, topic string, payload string) { func (s *WhisperMailboxSuite) postMessageToGroup(rpcCli *rpc.Client, groupChatKeyID string, topic string, payload string) {
@ -434,10 +464,10 @@ func (s *WhisperMailboxSuite) postMessageToGroup(rpcCli *rpc.Client, groupChatKe
postResp := baseRPCResponse{} postResp := baseRPCResponse{}
err := json.Unmarshal([]byte(resp), &postResp) err := json.Unmarshal([]byte(resp), &postResp)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(postResp.Err) s.Require().Nil(postResp.Error)
} }
//getMessagesByMessageFilterID get received messages by messageFilterID // getMessagesByMessageFilterID gets received messages by messageFilterID.
func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, messageFilterID string) []map[string]interface{} { func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, messageFilterID string) []map[string]interface{} {
resp := rpcCli.CallRaw(`{ resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -447,11 +477,11 @@ func (s *WhisperMailboxSuite) getMessagesByMessageFilterID(rpcCli *rpc.Client, m
messages := getFilterMessagesResponse{} messages := getFilterMessagesResponse{}
err := json.Unmarshal([]byte(resp), &messages) err := json.Unmarshal([]byte(resp), &messages)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(messages.Err) s.Require().Nil(messages.Error)
return messages.Result return messages.Result
} }
//addSymKey added symkey to node and return symkeyID // addSymKey added symkey to node and return symkeyID.
func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string { func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) string {
resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey", resp := rpcCli.CallRaw(`{"jsonrpc":"2.0","method":"shh_addSymKey",
"params":["` + symkey + `"], "params":["` + symkey + `"],
@ -459,13 +489,13 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin
symkeyAddResp := returnedIDResponse{} symkeyAddResp := returnedIDResponse{}
err := json.Unmarshal([]byte(resp), &symkeyAddResp) err := json.Unmarshal([]byte(resp), &symkeyAddResp)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(symkeyAddResp.Err) s.Require().Nil(symkeyAddResp.Error)
symkeyID := symkeyAddResp.Result symkeyID := symkeyAddResp.Result
s.Require().NotEmpty(symkeyID) s.Require().NotEmpty(symkeyID)
return symkeyID return symkeyID
} }
//requestHistoricMessages ask mailnode to resend messagess // requestHistoricMessages asks a mailnode to resend messages.
func (s *WhisperMailboxSuite) requestHistoricMessages(rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) { func (s *WhisperMailboxSuite) requestHistoricMessages(rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) {
resp := rpcCli.CallRaw(`{ resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -482,19 +512,19 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(rpcCli *rpc.Client, mailbo
reqMessagesResp := baseRPCResponse{} reqMessagesResp := baseRPCResponse{}
err := json.Unmarshal([]byte(resp), &reqMessagesResp) err := json.Unmarshal([]byte(resp), &reqMessagesResp)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(reqMessagesResp.Err) s.Require().Nil(reqMessagesResp.Error)
} }
type getFilterMessagesResponse struct { type getFilterMessagesResponse struct {
Result []map[string]interface{} Result []map[string]interface{}
Err interface{} Error interface{}
} }
type returnedIDResponse struct { type returnedIDResponse struct {
Result string Result string
Err interface{} Error interface{}
} }
type baseRPCResponse struct { type baseRPCResponse struct {
Result interface{} Result interface{}
Err interface{} Error interface{}
} }

View File

@ -104,6 +104,9 @@ const (
peerSource envelopeSource = iota peerSource envelopeSource = iota
// p2pSource indicates that envelop was received from a trusted peer. // p2pSource indicates that envelop was received from a trusted peer.
p2pSource p2pSource
forwarded = true
inHouse = false
) )
// EnvelopeMeta keeps metadata of received envelopes. // EnvelopeMeta keeps metadata of received envelopes.

View File

@ -658,7 +658,7 @@ func (whisper *Whisper) Unsubscribe(id string) error {
// Send injects a message into the whisper send queue, to be distributed in the // Send injects a message into the whisper send queue, to be distributed in the
// network in the coming cycles. // network in the coming cycles.
func (whisper *Whisper) Send(envelope *Envelope) error { func (whisper *Whisper) Send(envelope *Envelope) error {
ok, err := whisper.add(envelope) ok, err := whisper.add(envelope, inHouse)
if err != nil { if err != nil {
return err return err
} }
@ -745,7 +745,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
trouble := false trouble := false
for _, env := range envelopes { for _, env := range envelopes {
whisper.traceEnvelope(env, !whisper.isEnvelopeCached(env.Hash()), peerSource, p) whisper.traceEnvelope(env, !whisper.isEnvelopeCached(env.Hash()), peerSource, p)
cached, err := whisper.add(env) cached, err := whisper.add(env, forwarded)
if err != nil { if err != nil {
trouble = true trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
@ -819,7 +819,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// add inserts a new envelope into the message pool to be distributed within the // add inserts a new envelope into the message pool to be distributed within the
// whisper network. It also inserts the envelope into the expiration pool at the // whisper network. It also inserts the envelope into the expiration pool at the
// appropriate time-stamp. In case of error, connection should be dropped. // appropriate time-stamp. In case of error, connection should be dropped.
func (whisper *Whisper) add(envelope *Envelope) (bool, error) { func (whisper *Whisper) add(envelope *Envelope, isForwarded bool) (bool, error) {
now := uint32(time.Now().Unix()) now := uint32(time.Now().Unix())
sent := envelope.Expiry - envelope.TTL sent := envelope.Expiry - envelope.TTL
@ -852,7 +852,7 @@ func (whisper *Whisper) add(envelope *Envelope) (bool, error) {
} }
} }
if !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) { if isForwarded && !bloomFilterMatch(whisper.BloomFilter(), envelope.Bloom()) {
// maybe the value was recently changed, and the peers did not adjust yet. // maybe the value was recently changed, and the peers did not adjust yet.
// in this case the previous value is retrieved by BloomFilterTolerance() // in this case the previous value is retrieved by BloomFilterTolerance()
// for a short period of peer synchronization. // for a short period of peer synchronization.