Mailserver: return error response. (#1244)

This commit is contained in:
Igor Mandrigin 2018-10-18 12:25:00 +02:00 committed by GitHub
parent b71058d73f
commit ba504e99c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 283 additions and 83 deletions

6
Gopkg.lock generated
View File

@ -821,12 +821,12 @@
revision = "55370cdfd9f288059b03c04e23784bb8829a894c" revision = "55370cdfd9f288059b03c04e23784bb8829a894c"
[[projects]] [[projects]]
branch = "master" digest = "1:9552f5ea7e82e95910e182bd41b6c71933b50717990a61ac517bfa1cc6f653dc"
digest = "1:a2075e4e314cff4aadaf3ffe18b77659d259b1a46be9d2bdb0d5b9f8222f420c"
name = "github.com/status-im/whisper" name = "github.com/status-im/whisper"
packages = ["whisperv6"] packages = ["whisperv6"]
pruneopts = "NUT" pruneopts = "NUT"
revision = "f41584950703450b0370e85560ba8d9322794ead" revision = "14e1bbfd9ba956e7fc7649e815aa19e4386b26da"
version = "v1.2.0"
[[projects]] [[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -163,5 +163,5 @@
branch = "master" branch = "master"
[[constraint]] [[constraint]]
branch = "master" version = "v1.2.0"
name = "github.com/status-im/whisper" name = "github.com/status-im/whisper"

View File

@ -247,25 +247,33 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
} }
if s.exceedsPeerRequests(peer.ID()) { if s.exceedsPeerRequests(peer.ID()) {
requestErrorsCounter.Inc(1) requestErrorsCounter.Inc(1)
log.Error("Peer exceeded request per seconds limit", "peerID", peer.ID())
s.trySendHistoricMessageErrorResponse(peer, request, fmt.Errorf("rate limit exceeded"))
return return
} }
defer recoverLevelDBPanics("DeliverMail") defer recoverLevelDBPanics("DeliverMail")
if ok, lower, upper, bloom, limit, cursor := s.validateRequest(peer.ID(), request); ok { if lower, upper, bloom, limit, cursor, err := s.validateRequest(peer.ID(), request); err != nil {
requestValidationErrorsCounter.Inc(1)
log.Error("Mailserver request failed validaton", "peerID", peer.ID())
s.trySendHistoricMessageErrorResponse(peer, request, err)
} else {
_, lastEnvelopeHash, nextPageCursor, err := s.processRequest(peer, lower, upper, bloom, limit, cursor) _, lastEnvelopeHash, nextPageCursor, err := s.processRequest(peer, lower, upper, bloom, limit, cursor)
if err != nil { if err != nil {
processRequestErrorsCounter.Inc(1) processRequestErrorsCounter.Inc(1)
log.Error(fmt.Sprintf("error in DeliverMail: %s", err)) log.Error("Error while delivering mail to the peer", "err", err, "peerID", peer.ID())
s.trySendHistoricMessageErrorResponse(peer, request, err)
return return
} }
if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil { if err := s.sendHistoricMessageResponse(peer, request, lastEnvelopeHash, nextPageCursor); err != nil {
historicResponseErrorsCounter.Inc(1) historicResponseErrorsCounter.Inc(1)
log.Error(fmt.Sprintf("SendHistoricMessageResponse error: %s", err)) log.Error("Error while sending historic message response", "err", err, "peerID", peer.ID())
// we still want to try to report error even it it is a p2p error and it is unlikely
s.trySendHistoricMessageErrorResponse(peer, request, err)
} }
} else {
requestValidationErrorsCounter.Inc(1)
} }
} }
@ -325,7 +333,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
var envelope whisper.Envelope var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(i.Value(), &envelope) decodeErr := rlp.DecodeBytes(i.Value(), &envelope)
if decodeErr != nil { if decodeErr != nil {
log.Error(fmt.Sprintf("RLP decoding failed: %s", decodeErr)) log.Error(fmt.Sprintf("failed to decode RLP: %s", decodeErr))
continue continue
} }
@ -336,7 +344,7 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
} else { } else {
err = s.w.SendP2PDirect(peer, &envelope) err = s.w.SendP2PDirect(peer, &envelope)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("Failed to send direct message to peer: %s", err)) log.Error(fmt.Sprintf("failed to send direct message to peer: %s", err))
return return
} }
lastEnvelopeHash = envelope.Hash() lastEnvelopeHash = envelope.Hash()
@ -364,12 +372,22 @@ func (s *WMailServer) processRequest(peer *whisper.Peer, lower, upper uint32, bl
} }
func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error { func (s *WMailServer) sendHistoricMessageResponse(peer *whisper.Peer, request *whisper.Envelope, lastEnvelopeHash common.Hash, cursor cursorType) error {
requestID := request.Hash() payload := whisper.CreateMailServerRequestCompletedPayload(request.Hash(), lastEnvelopeHash, cursor)
payload := append(requestID[:], lastEnvelopeHash[:]...)
payload = append(payload, cursor...)
return s.w.SendHistoricMessageResponse(peer, payload) return s.w.SendHistoricMessageResponse(peer, payload)
} }
// this method doesn't return an error because it is already in the error handling chain
func (s *WMailServer) trySendHistoricMessageErrorResponse(peer *whisper.Peer, request *whisper.Envelope, errorToReport error) {
payload := whisper.CreateMailServerRequestFailedPayload(request.Hash(), errorToReport)
err := s.w.SendHistoricMessageResponse(peer, payload)
// if we can't report an error, probably something is wrong with p2p connection,
// so we just print a log entry to document this sad fact
if err != nil {
log.Error("Error while reporting error response", "err", err, "peerID", peer.ID())
}
}
// openEnvelope tries to decrypt an envelope, first based on asymetric key (if // openEnvelope tries to decrypt an envelope, first based on asymetric key (if
// provided) and second on the symetric key (if provided) // provided) and second on the symetric key (if provided)
func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedMessage { func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedMessage {
@ -387,41 +405,38 @@ func (s *WMailServer) openEnvelope(request *whisper.Envelope) *whisper.ReceivedM
} }
// validateRequest runs different validations on the current request. // validateRequest runs different validations on the current request.
func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (bool, uint32, uint32, []byte, uint32, cursorType) { func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope) (uint32, uint32, []byte, uint32, cursorType, error) {
if s.pow > 0.0 && request.PoW() < s.pow { if s.pow > 0.0 && request.PoW() < s.pow {
return false, 0, 0, nil, 0, nil return 0, 0, nil, 0, nil, fmt.Errorf("PoW() is too low")
} }
decrypted := s.openEnvelope(request) decrypted := s.openEnvelope(request)
if decrypted == nil { if decrypted == nil {
log.Warn("Failed to decrypt p2p request") return 0, 0, nil, 0, nil, fmt.Errorf("failed to decrypt p2p request")
return false, 0, 0, nil, 0, nil
} }
if err := s.checkMsgSignature(decrypted, peerID); err != nil { if err := s.checkMsgSignature(decrypted, peerID); err != nil {
log.Warn(err.Error()) return 0, 0, nil, 0, nil, err
return false, 0, 0, nil, 0, nil
} }
bloom, err := s.bloomFromReceivedMessage(decrypted) bloom, err := s.bloomFromReceivedMessage(decrypted)
if err != nil { if err != nil {
log.Warn(err.Error()) return 0, 0, nil, 0, nil, err
return false, 0, 0, nil, 0, nil
} }
lower := binary.BigEndian.Uint32(decrypted.Payload[:4]) lower := binary.BigEndian.Uint32(decrypted.Payload[:4])
upper := binary.BigEndian.Uint32(decrypted.Payload[4:8]) upper := binary.BigEndian.Uint32(decrypted.Payload[4:8])
if upper < lower { if upper < lower {
log.Error(fmt.Sprintf("Query range is invalid: from > to (%d > %d)", lower, upper)) err := fmt.Errorf("query range is invalid: from > to (%d > %d)", lower, upper)
return false, 0, 0, nil, 0, nil return 0, 0, nil, 0, nil, err
} }
lowerTime := time.Unix(int64(lower), 0) lowerTime := time.Unix(int64(lower), 0)
upperTime := time.Unix(int64(upper), 0) upperTime := time.Unix(int64(upper), 0)
if upperTime.Sub(lowerTime) > maxQueryRange { if upperTime.Sub(lowerTime) > maxQueryRange {
log.Warn(fmt.Sprintf("Query range too big for peer %s", string(peerID))) err := fmt.Errorf("query range too big for peer %s", string(peerID))
return false, 0, 0, nil, 0, nil return 0, 0, nil, 0, nil, err
} }
var limit uint32 var limit uint32
@ -434,7 +449,8 @@ func (s *WMailServer) validateRequest(peerID []byte, request *whisper.Envelope)
cursor = decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength:] cursor = decrypted.Payload[requestTimeRangeLength+whisper.BloomFilterSize+requestLimitLength:]
} }
return true, lower, upper, bloom, limit, cursor err = nil
return lower, upper, bloom, limit, cursor, err
} }
// checkMsgSignature returns an error in case the message is not correcly signed // checkMsgSignature returns an error in case the message is not correcly signed

