RequestHistoricMessages response (#1009)

* refactor TestRequestMessageFromMailboxAsync to use s.requestHistoricMessages helper

* send p2pRequestResponseCode from mailserver

* send p2p message response to after sending all historic messages

* mailserver sends `whisper.NewSentMessage` as response

* add mailserver Client and p2pRequestAckCode watchers

* send event with envelopeFeed when p2pRequestAckCode is received

* test request completed event in tracker

* rename mailserver response events and code to RequestCompleteCode

* wait for mailserver response in e2e test

* use SendHistoricMessageResponse method name for mailserver response

* fix lint warnings

* add mailserver request expiration

* send mailserver response without envelope

* add `ttl` to Request struct in shhext_requestMessages

* test that tracker calls handler.MailServerRequestExpired

* add geth patch

* rename TTL to Timeout

* split tracker.handleEvent in multiple methods
This commit is contained in:
Andrea Franz 2018-06-15 17:12:31 +02:00 committed by GitHub
parent b3a8991eb1
commit fa390a52ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 399 additions and 81 deletions

View File

@ -0,0 +1,110 @@
diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go
index 4bbf554..2fcc9e6 100644
--- a/whisper/whisperv6/doc.go
+++ b/whisper/whisperv6/doc.go
@@ -44,13 +44,14 @@ const (
ProtocolName = "shh" // Nickname of the protocol in geth
// whisper protocol message codes, according to EIP-627
- statusCode = 0 // used by whisper protocol
- messagesCode = 1 // normal whisper message
- powRequirementCode = 2 // PoW requirement
- bloomFilterExCode = 3 // bloom filter exchange
- p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
- p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
- NumberOfMessageCodes = 128
+ statusCode = 0 // used by whisper protocol
+ messagesCode = 1 // normal whisper message
+ powRequirementCode = 2 // PoW requirement
+ bloomFilterExCode = 3 // bloom filter exchange
+ p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
+ p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
+ p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
+ NumberOfMessageCodes = 128
SizeMask = byte(3) // mask used to extract the size of payload size field from the flags
signatureFlag = byte(4)
diff --git a/whisper/whisperv6/events.go b/whisper/whisperv6/events.go
index e03ec9d..1665539 100644
--- a/whisper/whisperv6/events.go
+++ b/whisper/whisperv6/events.go
@@ -1,23 +1,27 @@
package whisperv6
import (
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
// EventType used to define known envelope events.
type EventType string
const (
- // EventEnvelopeSent fires when envelope was sent to a peer.
- EventEnvelopeSent EventType = "envelope.sent"
- // EventEnvelopeExpired fires when envelop expired
- EventEnvelopeExpired EventType = "envelope.expired"
+ // EventEnvelopeSent fires when envelope was sent to a peer.
+ EventEnvelopeSent EventType = "envelope.sent"
+ // EventEnvelopeExpired fires when envelop expired
+ EventEnvelopeExpired EventType = "envelope.expired"
+ // EventMailServerRequestCompleted fires after mailserver sends all the requested messages
+ EventMailServerRequestCompleted EventType = "mailserver.request.completed"
+ // EventMailServerRequestExpired fires after mailserver the request TTL ends
+ EventMailServerRequestExpired EventType = "mailserver.request.expired"
)
// EnvelopeEvent used for envelopes events.
type EnvelopeEvent struct {
- Event EventType
- Hash common.Hash
- Peer discover.NodeID
+ Event EventType
+ Hash common.Hash
+ Peer discover.NodeID
}
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 697f0ec..4a7b006 100644
--- a/whisper/whisperv6/whisper.go
+++ b/whisper/whisperv6/whisper.go
@@ -378,6 +378,15 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop
return p2p.Send(p.ws, p2pRequestCode, envelope)
}
+func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error {
+ size, r, err := rlp.EncodeToReader(requestID)
+ if err != nil {
+ return err
+ }
+
+ return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
+}
+
// SendP2PMessage sends a peer-to-peer message to a specific peer.
func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
p, err := whisper.getPeer(peerID)
@@ -821,8 +830,22 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid p2p request")
}
+
whisper.mailServer.DeliverMail(p, &request)
}
+ case p2pRequestCompleteCode:
+ if p.trusted {
+ var requestID common.Hash
+ if err := packet.Decode(&requestID); err != nil {
+ log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ return errors.New("invalid request response message")
+ }
+
+ whisper.envelopeFeed.Send(EnvelopeEvent{
+ Hash: requestID,
+ Event: EventMailServerRequestCompleted,
+ })
+ }
default:
// New message types might be implemented in the future versions of Whisper.
// For forward compatibility, just ignore.

