From fa390a52ae470eecc92c421f8ab9ccbc0a6a391f Mon Sep 17 00:00:00 2001 From: Andrea Franz Date: Fri, 15 Jun 2018 17:12:31 +0200 Subject: [PATCH] RequestHistoricMessages response (#1009) * refactor TestRequestMessageFromMailboxAsync to use s.requestHistoricMessages helper * send p2pRequestResponseCode from mailserver * send p2p message response to after sending all historic messages * mailserver sends `whisper.NewSentMessage` as response * add mailserver Client and p2pRequestAckCode watchers * send event with envelopeFeed when p2pRequestAckCode is received * test request completed event in tracker * rename mailserver response events and code to RequestCompleteCode * wait for mailserver response in e2e test * use SendHistoricMessageResponse method name for mailserver response * fix lint warnings * add mailserver request expiration * send mailserver response without envelope * add `ttl` to Request struct in shhext_requestMessages * test that tracker calls handler.MailServerRequestExpired * add geth patch * rename TTL to Timeout * split tracker.handleEvent in multiple methods --- ...nd-mailserver-request-completed-code.patch | 110 +++++++++++++++++ mailserver/mailserver.go | 7 ++ services/shhext/api.go | 30 +++-- services/shhext/api_test.go | 13 +- services/shhext/service.go | 111 ++++++++++++++---- services/shhext/service_test.go | 71 +++++++++-- services/shhext/signal.go | 10 ++ signal/events_shhext.go | 20 +++- t/e2e/whisper/whisper_mailbox_test.go | 48 +++++--- .../go-ethereum/whisper/whisperv6/doc.go | 15 +-- .../go-ethereum/whisper/whisperv6/events.go | 22 ++-- .../go-ethereum/whisper/whisperv6/whisper.go | 23 ++++ 12 files changed, 399 insertions(+), 81 deletions(-) create mode 100644 _assets/patches/geth/0032-send-mailserver-request-completed-code.patch diff --git a/_assets/patches/geth/0032-send-mailserver-request-completed-code.patch b/_assets/patches/geth/0032-send-mailserver-request-completed-code.patch new file mode 100644 index 000000000..78774ac66 --- /dev/null +++ b/_assets/patches/geth/0032-send-mailserver-request-completed-code.patch @@ -0,0 +1,110 @@ +diff --git a/whisper/whisperv6/doc.go b/whisper/whisperv6/doc.go +index 4bbf554..2fcc9e6 100644 +--- a/whisper/whisperv6/doc.go ++++ b/whisper/whisperv6/doc.go +@@ -44,13 +44,14 @@ const ( + ProtocolName = "shh" // Nickname of the protocol in geth + + // whisper protocol message codes, according to EIP-627 +- statusCode = 0 // used by whisper protocol +- messagesCode = 1 // normal whisper message +- powRequirementCode = 2 // PoW requirement +- bloomFilterExCode = 3 // bloom filter exchange +- p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol +- p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) +- NumberOfMessageCodes = 128 ++ statusCode = 0 // used by whisper protocol ++ messagesCode = 1 // normal whisper message ++ powRequirementCode = 2 // PoW requirement ++ bloomFilterExCode = 3 // bloom filter exchange ++ p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol ++ p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol ++ p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) ++ NumberOfMessageCodes = 128 + + SizeMask = byte(3) // mask used to extract the size of payload size field from the flags + signatureFlag = byte(4) +diff --git a/whisper/whisperv6/events.go b/whisper/whisperv6/events.go +index e03ec9d..1665539 100644 +--- a/whisper/whisperv6/events.go ++++ b/whisper/whisperv6/events.go +@@ -1,23 +1,27 @@ + package whisperv6 + + import ( +- "github.com/ethereum/go-ethereum/common" +- "github.com/ethereum/go-ethereum/p2p/discover" ++ "github.com/ethereum/go-ethereum/common" ++ "github.com/ethereum/go-ethereum/p2p/discover" + ) + + // EventType used to define known envelope events. + type EventType string + + const ( +- // EventEnvelopeSent fires when envelope was sent to a peer. +- EventEnvelopeSent EventType = "envelope.sent" +- // EventEnvelopeExpired fires when envelop expired +- EventEnvelopeExpired EventType = "envelope.expired" ++ // EventEnvelopeSent fires when envelope was sent to a peer. ++ EventEnvelopeSent EventType = "envelope.sent" ++ // EventEnvelopeExpired fires when envelop expired ++ EventEnvelopeExpired EventType = "envelope.expired" ++ // EventMailServerRequestCompleted fires after mailserver sends all the requested messages ++ EventMailServerRequestCompleted EventType = "mailserver.request.completed" ++ // EventMailServerRequestExpired fires after mailserver the request TTL ends ++ EventMailServerRequestExpired EventType = "mailserver.request.expired" + ) + + // EnvelopeEvent used for envelopes events. + type EnvelopeEvent struct { +- Event EventType +- Hash common.Hash +- Peer discover.NodeID ++ Event EventType ++ Hash common.Hash ++ Peer discover.NodeID + } +diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go +index 697f0ec..4a7b006 100644 +--- a/whisper/whisperv6/whisper.go ++++ b/whisper/whisperv6/whisper.go +@@ -378,6 +378,15 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop + return p2p.Send(p.ws, p2pRequestCode, envelope) + } + ++func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error { ++ size, r, err := rlp.EncodeToReader(requestID) ++ if err != nil { ++ return err ++ } ++ ++ return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) ++} ++ + // SendP2PMessage sends a peer-to-peer message to a specific peer. + func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { + p, err := whisper.getPeer(peerID) +@@ -821,8 +830,22 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { + log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid p2p request") + } ++ + whisper.mailServer.DeliverMail(p, &request) + } ++ case p2pRequestCompleteCode: ++ if p.trusted { ++ var requestID common.Hash ++ if err := packet.Decode(&requestID); err != nil { ++ log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err) ++ return errors.New("invalid request response message") ++ } ++ ++ whisper.envelopeFeed.Send(EnvelopeEvent{ ++ Hash: requestID, ++ Event: EventMailServerRequestCompleted, ++ }) ++ } + default: + // New message types might be implemented in the future versions of Whisper. + // For forward compatibility, just ignore. diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index d78ab63ed..97db56b07 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -190,6 +190,9 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) if ok, lower, upper, bloom := s.validateRequest(peer.ID(), request); ok { s.processRequest(peer, lower, upper, bloom) + if err := s.sendHistoricMessageResponse(peer, request); err != nil { + log.Error(fmt.Sprintf("SendHistoricMessageResponse error: %s", err)) + } } } @@ -260,6 +263,10 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl return ret } +func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope) error { + return s.w.SendHistoricMessageResponse(peer, request.Hash()) +} + // validateRequest runs different validations on the current request. func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte) { if s.pow > 0.0 && request.PoW() < s.pow { diff --git a/services/shhext/api.go b/services/shhext/api.go index 50d7c81c2..d6c32d853 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -18,6 +18,8 @@ import ( const ( // defaultWorkTime is a work time reported in messages sent to MailServer nodes. defaultWorkTime = 5 + // defaultRequestTimeout is the default request timeout in seconds + defaultRequestTimeout = 10 ) var ( @@ -50,6 +52,10 @@ type MessagesRequest struct { // SymKeyID is an ID of a symmetric key to authenticate to MailServer. // It's derived from MailServer password. SymKeyID string `json:"symKeyID"` + + // Timeout is the time to live of the request specified in seconds. + // Default is 10 seconds + Timeout time.Duration `json:"timeout"` } func (r *MessagesRequest) setDefaults(now time.Time) { @@ -66,6 +72,10 @@ func (r *MessagesRequest) setDefaults(now time.Time) { r.From = r.To - oneDay } } + + if r.Timeout == 0 { + r.Timeout = defaultRequestTimeout + } } // ----- @@ -100,30 +110,34 @@ func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash he } // RequestMessages sends a request for historic messages to a MailServer. -func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (bool, error) { +func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) { api.log.Info("RequestMessages", "request", r) shh := api.service.w now := api.service.w.GetCurrentTime() r.setDefaults(now) mailServerNode, err := discover.ParseNode(r.MailServerPeer) if err != nil { - return false, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err) + return nil, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err) } symKey, err := shh.GetSymKey(r.SymKeyID) if err != nil { - return false, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err) + return nil, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err) } envelope, err := makeEnvelop(makePayload(r), symKey, api.service.nodeID, shh.MinPow(), now) if err != nil { - return false, err - } - if err := shh.RequestHistoricMessages(mailServerNode.ID[:], envelope); err != nil { - return false, err + return nil, err } - return true, nil + hash := envelope.Hash() + if err := shh.RequestHistoricMessages(mailServerNode.ID[:], envelope); err != nil { + return nil, err + } + + api.service.tracker.AddRequest(hash, time.After(r.Timeout*time.Second)) + + return hash[:], nil } // GetNewFilterMessages is a prototype method with deduplication diff --git a/services/shhext/api_test.go b/services/shhext/api_test.go index dbbf78eb8..a10cac613 100644 --- a/services/shhext/api_test.go +++ b/services/shhext/api_test.go @@ -23,20 +23,25 @@ func TestMessagesRequest_setDefaults(t *testing.T) { }{ { &MessagesRequest{From: 0, To: 0}, - &MessagesRequest{From: yesterday, To: now}, + &MessagesRequest{From: yesterday, To: now, Timeout: defaultRequestTimeout}, }, { &MessagesRequest{From: 1, To: 0}, - &MessagesRequest{From: uint32(1), To: now}, + &MessagesRequest{From: uint32(1), To: now, Timeout: defaultRequestTimeout}, }, { &MessagesRequest{From: 0, To: yesterday}, - &MessagesRequest{From: daysAgo(tnow, 2), To: yesterday}, + &MessagesRequest{From: daysAgo(tnow, 2), To: yesterday, Timeout: defaultRequestTimeout}, }, // 100 - 1 day would be invalid, so we set From to 0 { &MessagesRequest{From: 0, To: 100}, - &MessagesRequest{From: 0, To: 100}, + &MessagesRequest{From: 0, To: 100, Timeout: defaultRequestTimeout}, + }, + // set Timeout + { + &MessagesRequest{From: 0, To: 0, Timeout: 100}, + &MessagesRequest{From: yesterday, To: now, Timeout: 100}, }, } diff --git a/services/shhext/service.go b/services/shhext/service.go index d366051ef..56690a1ca 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -3,6 +3,7 @@ package shhext import ( "crypto/ecdsa" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -22,12 +23,16 @@ const ( EnvelopePosted EnvelopeState = iota // EnvelopeSent is set when envelope is sent to atleast one peer. EnvelopeSent + // MailServerRequestSent is set when p2p request is sent to the mailserver + MailServerRequestSent ) // EnvelopeEventsHandler used for two different event types. type EnvelopeEventsHandler interface { EnvelopeSent(common.Hash) EnvelopeExpired(common.Hash) + MailServerRequestCompleted(common.Hash) + MailServerRequestExpired(common.Hash) } // Service is a service that provides some additional Whisper API. @@ -123,6 +128,26 @@ func (t *tracker) Add(hash common.Hash) { t.cache[hash] = EnvelopePosted } +// Add request hash to a tracker. +func (t *tracker) AddRequest(hash common.Hash, timerC <-chan time.Time) { + t.mu.Lock() + defer t.mu.Unlock() + t.cache[hash] = MailServerRequestSent + go t.expireRequest(hash, timerC) +} + +func (t *tracker) expireRequest(hash common.Hash, timerC <-chan time.Time) { + select { + case <-t.quit: + return + case <-timerC: + t.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestExpired, + Hash: hash, + }) + } +} + // handleEnvelopeEvents processes whisper envelope events func (t *tracker) handleEnvelopeEvents() { events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper @@ -141,31 +166,77 @@ func (t *tracker) handleEnvelopeEvents() { // handleEvent based on type of the event either triggers // confirmation handler or removes hash from tracker func (t *tracker) handleEvent(event whisper.EnvelopeEvent) { + handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){ + whisper.EventEnvelopeSent: t.handleEventEnvelopeSent, + whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired, + whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted, + whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired, + } + + if handler, ok := handlers[event.Event]; ok { + handler(event) + } +} + +func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) { t.mu.Lock() defer t.mu.Unlock() - switch event.Event { - case whisper.EventEnvelopeSent: - state, ok := t.cache[event.Hash] - // if we didn't send a message using extension - skip it - // if message was already confirmed - skip it - if !ok || state == EnvelopeSent { + + state, ok := t.cache[event.Hash] + // if we didn't send a message using extension - skip it + // if message was already confirmed - skip it + if !ok || state == EnvelopeSent { + return + } + log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer) + t.cache[event.Hash] = EnvelopeSent + if t.handler != nil { + t.handler.EnvelopeSent(event.Hash) + } +} + +func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + if state, ok := t.cache[event.Hash]; ok { + log.Debug("envelope expired", "hash", event.Hash, "state", state) + delete(t.cache, event.Hash) + if state == EnvelopeSent { return } - log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer) - t.cache[event.Hash] = EnvelopeSent if t.handler != nil { - t.handler.EnvelopeSent(event.Hash) - } - case whisper.EventEnvelopeExpired: - if state, ok := t.cache[event.Hash]; ok { - log.Debug("envelope expired", "hash", event.Hash, "state", state) - delete(t.cache, event.Hash) - if state == EnvelopeSent { - return - } - if t.handler != nil { - t.handler.EnvelopeExpired(event.Hash) - } + t.handler.EnvelopeExpired(event.Hash) } } } + +func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + state, ok := t.cache[event.Hash] + if !ok || state != MailServerRequestSent { + return + } + log.Debug("mailserver response received", "hash", event.Hash) + delete(t.cache, event.Hash) + if t.handler != nil { + t.handler.MailServerRequestCompleted(event.Hash) + } +} + +func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + state, ok := t.cache[event.Hash] + if !ok || state != MailServerRequestSent { + return + } + log.Debug("mailserver response expired", "hash", event.Hash) + delete(t.cache, event.Hash) + if t.handler != nil { + t.handler.MailServerRequestExpired(event.Hash) + } +} diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 5afac5401..72a15fa1b 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -16,14 +16,18 @@ import ( func newHandlerMock(buf int) handlerMock { return handlerMock{ - confirmations: make(chan common.Hash, buf), - expirations: make(chan common.Hash, buf), + confirmations: make(chan common.Hash, buf), + expirations: make(chan common.Hash, buf), + requestsCompleted: make(chan common.Hash, buf), + requestsExpired: make(chan common.Hash, buf), } } type handlerMock struct { - confirmations chan common.Hash - expirations chan common.Hash + confirmations chan common.Hash + expirations chan common.Hash + requestsCompleted chan common.Hash + requestsExpired chan common.Hash } func (t handlerMock) EnvelopeSent(hash common.Hash) { @@ -34,6 +38,14 @@ func (t handlerMock) EnvelopeExpired(hash common.Hash) { t.expirations <- hash } +func (t handlerMock) MailServerRequestCompleted(hash common.Hash) { + t.requestsCompleted <- hash +} + +func (t handlerMock) MailServerRequestExpired(hash common.Hash) { + t.requestsExpired <- hash +} + func TestShhExtSuite(t *testing.T) { suite.Run(t, new(ShhExtSuite)) } @@ -159,29 +171,29 @@ func (s *ShhExtSuite) TestRequestMessages() { mailServerPeer = "enode://b7e65e1bedc2499ee6cbd806945af5e7df0e59e4070c96821570bd581473eade24a489f5ec95d060c0db118c879403ab88d827d3766978f28708989d35474f87@[::]:51920" ) - var result bool + var hash []byte // invalid MailServer enode address - result, err = api.RequestMessages(context.TODO(), MessagesRequest{MailServerPeer: "invalid-address"}) - s.False(result) + hash, err = api.RequestMessages(context.TODO(), MessagesRequest{MailServerPeer: "invalid-address"}) + s.Nil(hash) s.EqualError(err, "invalid mailServerPeer value: invalid URL scheme, want \"enode\"") // non-existent symmetric key - result, err = api.RequestMessages(context.TODO(), MessagesRequest{ + hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailServerPeer, }) - s.False(result) + s.Nil(hash) s.EqualError(err, "invalid symKeyID value: non-existent key ID") // with a symmetric key symKeyID, symKeyErr := shh.AddSymKeyFromPassword("some-pass") s.NoError(symKeyErr) - result, err = api.RequestMessages(context.TODO(), MessagesRequest{ + hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailServerPeer, SymKeyID: symKeyID, }) s.Contains(err.Error(), "Could not find peer with ID") - s.False(result) + s.Nil(hash) // with a peer acting as a mailserver // prepare a node first @@ -209,12 +221,14 @@ func (s *ShhExtSuite) TestRequestMessages() { time.Sleep(time.Second) // wait for the peer to be added // send a request - result, err = api.RequestMessages(context.TODO(), MessagesRequest{ + hash, err = api.RequestMessages(context.TODO(), MessagesRequest{ MailServerPeer: mailNode.Server().Self().String(), SymKeyID: symKeyID, }) s.NoError(err) - s.True(result) + s.NotNil(hash) + + s.Contains(api.service.tracker.cache, common.BytesToHash(hash)) } func (s *ShhExtSuite) TearDown() { @@ -272,3 +286,34 @@ func (s *TrackerSuite) TestRemoved() { }) s.NotContains(s.tracker.cache, testHash) } + +func (s *TrackerSuite) TestRequestCompleted() { + s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second)) + s.Contains(s.tracker.cache, testHash) + s.Equal(MailServerRequestSent, s.tracker.cache[testHash]) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestCompleted, + Hash: testHash, + }) + s.NotContains(s.tracker.cache, testHash) +} + +func (s *TrackerSuite) TestRequestExpiration() { + mock := newHandlerMock(1) + s.tracker.handler = mock + c := make(chan time.Time) + s.tracker.AddRequest(testHash, c) + s.Contains(s.tracker.cache, testHash) + s.Equal(MailServerRequestSent, s.tracker.cache[testHash]) + s.tracker.handleEvent(whisper.EnvelopeEvent{ + Event: whisper.EventMailServerRequestExpired, + Hash: testHash, + }) + select { + case requestID := <-mock.requestsExpired: + s.Equal(testHash, requestID) + s.NotContains(s.tracker.cache, testHash) + case <-time.After(10 * time.Second): + s.Fail("timed out while waiting for request expiration") + } +} diff --git a/services/shhext/signal.go b/services/shhext/signal.go index f34004f13..11fc992cb 100644 --- a/services/shhext/signal.go +++ b/services/shhext/signal.go @@ -17,3 +17,13 @@ func (h EnvelopeSignalHandler) EnvelopeSent(hash common.Hash) { func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) { signal.SendEnvelopeExpired(hash) } + +// MailServerRequestCompleted triggered when mailserver send a ack with a requesID sent previously +func (h EnvelopeSignalHandler) MailServerRequestCompleted(hash common.Hash) { + signal.SendMailServerRequestCompleted(hash) +} + +// MailServerRequestExpired triggered when mail server request expires +func (h EnvelopeSignalHandler) MailServerRequestExpired(hash common.Hash) { + signal.SendMailServerRequestExpired(hash) +} diff --git a/signal/events_shhext.go b/signal/events_shhext.go index 5a2293130..27de6c3b4 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -1,6 +1,8 @@ package signal -import "github.com/ethereum/go-ethereum/common" +import ( + "github.com/ethereum/go-ethereum/common" +) const ( // EventEnvelopeSent is triggered when envelope was sent at least to a one peer. @@ -9,6 +11,12 @@ const ( // EventEnvelopeExpired is triggered when envelop was dropped by a whisper without being sent // to any peer EventEnvelopeExpired = "envelope.expired" + + // EventMailServerRequestCompleted is triggered when whisper receives a message ack from the mailserver + EventMailServerRequestCompleted = "mailserver.request.completed" + + // EventMailServerRequestExpired is triggered when request TTL ends + EventMailServerRequestExpired = "mailserver.request.expired" ) // EnvelopeSignal includes hash of the envelope. @@ -25,3 +33,13 @@ func SendEnvelopeSent(hash common.Hash) { func SendEnvelopeExpired(hash common.Hash) { send(EventEnvelopeExpired, EnvelopeSignal{hash}) } + +// SendMailServerRequestCompleted triggered when mail server response has been received +func SendMailServerRequestCompleted(hash common.Hash) { + send(EventMailServerRequestCompleted, EnvelopeSignal{hash}) +} + +// SendMailServerRequestExpired triggered when mail server request expires +func SendMailServerRequestExpired(hash common.Hash) { + send(EventMailServerRequestExpired, EnvelopeSignal{hash}) +} diff --git a/t/e2e/whisper/whisper_mailbox_test.go b/t/e2e/whisper/whisper_mailbox_test.go index 128fb91d2..42fbcd64f 100644 --- a/t/e2e/whisper/whisper_mailbox_test.go +++ b/t/e2e/whisper/whisper_mailbox_test.go @@ -1,14 +1,17 @@ package whisper import ( + "encoding/hex" "encoding/json" "path/filepath" "strconv" + "strings" "testing" "time" "os" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/discover" @@ -96,25 +99,12 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { // Act. + events := make(chan whisper.EnvelopeEvent) + senderWhisperService.SubscribeEnvelopeEvents(events) + // Request messages (including the previous one, expired) from mailbox. - from := senderWhisperService.GetCurrentTime().Add(-12 * time.Hour) - reqMessagesBody := `{ - "jsonrpc": "2.0", - "id": 1, - "method": "shhext_requestMessages", - "params": [{ - "mailServerPeer":"` + mailboxPeerStr + `", - "topic":"` + topic.String() + `", - "symKeyID":"` + MailServerKeyID + `", - "from":` + strconv.FormatInt(from.Unix(), 10) + `, - "to":` + strconv.FormatInt(senderWhisperService.GetCurrentTime().Unix(), 10) + ` - }] - }` - resp := rpcClient.CallRaw(reqMessagesBody) - reqMessagesResp := baseRPCResponse{} - err = json.Unmarshal([]byte(resp), &reqMessagesResp) - s.Require().NoError(err) - s.Require().Nil(reqMessagesResp.Error) + result := s.requestHistoricMessages(senderWhisperService, rpcClient, mailboxPeerStr, MailServerKeyID, topic.String()) + requestID := common.BytesToHash(result) // Wait to receive message. time.Sleep(time.Second) @@ -125,6 +115,14 @@ func (s *WhisperMailboxSuite) TestRequestMessageFromMailboxAsync() { // Check that there are no messages. messages = s.getMessagesByMessageFilterID(rpcClient, messageFilterID) s.Require().Equal(0, len(messages)) + + select { + case e := <-events: + s.Equal(whisper.EventMailServerRequestCompleted, e.Event) + s.Equal(requestID, e.Hash) + case <-time.After(time.Second): + s.Fail("timed out while waiting for request completed event") + } } func (s *WhisperMailboxSuite) TestRequestMessagesInGroupChat() { @@ -484,7 +482,7 @@ func (s *WhisperMailboxSuite) addSymKey(rpcCli *rpc.Client, symkey string) strin } // requestHistoricMessages asks a mailnode to resend messages. -func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) { +func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli *rpc.Client, mailboxEnode, mailServerKeyID, topic string) []byte { from := w.GetCurrentTime().Add(-12 * time.Hour) resp := rpcCli.CallRaw(`{ "jsonrpc": "2.0", @@ -502,6 +500,18 @@ func (s *WhisperMailboxSuite) requestHistoricMessages(w *whisper.Whisper, rpcCli err := json.Unmarshal([]byte(resp), &reqMessagesResp) s.Require().NoError(err) s.Require().Nil(reqMessagesResp.Error) + + switch hash := reqMessagesResp.Result.(type) { + case string: + s.Require().True(strings.HasPrefix(hash, "0x")) + b, err := hex.DecodeString(hash[2:]) + s.Require().NoError(err) + return b + default: + s.Failf("failed reading shh_newMessageFilter result", "expected a hash, got: %+v", reqMessagesResp.Result) + } + + return nil } type getFilterMessagesResponse struct { diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/doc.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/doc.go index 4bbf5546b..2fcc9e6e9 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/doc.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/doc.go @@ -44,13 +44,14 @@ const ( ProtocolName = "shh" // Nickname of the protocol in geth // whisper protocol message codes, according to EIP-627 - statusCode = 0 // used by whisper protocol - messagesCode = 1 // normal whisper message - powRequirementCode = 2 // PoW requirement - bloomFilterExCode = 3 // bloom filter exchange - p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol - p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) - NumberOfMessageCodes = 128 + statusCode = 0 // used by whisper protocol + messagesCode = 1 // normal whisper message + powRequirementCode = 2 // PoW requirement + bloomFilterExCode = 3 // bloom filter exchange + p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol + p2pRequestCode = 126 // peer-to-peer message, used by Dapp protocol + p2pMessageCode = 127 // peer-to-peer message (to be consumed by the peer, but not forwarded any further) + NumberOfMessageCodes = 128 SizeMask = byte(3) // mask used to extract the size of payload size field from the flags signatureFlag = byte(4) diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go index e03ec9de6..1665539d6 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/events.go @@ -1,23 +1,27 @@ package whisperv6 import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/discover" ) // EventType used to define known envelope events. type EventType string const ( - // EventEnvelopeSent fires when envelope was sent to a peer. - EventEnvelopeSent EventType = "envelope.sent" - // EventEnvelopeExpired fires when envelop expired - EventEnvelopeExpired EventType = "envelope.expired" + // EventEnvelopeSent fires when envelope was sent to a peer. + EventEnvelopeSent EventType = "envelope.sent" + // EventEnvelopeExpired fires when envelop expired + EventEnvelopeExpired EventType = "envelope.expired" + // EventMailServerRequestCompleted fires after mailserver sends all the requested messages + EventMailServerRequestCompleted EventType = "mailserver.request.completed" + // EventMailServerRequestExpired fires after mailserver the request TTL ends + EventMailServerRequestExpired EventType = "mailserver.request.expired" ) // EnvelopeEvent used for envelopes events. type EnvelopeEvent struct { - Event EventType - Hash common.Hash - Peer discover.NodeID + Event EventType + Hash common.Hash + Peer discover.NodeID } diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go index 697f0ecb8..4a7b00653 100644 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go +++ b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/whisper.go @@ -378,6 +378,15 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop return p2p.Send(p.ws, p2pRequestCode, envelope) } +func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error { + size, r, err := rlp.EncodeToReader(requestID) + if err != nil { + return err + } + + return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) +} + // SendP2PMessage sends a peer-to-peer message to a specific peer. func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { p, err := whisper.getPeer(peerID) @@ -821,8 +830,22 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid p2p request") } + whisper.mailServer.DeliverMail(p, &request) } + case p2pRequestCompleteCode: + if p.trusted { + var requestID common.Hash + if err := packet.Decode(&requestID); err != nil { + log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + return errors.New("invalid request response message") + } + + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: requestID, + Event: EventMailServerRequestCompleted, + }) + } default: // New message types might be implemented in the future versions of Whisper. // For forward compatibility, just ignore.