View File

@ -322,8 +322,8 @@ func (s *MailserverSuite) TestRequestPaginationLimit() {
params.limit = 6 params.limit = 6
request := s.createRequest(params) request := s.createRequest(params)
src := crypto.FromECDSAPub(&params.key.PublicKey) src := crypto.FromECDSAPub(&params.key.PublicKey)
ok, lower, upper, bloom, limit, cursor := s.server.validateRequest(src, request) lower, upper, bloom, limit, cursor, err := s.server.validateRequest(src, request)
s.True(ok) s.True(err == nil)
s.Nil(cursor) s.Nil(cursor)
s.Equal(params.limit, limit) s.Equal(params.limit, limit)
@ -428,9 +428,9 @@ func (s *MailserverSuite) TestMailServer() {
s.T().Run(tc.info, func(*testing.T) { s.T().Run(tc.info, func(*testing.T) {
request := s.createRequest(tc.params) request := s.createRequest(tc.params)
src := crypto.FromECDSAPub(&tc.params.key.PublicKey) src := crypto.FromECDSAPub(&tc.params.key.PublicKey)
ok, lower, upper, bloom, limit, _ := s.server.validateRequest(src, request) lower, upper, bloom, limit, _, err := s.server.validateRequest(src, request)
s.Equal(tc.isOK, ok) s.Equal(tc.isOK, err == nil)
if ok { if err == nil {
s.Equal(tc.params.low, lower) s.Equal(tc.params.low, lower)
s.Equal(tc.params.upp, upper) s.Equal(tc.params.upp, upper)
s.Equal(tc.params.limit, limit) s.Equal(tc.params.limit, limit)
@ -438,8 +438,8 @@ func (s *MailserverSuite) TestMailServer() {
s.Equal(tc.expect, s.messageExists(env, tc.params.low, tc.params.upp, bloom, tc.params.limit)) s.Equal(tc.expect, s.messageExists(env, tc.params.low, tc.params.upp, bloom, tc.params.limit))
src[0]++ src[0]++
ok, _, _, _, _, _ = s.server.validateRequest(src, request) _, _, _, _, _, err = s.server.validateRequest(src, request)
s.True(ok) s.True(err == nil)
} }
}) })
} }

View File

@ -38,7 +38,7 @@ const (
type EnvelopeEventsHandler interface { type EnvelopeEventsHandler interface {
EnvelopeSent(common.Hash) EnvelopeSent(common.Hash)
EnvelopeExpired(common.Hash) EnvelopeExpired(common.Hash)
MailServerRequestCompleted(common.Hash, common.Hash, []byte) MailServerRequestCompleted(common.Hash, common.Hash, []byte, error)
MailServerRequestExpired(common.Hash) MailServerRequestExpired(common.Hash)
} }
@ -291,7 +291,7 @@ func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEv
delete(t.cache, event.Hash) delete(t.cache, event.Hash)
if t.handler != nil { if t.handler != nil {
if resp, ok := event.Data.(*whisper.MailServerResponse); ok { if resp, ok := event.Data.(*whisper.MailServerResponse); ok {
t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor) t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error)
} }
} }
} }