View File

@ -190,6 +190,9 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
if ok, lower, upper, bloom := s.validateRequest(peer.ID(), request); ok { if ok, lower, upper, bloom := s.validateRequest(peer.ID(), request); ok {
s.processRequest(peer, lower, upper, bloom) s.processRequest(peer, lower, upper, bloom)
if err := s.sendHistoricMessageResponse(peer, request); err != nil {
log.Error(fmt.Sprintf("SendHistoricMessageResponse error: %s", err))
}
} }
} }
@ -260,6 +263,10 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
return ret return ret
} }
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope) error {
return s.w.SendHistoricMessageResponse(peer, request.Hash())
}
// validateRequest runs different validations on the current request. // validateRequest runs different validations on the current request.
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte) { func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte) {
if s.pow > 0.0 && request.PoW() < s.pow { if s.pow > 0.0 && request.PoW() < s.pow {

View File

@ -18,6 +18,8 @@ import (
const ( const (
// defaultWorkTime is a work time reported in messages sent to MailServer nodes. // defaultWorkTime is a work time reported in messages sent to MailServer nodes.
defaultWorkTime = 5 defaultWorkTime = 5
// defaultRequestTimeout is the default request timeout in seconds
defaultRequestTimeout = 10
) )
var ( var (
@ -50,6 +52,10 @@ type MessagesRequest struct {
// SymKeyID is an ID of a symmetric key to authenticate to MailServer. // SymKeyID is an ID of a symmetric key to authenticate to MailServer.
// It's derived from MailServer password. // It's derived from MailServer password.
SymKeyID string `json:"symKeyID"` SymKeyID string `json:"symKeyID"`
// Timeout is the time to live of the request specified in seconds.
// Default is 10 seconds
Timeout time.Duration `json:"timeout"`
} }
func (r *MessagesRequest) setDefaults(now time.Time) { func (r *MessagesRequest) setDefaults(now time.Time) {
@ -66,6 +72,10 @@ func (r *MessagesRequest) setDefaults(now time.Time) {
r.From = r.To - oneDay r.From = r.To - oneDay
} }
} }
if r.Timeout == 0 {
r.Timeout = defaultRequestTimeout
}
} }
// ----- // -----
@ -100,30 +110,34 @@ func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash he
} }
// RequestMessages sends a request for historic messages to a MailServer. // RequestMessages sends a request for historic messages to a MailServer.
func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (bool, error) { func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) {
api.log.Info("RequestMessages", "request", r) api.log.Info("RequestMessages", "request", r)
shh := api.service.w shh := api.service.w
now := api.service.w.GetCurrentTime() now := api.service.w.GetCurrentTime()
r.setDefaults(now) r.setDefaults(now)
mailServerNode, err := discover.ParseNode(r.MailServerPeer) mailServerNode, err := discover.ParseNode(r.MailServerPeer)
if err != nil { if err != nil {
return false, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err) return nil, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err)
} }
symKey, err := shh.GetSymKey(r.SymKeyID) symKey, err := shh.GetSymKey(r.SymKeyID)
if err != nil { if err != nil {
return false, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err) return nil, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err)
} }
envelope, err := makeEnvelop(makePayload(r), symKey, api.service.nodeID, shh.MinPow(), now) envelope, err := makeEnvelop(makePayload(r), symKey, api.service.nodeID, shh.MinPow(), now)
if err != nil { if err != nil {
return false, err return nil, err
}
if err := shh.RequestHistoricMessages(mailServerNode.ID[:], envelope); err != nil {
return false, err
} }
return true, nil hash := envelope.Hash()
if err := shh.RequestHistoricMessages(mailServerNode.ID[:], envelope); err != nil {
return nil, err
}
api.service.tracker.AddRequest(hash, time.After(r.Timeout*time.Second))
return hash[:], nil
} }
// GetNewFilterMessages is a prototype method with deduplication // GetNewFilterMessages is a prototype method with deduplication

View File

@ -23,20 +23,25 @@ func TestMessagesRequest_setDefaults(t *testing.T) {
}{ }{
{ {
&MessagesRequest{From: 0, To: 0}, &MessagesRequest{From: 0, To: 0},
&MessagesRequest{From: yesterday, To: now}, &MessagesRequest{From: yesterday, To: now, Timeout: defaultRequestTimeout},
}, },
{ {
&MessagesRequest{From: 1, To: 0}, &MessagesRequest{From: 1, To: 0},
&MessagesRequest{From: uint32(1), To: now}, &MessagesRequest{From: uint32(1), To: now, Timeout: defaultRequestTimeout},
}, },
{ {
&MessagesRequest{From: 0, To: yesterday}, &MessagesRequest{From: 0, To: yesterday},
&MessagesRequest{From: daysAgo(tnow, 2), To: yesterday}, &MessagesRequest{From: daysAgo(tnow, 2), To: yesterday, Timeout: defaultRequestTimeout},
}, },
// 100 - 1 day would be invalid, so we set From to 0 // 100 - 1 day would be invalid, so we set From to 0
{ {
&MessagesRequest{From: 0, To: 100}, &MessagesRequest{From: 0, To: 100},
&MessagesRequest{From: 0, To: 100}, &MessagesRequest{From: 0, To: 100, Timeout: defaultRequestTimeout},
},
// set Timeout
{
&MessagesRequest{From: 0, To: 0, Timeout: 100},
&MessagesRequest{From: yesterday, To: now, Timeout: 100},
}, },
} }

View File

@ -3,6 +3,7 @@ package shhext
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -22,12 +23,16 @@ const (
EnvelopePosted EnvelopeState = iota EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to atleast one peer. // EnvelopeSent is set when envelope is sent to atleast one peer.
EnvelopeSent EnvelopeSent
// MailServerRequestSent is set when p2p request is sent to the mailserver
MailServerRequestSent
) )
// EnvelopeEventsHandler used for two different event types. // EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface { type EnvelopeEventsHandler interface {
EnvelopeSent(common.Hash) EnvelopeSent(common.Hash)
EnvelopeExpired(common.Hash) EnvelopeExpired(common.Hash)
MailServerRequestCompleted(common.Hash)
MailServerRequestExpired(common.Hash)
} }
// Service is a service that provides some additional Whisper API. // Service is a service that provides some additional Whisper API.
@ -123,6 +128,26 @@ func (t *tracker) Add(hash common.Hash) {
t.cache[hash] = EnvelopePosted t.cache[hash] = EnvelopePosted
} }
// Add request hash to a tracker.
func (t *tracker) AddRequest(hash common.Hash, timerC <-chan time.Time) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[hash] = MailServerRequestSent
go t.expireRequest(hash, timerC)
}
func (t *tracker) expireRequest(hash common.Hash, timerC <-chan time.Time) {
select {
case <-t.quit:
return
case <-timerC:
t.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: hash,
})
}
}
// handleEnvelopeEvents processes whisper envelope events // handleEnvelopeEvents processes whisper envelope events
func (t *tracker) handleEnvelopeEvents() { func (t *tracker) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
@ -141,10 +166,22 @@ func (t *tracker) handleEnvelopeEvents() {
// handleEvent based on type of the event either triggers // handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker // confirmation handler or removes hash from tracker
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) { func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: t.handleEventEnvelopeSent,
whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired,
whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted,
whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
switch event.Event {
case whisper.EventEnvelopeSent:
state, ok := t.cache[event.Hash] state, ok := t.cache[event.Hash]
// if we didn't send a message using extension - skip it // if we didn't send a message using extension - skip it
// if message was already confirmed - skip it // if message was already confirmed - skip it
@ -156,7 +193,12 @@ func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
if t.handler != nil { if t.handler != nil {
t.handler.EnvelopeSent(event.Hash) t.handler.EnvelopeSent(event.Hash)
} }
case whisper.EventEnvelopeExpired: }
func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
if state, ok := t.cache[event.Hash]; ok { if state, ok := t.cache[event.Hash]; ok {
log.Debug("envelope expired", "hash", event.Hash, "state", state) log.Debug("envelope expired", "hash", event.Hash, "state", state)
delete(t.cache, event.Hash) delete(t.cache, event.Hash)
@ -167,5 +209,34 @@ func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
t.handler.EnvelopeExpired(event.Hash) t.handler.EnvelopeExpired(event.Hash)
} }
} }
}
func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response received", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.MailServerRequestCompleted(event.Hash)
}
}
func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response expired", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.MailServerRequestExpired(event.Hash)
} }
} }