View File

@ -23,6 +23,7 @@ func newHandlerMock(buf int) handlerMock {
expirations: make(chan common.Hash, buf), expirations: make(chan common.Hash, buf),
requestsCompleted: make(chan common.Hash, buf), requestsCompleted: make(chan common.Hash, buf),
requestsExpired: make(chan common.Hash, buf), requestsExpired: make(chan common.Hash, buf),
requestsFailed: make(chan common.Hash, buf),
} }
} }
@ -31,6 +32,7 @@ type handlerMock struct {
expirations chan common.Hash expirations chan common.Hash
requestsCompleted chan common.Hash requestsCompleted chan common.Hash
requestsExpired chan common.Hash requestsExpired chan common.Hash
requestsFailed chan common.Hash
} }
func (t handlerMock) EnvelopeSent(hash common.Hash) { func (t handlerMock) EnvelopeSent(hash common.Hash) {
@ -41,8 +43,12 @@ func (t handlerMock) EnvelopeExpired(hash common.Hash) {
t.expirations <- hash t.expirations <- hash
} }
func (t handlerMock) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte) { func (t handlerMock) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte, err error) {
t.requestsCompleted <- requestID if err == nil {
t.requestsCompleted <- requestID
} else {
t.requestsFailed <- requestID
}
} }
func (t handlerMock) MailServerRequestExpired(hash common.Hash) { func (t handlerMock) MailServerRequestExpired(hash common.Hash) {
@ -459,6 +465,26 @@ func (s *TrackerSuite) TestRequestCompleted() {
} }
} }
func (s *TrackerSuite) TestRequestFailed() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.AddRequest(testHash, time.After(defaultRequestTimeout*time.Second))
s.Contains(s.tracker.cache, testHash)
s.Equal(MailServerRequestSent, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{Error: errors.New("test error")},
})
select {
case requestID := <-mock.requestsFailed:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be failed")
}
}
func (s *TrackerSuite) TestRequestExpiration() { func (s *TrackerSuite) TestRequestExpiration() {
mock := newHandlerMock(1) mock := newHandlerMock(1)
s.tracker.handler = mock s.tracker.handler = mock

View File

@ -19,8 +19,8 @@ func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) {
} }
// MailServerRequestCompleted triggered when the mailserver sends a message to notify that the request has been completed // MailServerRequestCompleted triggered when the mailserver sends a message to notify that the request has been completed
func (h EnvelopeSignalHandler) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte) { func (h EnvelopeSignalHandler) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte, err error) {
signal.SendMailServerRequestCompleted(requestID, lastEnvelopeHash, cursor) signal.SendMailServerRequestCompleted(requestID, lastEnvelopeHash, cursor, err)
} }
// MailServerRequestExpired triggered when the mailserver request expires // MailServerRequestExpired triggered when the mailserver request expires