View File

@ -18,12 +18,16 @@ func newHandlerMock(buf int) handlerMock {
return handlerMock{ return handlerMock{
confirmations: make(chan common.Hash, buf), confirmations: make(chan common.Hash, buf),
expirations: make(chan common.Hash, buf), expirations: make(chan common.Hash, buf),
requestsCompleted: make(chan common.Hash, buf),
requestsExpired: make(chan common.Hash, buf),
} }
} }
type handlerMock struct { type handlerMock struct {
confirmations chan common.Hash confirmations chan common.Hash
expirations chan common.Hash expirations chan common.Hash
requestsCompleted chan common.Hash
requestsExpired chan common.Hash
} }
func (t handlerMock) EnvelopeSent(hash common.Hash) { func (t handlerMock) EnvelopeSent(hash common.Hash) {
@ -34,6 +38,14 @@ func (t handlerMock) EnvelopeExpired(hash common.Hash) {
t.expirations <- hash t.expirations <- hash
} }
func (t handlerMock) MailServerRequestCompleted(hash common.Hash) {
t.requestsCompleted <- hash
}
func (t handlerMock) MailServerRequestExpired(hash common.Hash) {
t.requestsExpired <- hash
}
func TestShhExtSuite(t *testing.T) { func TestShhExtSuite(t *testing.T) {
suite.Run(t, new(ShhExtSuite)) suite.Run(t, new(ShhExtSuite))
} }
@ -159,29 +171,29 @@ func (s *ShhExtSuite) TestRequestMessages() {
mailServerPeer = "enode://b7e65e1bedc2499ee6cbd806945af5e7df0e59e4070c96821570bd581473eade24a489f5ec95d060c0db118c879403ab88d827d3766978f28708989d35474f87@[::]:51920" mailServerPeer = "enode://b7e65e1bedc2499ee6cbd806945af5e7df0e59e4070c96821570bd581473eade24a489f5ec95d060c0db118c879403ab88d827d3766978f28708989d35474f87@[::]:51920"
) )
var result bool var hash []byte
// invalid MailServer enode address // invalid MailServer enode address
result, err = api.RequestMessages(context.TODO(), MessagesRequest{MailServerPeer: "invalid-address"}) hash, err = api.RequestMessages(context.TODO(), MessagesRequest{MailServerPeer: "invalid-address"})
s.False(result) s.Nil(hash)
s.EqualError(err, "invalid mailServerPeer value: invalid URL scheme, want \"enode\"") s.EqualError(err, "invalid mailServerPeer value: invalid URL scheme, want \"enode\"")
// non-existent symmetric key // non-existent symmetric key
result, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailServerPeer, MailServerPeer: mailServerPeer,
}) })
s.False(result) s.Nil(hash)
s.EqualError(err, "invalid symKeyID value: non-existent key ID") s.EqualError(err, "invalid symKeyID value: non-existent key ID")
// with a symmetric key // with a symmetric key
symKeyID, symKeyErr := shh.AddSymKeyFromPassword("some-pass") symKeyID, symKeyErr := shh.AddSymKeyFromPassword("some-pass")
s.NoError(symKeyErr) s.NoError(symKeyErr)
result, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailServerPeer, MailServerPeer: mailServerPeer,
SymKeyID: symKeyID, SymKeyID: symKeyID,
}) })
s.Contains(err.Error(), "Could not find peer with ID") s.Contains(err.Error(), "Could not find peer with ID")
s.False(result) s.Nil(hash)
// with a peer acting as a mailserver // with a peer acting as a mailserver
// prepare a node first // prepare a node first
@ -209,12 +221,14 @@ func (s *ShhExtSuite) TestRequestMessages() {
time.Sleep(time.Second) // wait for the peer to be added time.Sleep(time.Second) // wait for the peer to be added
// send a request // send a request
result, err = api.RequestMessages(context.TODO(), MessagesRequest{ hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
MailServerPeer: mailNode.Server().Self().String(), MailServerPeer: mailNode.Server().Self().String(),
SymKeyID: symKeyID, SymKeyID: symKeyID,
}) })
s.NoError(err) s.NoError(err)
s.True(result) s.NotNil(hash)
s.Contains(api.service.tracker.cache, common.BytesToHash(hash))
} }
func (s *ShhExtSuite) TearDown() { func (s *ShhExtSuite) TearDown() {
@ -272,3 +286,34 @@ func (s *TrackerSuite) TestRemoved() {
}) })
s.NotContains(s.tracker.cache, testHash) s.NotContains(s.tracker.cache, testHash)
} }
func (s *TrackerSuite) TestRequestCompleted() {
s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second))
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRequestExpiration() {
mock := newHandlerMock(1)
s.tracker.handler = mock
c := make(chan time.Time)
s.tracker.AddRequest(testHash, c)
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: testHash,
})
select {
case requestID := <-mock.requestsExpired:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for request expiration")
}
}

View File

@ -17,3 +17,13 @@ func (h EnvelopeSignalHandler) EnvelopeSent(hash common.Hash) {
func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) { func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) {
signal.SendEnvelopeExpired(hash) signal.SendEnvelopeExpired(hash)
} }
// MailServerRequestCompleted triggered when mailserver send a ack with a requesID sent previously
func (h EnvelopeSignalHandler) MailServerRequestCompleted(hash common.Hash) {
signal.SendMailServerRequestCompleted(hash)
}
// MailServerRequestExpired triggered when mail server request expires
func (h EnvelopeSignalHandler) MailServerRequestExpired(hash common.Hash) {
signal.SendMailServerRequestExpired(hash)
}

View File

@ -1,6 +1,8 @@
package signal package signal
import "github.com/ethereum/go-ethereum/common" import (
"github.com/ethereum/go-ethereum/common"
)
const ( const (
// EventEnvelopeSent is triggered when envelope was sent at least to a one peer. // EventEnvelopeSent is triggered when envelope was sent at least to a one peer.
@ -9,6 +11,12 @@ const (
// EventEnvelopeExpired is triggered when envelop was dropped by a whisper without being sent // EventEnvelopeExpired is triggered when envelop was dropped by a whisper without being sent
// to any peer // to any peer
EventEnvelopeExpired = "envelope.expired" EventEnvelopeExpired = "envelope.expired"
// EventMailServerRequestCompleted is triggered when whisper receives a message ack from the mailserver
EventMailServerRequestCompleted = "mailserver.request.completed"
// EventMailServerRequestExpired is triggered when request TTL ends
EventMailServerRequestExpired = "mailserver.request.expired"
) )
// EnvelopeSignal includes hash of the envelope. // EnvelopeSignal includes hash of the envelope.
@ -25,3 +33,13 @@ func SendEnvelopeSent(hash common.Hash) {
func SendEnvelopeExpired(hash common.Hash) { func SendEnvelopeExpired(hash common.Hash) {
send(EventEnvelopeExpired, EnvelopeSignal{hash}) send(EventEnvelopeExpired, EnvelopeSignal{hash})
} }
// SendMailServerRequestCompleted triggered when mail server response has been received
func SendMailServerRequestCompleted(hash common.Hash) {
send(EventMailServerRequestCompleted, EnvelopeSignal{hash})
}
// SendMailServerRequestExpired triggered when mail server request expires
func SendMailServerRequestExpired(hash common.Hash) {
send(EventMailServerRequestExpired, EnvelopeSignal{hash})
}

View File

@ -1,14 +1,17 @@
package whisper package whisper
import ( import (
"encoding/hex"
"encoding/json" "encoding/json"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
"os" "os"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
@ -96,25 +99,12 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
// Act. // Act.
events := make(chan whisper.EnvelopeEvent)
senderWhisperService.SubscribeEnvelopeEvents(events)
// Request messages (including the previous one, expired) from mailbox. // Request messages (including the previous one, expired) from mailbox.
from := senderWhisperService.GetCurrentTime().Add(-12 * time.Hour) result := s.requestHistoricMessages(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String())
reqMessagesBody := `{ requestID := common.BytesToHash(result)
"jsonrpc": "2.0",
"id": 1,
"method": "shhext_requestMessages",
"params": [{
"mailServerPeer":"` + mailboxPeerStr + `",
"topic":"` + topic.String() + `",
"symKeyID":"` + MailServerKeyID + `",
"from":` + strconv.FormatInt(from.Unix(), 10) + `,
"to":` + strconv.FormatInt(senderWhisperService.GetCurrentTime().Unix(), 10) + `
}]
}`
resp := rpcClient.CallRaw(reqMessagesBody)
reqMessagesResp := baseRPCResponse{}
err = json.Unmarshal([]byte(resp), &reqMessagesResp)
s.Require().NoError(err)
s.Require().Nil(reqMessagesResp.Error)
// Wait to receive message. // Wait to receive message.
time.Sleep(time.Second) time.Sleep(time.Second)
@ -125,6 +115,14 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() {
// Check that there are no messages. // Check that there are no messages.
messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID)
s.Require().Equal(0, len(messages)) s.Require().Equal(0, len(messages))
select {
case e := <-events:
s.Equal(whisper.EventMailServerRequestCompleted, e.Event)
s.Equal(requestID, e.Hash)
case <-time.After(time.Second):
s.Fail("timed out while waiting for request completed event")
}
} }
func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() {
@ -484,7 +482,7 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin
} }
// requestHistoricMessages asks a mailnode to resend messages. // requestHistoricMessages asks a mailnode to resend messages.
func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) { func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) []byte {
from := w.GetCurrentTime().Add(-12 * time.Hour) from := w.GetCurrentTime().Add(-12 * time.Hour)
resp := rpcCli.CallRaw(`{ resp := rpcCli.CallRaw(`{
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -502,6 +500,18 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli
err := json.Unmarshal([]byte(resp), &reqMessagesResp) err := json.Unmarshal([]byte(resp), &reqMessagesResp)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Nil(reqMessagesResp.Error) s.Require().Nil(reqMessagesResp.Error)
switch hash := reqMessagesResp.Result.(type) {
case string:
s.Require().True(strings.HasPrefix(hash, "0x"))
b, err := hex.DecodeString(hash[2:])
s.Require().NoError(err)
return b
default:
s.Failf("failed reading shh_newMessageFilter result", "expected a hash, got: %+v", reqMessagesResp.Result)
}
return nil
} }
type getFilterMessagesResponse struct { type getFilterMessagesResponse struct {

View File

@ -48,6 +48,7 @@ const (
messagesCode = 1 // normal whisper message messagesCode = 1 // normal whisper message
powRequirementCode = 2 // PoW requirement powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange bloomFilterExCode = 3 // bloom filter exchange
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol
p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further)
NumberOfMessageCodes = 128 NumberOfMessageCodes = 128

View File

@ -13,6 +13,10 @@ const (
EventEnvelopeSent EventType = "envelope.sent" EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired // EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired" EventEnvelopeExpired EventType = "envelope.expired"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
EventMailServerRequestCompleted EventType = "mailserver.request.completed"
// EventMailServerRequestExpired fires after mailserver the request TTL ends
EventMailServerRequestExpired EventType = "mailserver.request.expired"
) )
// EnvelopeEvent used for envelopes events. // EnvelopeEvent used for envelopes events.

View File

@ -378,6 +378,15 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop
return p2p.Send(p.ws, p2pRequestCode, envelope) return p2p.Send(p.ws, p2pRequestCode, envelope)
} }
func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error {
size, r, err := rlp.EncodeToReader(requestID)
if err != nil {
return err
}
return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
}
// SendP2PMessage sends a peer-to-peer message to a specific peer. // SendP2PMessage sends a peer-to-peer message to a specific peer.
func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
p, err := whisper.getPeer(peerID) p, err := whisper.getPeer(peerID)
@ -821,8 +830,22 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid p2p request") return errors.New("invalid p2p request")
} }
whisper.mailServer.DeliverMail(p, &request) whisper.mailServer.DeliverMail(p, &request)
} }
case p2pRequestCompleteCode:
if p.trusted {
var requestID common.Hash
if err := packet.Decode(&requestID); err != nil {
log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid request response message")
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: requestID,
Event: EventMailServerRequestCompleted,
})
}
default: default:
// New message types might be implemented in the future versions of Whisper. // New message types might be implemented in the future versions of Whisper.
// For forward compatibility, just ignore. // For forward compatibility, just ignore.