View File

@ -38,6 +38,7 @@ type MailServerResponseSignal struct {
RequestID common.Hash `json:"requestID"` RequestID common.Hash `json:"requestID"`
LastEnvelopeHash common.Hash `json:"lastEnvelopeHash"` LastEnvelopeHash common.Hash `json:"lastEnvelopeHash"`
Cursor string `json:"cursor"` Cursor string `json:"cursor"`
ErrorMsg string `json:"errorMessage"`
} }
// DecryptMessageFailedSignal holds the sender of the message that could not be decrypted // DecryptMessageFailedSignal holds the sender of the message that could not be decrypted
@ -62,11 +63,16 @@ func SendEnvelopeExpired(hash common.Hash) {
} }
// SendMailServerRequestCompleted triggered when mail server response has been received // SendMailServerRequestCompleted triggered when mail server response has been received
func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte) { func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte, err error) {
errorMsg := ""
if err != nil {
errorMsg = err.Error()
}
sig := MailServerResponseSignal{ sig := MailServerResponseSignal{
RequestID: requestID, RequestID: requestID,
LastEnvelopeHash: lastEnvelopeHash, LastEnvelopeHash: lastEnvelopeHash,
Cursor: string(cursor), Cursor: string(cursor),
ErrorMsg: errorMsg,
} }
send(EventMailServerRequestCompleted, sig) send(EventMailServerRequestCompleted, sig)
} }

View File

@ -0,0 +1,136 @@
package whisperv6
import (
"bytes"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
)
const (
mailServerFailedPayloadPrefix = "ERROR="
cursorSize = 36
)
func invalidResponseSizeError(size int) error {
return fmt.Errorf("unexpected payload size: %d", size)
}
// CreateMailServerRequestCompletedPayload creates a payload representing
// a successful request to mailserver
func CreateMailServerRequestCompletedPayload(requestID, lastEnvelopeHash common.Hash, cursor []byte) []byte {
payload := append(requestID[:], lastEnvelopeHash[:]...)
payload = append(payload, cursor...)
return payload
}
// CreateMailServerRequestFailedPayload creates a payload representing
// a failed request to a mailserver
func CreateMailServerRequestFailedPayload(requestID common.Hash, err error) []byte {
payloadPrefix := []byte(mailServerFailedPayloadPrefix)
errorString := []byte(err.Error())
payload := append(payloadPrefix, requestID[:]...)
payload = append(payload, errorString[:]...)
return payload
}
// CreateMailServerEvent returns EnvelopeEvent with correct data
// if payload corresponds to any of the know mailserver events:
// * request completed successfully
// * request failed
// If the payload is unknown/unparseable, it returns `nil`
func CreateMailServerEvent(payload []byte) (*EnvelopeEvent, error) {
if len(payload) < common.HashLength {
return nil, invalidResponseSizeError(len(payload))
}
event, err := tryCreateMailServerRequestFailedEvent(payload)
if err != nil || event != nil {
return event, err
}
return tryCreateMailServerRequestCompletedEvent(payload)
}
func tryCreateMailServerRequestFailedEvent(payload []byte) (*EnvelopeEvent, error) {
if len(payload) < common.HashLength+len(mailServerFailedPayloadPrefix) {
return nil, nil
}
prefix, remainder := extractPrefix(payload, len(mailServerFailedPayloadPrefix))
if !bytes.Equal(prefix, []byte(mailServerFailedPayloadPrefix)) {
return nil, nil
}
var (
requestID common.Hash
errorMsg string
)
requestID, remainder = extractHash(remainder)
errorMsg = string(remainder)
event := EnvelopeEvent{
Hash: requestID,
Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{
Error: errors.New(errorMsg),
},
}
return &event, nil
}
func tryCreateMailServerRequestCompletedEvent(payload []byte) (*EnvelopeEvent, error) {
// check if payload is
// - requestID or
// - requestID + lastEnvelopeHash or
// - requestID + lastEnvelopeHash + cursor
// requestID is the hash of the request envelope.
// lastEnvelopeHash is the last envelope sent by the mail server
// cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
if len(payload) > common.HashLength*2+cursorSize {
return nil, invalidResponseSizeError(len(payload))
}
var (
requestID common.Hash
lastEnvelopeHash common.Hash
cursor []byte
)
requestID, remainder := extractHash(payload)
if len(remainder) >= common.HashLength {
lastEnvelopeHash, remainder = extractHash(remainder)
}
if len(remainder) >= cursorSize {
cursor = remainder
}
event := EnvelopeEvent{
Hash: requestID,
Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{
LastEnvelopeHash: lastEnvelopeHash,
Cursor: cursor,
},
}
return &event, nil
}
func extractHash(payload []byte) (common.Hash, []byte) {
prefix, remainder := extractPrefix(payload, common.HashLength)
return common.BytesToHash(prefix), remainder
}
func extractPrefix(payload []byte, size int) ([]byte, []byte) {
return payload[:size], payload[size:]
}

View File

@ -21,6 +21,8 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"io"
"io/ioutil"
"math" "math"
"runtime" "runtime"
"sync" "sync"
@ -64,6 +66,7 @@ const (
type MailServerResponse struct { type MailServerResponse struct {
LastEnvelopeHash common.Hash LastEnvelopeHash common.Hash
Cursor []byte Cursor []byte
Error error
} }
// Whisper represents a dark communication interface through the Ethereum // Whisper represents a dark communication interface through the Ethereum
@ -418,17 +421,23 @@ func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte)
} }
// SendP2PMessage sends a peer-to-peer message to a specific peer. // SendP2PMessage sends a peer-to-peer message to a specific peer.
func (whisper *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error { func (whisper *Whisper) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error {
p, err := whisper.getPeer(peerID) p, err := whisper.getPeer(peerID)
if err != nil { if err != nil {
return err return err
} }
return whisper.SendP2PDirect(p, envelope) return whisper.SendP2PDirect(p, envelopes...)
} }
// SendP2PDirect sends a peer-to-peer message to a specific peer. // SendP2PDirect sends a peer-to-peer message to a specific peer.
func (whisper *Whisper) SendP2PDirect(peer *Peer, envelope *Envelope) error { // If only a single envelope is given, data is sent as a single object
return p2p.Send(peer.ws, p2pMessageCode, envelope) // rather than a slice. This is important to keep this method backward compatible
// as it used to send only single envelopes.
func (whisper *Whisper) SendP2PDirect(peer *Peer, envelopes ...*Envelope) error {
if len(envelopes) == 1 {
return p2p.Send(peer.ws, p2pMessageCode, envelopes[0])
}
return p2p.Send(peer.ws, p2pMessageCode, envelopes)
} }
// NewKeyPair generates a new cryptographic identity for the client, and injects // NewKeyPair generates a new cryptographic identity for the client, and injects
@ -843,12 +852,46 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// therefore might not satisfy the PoW, expiry and other requirements. // therefore might not satisfy the PoW, expiry and other requirements.
// these messages are only accepted from the trusted peer. // these messages are only accepted from the trusted peer.
if p.trusted { if p.trusted {
var envelope Envelope var (
if err := packet.Decode(&envelope); err != nil { envelope *Envelope
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) envelopes []*Envelope
return errors.New("invalid direct message") err error
)
// Read all data as we will try to decode it possibly twice
// to keep backward compatibility.
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
return fmt.Errorf("invalid direct messages: %v", err)
}
r := bytes.NewReader(data)
packet.Payload = r
if err = packet.Decode(&envelopes); err == nil {
for _, envelope := range envelopes {
whisper.postEvent(envelope, true)
}
continue
}
// As we failed to decode envelopes, let's set the offset
// to the beginning and try decode data again.
// Decoding to a single Envelope is required
// to be backward compatible.
if _, err := r.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("invalid direct messages: %v", err)
}
if err = packet.Decode(&envelope); err == nil {
whisper.postEvent(envelope, true)
continue
}
if err != nil {
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return fmt.Errorf("invalid direct message: %v", err)
} }
whisper.postEvent(&envelope, true)
} }
case p2pRequestCode: case p2pRequestCode:
// Must be processed if mail server is implemented. Otherwise ignore. // Must be processed if mail server is implemented. Otherwise ignore.
@ -869,44 +912,17 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return errors.New("invalid request response message") return errors.New("invalid request response message")
} }
// check if payload is event, err := CreateMailServerEvent(payload)
// - requestID or
// - requestID + lastEnvelopeHash or
// - requestID + lastEnvelopeHash + cursor
// requestID is the hash of the request envelope.
// lastEnvelopeHash is the last envelope sent by the mail server
// cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
// length := len(payload)
if len(payload) < common.HashLength || len(payload) > common.HashLength*3+4 { if err != nil {
log.Warn("invalid response message, peer will be disconnected", "peer", p.peer.ID(), "err", err, "payload size", len(payload)) log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid response size") return err
} }
var ( if event != nil {
requestID common.Hash whisper.envelopeFeed.Send(*event)
lastEnvelopeHash common.Hash
cursor []byte
)
requestID = common.BytesToHash(payload[:common.HashLength])
if len(payload) >= common.HashLength*2 {
lastEnvelopeHash = common.BytesToHash(payload[common.HashLength : common.HashLength*2])
} }
if len(payload) >= common.HashLength*2+36 {
cursor = payload[common.HashLength*2 : common.HashLength*2+36]
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: requestID,
Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{
LastEnvelopeHash: lastEnvelopeHash,
Cursor: cursor,
},
})
} }
default: default:
// New message types might be implemented in the future versions of Whisper. // New message types might be implemented in the future versions of Whisper.