From a904d9325e76f18f54d59efc099b63293d3dcad3 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 2 Apr 2019 13:40:45 +0300 Subject: [PATCH] Notify users that envelope was discarded and retry sending it (#1424) * Notify users that envelope was discarded and retry sending it * Update Gopkg files with released whisper version * Forgot to remove signal after refactoring --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- services/shhext/envelopes.go | 44 +++++++++++-- services/shhext/service.go | 2 +- services/shhext/service_test.go | 34 ++++++++-- services/shhext/signal.go | 4 +- signal/events_shhext.go | 15 +++-- .../status-im/whisper/whisperv6/doc.go | 64 +++++++++++++++++++ .../status-im/whisper/whisperv6/whisper.go | 57 +++++++++++++---- 9 files changed, 191 insertions(+), 37 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 66d87109d..edd9ab99e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -830,12 +830,12 @@ version = "v1.1.0" [[projects]] - digest = "1:684e59281a3fd4a35437992b008f43f98a0cf5b25cde717397325b10f94ea69c" + digest = "1:ff23c911716ddbe23acccecf0a88bb99e89132b221a3be8dbad6a8377fd6f3a0" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "f60fda29e21f802bc20bc20b4924dc22fe8a0514" - version = "v1.4.9" + revision = "3a4601b568649ac152afa76551ea9c332464b867" + version = "v1.4.10" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index c0f730598..48b9731d9 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -46,7 +46,7 @@ [[constraint]] name = "github.com/status-im/whisper" - version = "=v1.4.9" + version = "=v1.4.10" [[constraint]] name = "golang.org/x/text" diff --git a/services/shhext/envelopes.go b/services/shhext/envelopes.go index 83e918023..bbd03dda1 100644 --- a/services/shhext/envelopes.go +++ b/services/shhext/envelopes.go @@ -2,6 +2,7 @@ package shhext import ( "context" + "errors" "hash/fnv" "sync" @@ -193,7 +194,30 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent) log.Debug("batch is not found", "batch", event.Batch) } log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer) + envelopeErrors, ok := event.Data.([]whisper.EnvelopeError) + if event.Data != nil && !ok { + log.Warn("received unexpected data for the confirmation event", "batch", event.Batch) + } + failedEnvelopes := map[common.Hash]struct{}{} + for i := range envelopeErrors { + envelopeError := envelopeErrors[i] + _, exist := m.envelopes[envelopeError.Hash] + if exist { + log.Warn("envelope that was posted by us is discarded", "hash", envelopeError.Hash, "peer", event.Peer, "error", envelopeError.Description) + var err error + switch envelopeError.Code { + case whisper.EnvelopeTimeNotSynced: + err = errors.New("envelope wasn't delivered due to time sync issues") + } + m.handleEnvelopeFailure(envelopeError.Hash, err) + } + failedEnvelopes[envelopeError.Hash] = struct{}{} + } + for hash := range envelopes { + if _, exist := failedEnvelopes[hash]; exist { + continue + } state, ok := m.envelopes[hash] if !ok || state == EnvelopeSent { continue @@ -209,14 +233,20 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent) func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) { m.mu.Lock() defer m.mu.Unlock() - if state, ok := m.envelopes[event.Hash]; ok { - message, exist := m.messages[event.Hash] + m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues")) +} + +// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock +// must be used on a higher level. +func (m *EnvelopesMonitor) handleEnvelopeFailure(hash common.Hash, err error) { + if state, ok := m.envelopes[hash]; ok { + message, exist := m.messages[hash] if !exist { - log.Error("message was deleted erroneously", "envelope hash", event.Hash) + log.Error("message was deleted erroneously", "envelope hash", hash) } mID := messageID(message) - attempt := m.attempts[event.Hash] - m.clearMessageState(event.Hash) + attempt := m.attempts[hash] + m.clearMessageState(hash) if state == EnvelopeSent { return } @@ -232,9 +262,9 @@ func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEven m.messages[envelopeID] = message m.attempts[envelopeID] = attempt + 1 } else { - log.Debug("envelope expired", "hash", event.Hash, "state", state) + log.Debug("envelope expired", "hash", hash, "state", state) if m.handler != nil { - m.handler.EnvelopeExpired(mID) + m.handler.EnvelopeExpired(mID, err) } } } diff --git a/services/shhext/service.go b/services/shhext/service.go index ccd80877f..a1bab31eb 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -34,7 +34,7 @@ var errProtocolNotInitialized = errors.New("procotol is not initialized") // EnvelopeEventsHandler used for two different event types. type EnvelopeEventsHandler interface { EnvelopeSent(common.Hash) - EnvelopeExpired(common.Hash) + EnvelopeExpired(common.Hash, error) MailServerRequestCompleted(common.Hash, common.Hash, []byte, error) MailServerRequestExpired(common.Hash) } diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index c76a44cb8..1557082d3 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -35,10 +35,15 @@ const ( p2pRequestCompleteCode = 125 ) +type failureMessage struct { + Hash common.Hash + Error error +} + func newHandlerMock(buf int) handlerMock { return handlerMock{ confirmations: make(chan common.Hash, buf), - expirations: make(chan common.Hash, buf), + expirations: make(chan failureMessage, buf), requestsCompleted: make(chan common.Hash, buf), requestsExpired: make(chan common.Hash, buf), requestsFailed: make(chan common.Hash, buf), @@ -47,7 +52,7 @@ func newHandlerMock(buf int) handlerMock { type handlerMock struct { confirmations chan common.Hash - expirations chan common.Hash + expirations chan failureMessage requestsCompleted chan common.Hash requestsExpired chan common.Hash requestsFailed chan common.Hash @@ -57,8 +62,8 @@ func (t handlerMock) EnvelopeSent(hash common.Hash) { t.confirmations <- hash } -func (t handlerMock) EnvelopeExpired(hash common.Hash) { - t.expirations <- hash +func (t handlerMock) EnvelopeExpired(hash common.Hash, err error) { + t.expirations <- failureMessage{Hash: hash, Error: err} } func (t handlerMock) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte, err error) { @@ -168,7 +173,7 @@ func (s *ShhExtSuite) TestPostMessageWithConfirmation() { } } -func (s *ShhExtSuite) TestWaitMessageExpired() { +func (s *ShhExtSuite) testWaitMessageExpired(expectedError string, ttl uint32) { mock := newHandlerMock(1) s.services[0].envelopesMonitor.handler = mock symID, err := s.whisper[0].GenerateSymKey() @@ -180,7 +185,7 @@ func (s *ShhExtSuite) TestWaitMessageExpired() { SymKeyID: symID, PowTarget: whisper.DefaultMinimumPoW, PowTime: 200, - TTL: 1, + TTL: ttl, Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, Payload: []byte("hello"), } @@ -189,7 +194,8 @@ func (s *ShhExtSuite) TestWaitMessageExpired() { s.NoError(err) select { case expired := <-mock.expirations: - s.Equal(mid, expired) + s.Equal(mid, expired.Hash) + s.EqualError(expired.Error, expectedError) case confirmed := <-mock.confirmations: s.Fail("unexpected confirmation for hash", confirmed) case <-time.After(10 * time.Second): @@ -197,6 +203,20 @@ func (s *ShhExtSuite) TestWaitMessageExpired() { } } +func (s *ShhExtSuite) TestWaitMessageExpired() { + s.testWaitMessageExpired("envelope expired due to connectivity issues", 1) +} + +func (s *ShhExtSuite) TestErrorOnEnvelopeDelivery() { + // in the test we are sending message from peer 0 to peer 1 + s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) + s.Require().NoError(s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()})) + s.whisper[1].SetTimeSource(func() time.Time { + return time.Now().Add(time.Hour) + }) + s.testWaitMessageExpired("envelope wasn't delivered due to time sync issues", 100) +} + func (s *ShhExtSuite) TestRequestMessagesErrors() { var err error diff --git a/services/shhext/signal.go b/services/shhext/signal.go index 91ec55af8..36c840d9a 100644 --- a/services/shhext/signal.go +++ b/services/shhext/signal.go @@ -14,8 +14,8 @@ func (h EnvelopeSignalHandler) EnvelopeSent(hash common.Hash) { } // EnvelopeExpired triggered when envelope is expired but wasn't delivered to any peer. -func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) { - signal.SendEnvelopeExpired(hash) +func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash, err error) { + signal.SendEnvelopeExpired(hash, err) } // MailServerRequestCompleted triggered when the mailserver sends a message to notify that the request has been completed diff --git a/signal/events_shhext.go b/signal/events_shhext.go index 0960671e5..4209502b3 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -32,7 +32,8 @@ const ( // EnvelopeSignal includes hash of the envelope. type EnvelopeSignal struct { - Hash common.Hash `json:"hash"` + Hash common.Hash `json:"hash"` + Message string `json:"message"` } // MailServerResponseSignal holds the data received in the response from the mailserver. @@ -56,12 +57,16 @@ type BundleAddedSignal struct { // SendEnvelopeSent triggered when envelope delivered at least to 1 peer. func SendEnvelopeSent(hash common.Hash) { - send(EventEnvelopeSent, EnvelopeSignal{hash}) + send(EventEnvelopeSent, EnvelopeSignal{Hash: hash}) } // SendEnvelopeExpired triggered when envelope delivered at least to 1 peer. -func SendEnvelopeExpired(hash common.Hash) { - send(EventEnvelopeExpired, EnvelopeSignal{hash}) +func SendEnvelopeExpired(hash common.Hash, err error) { + var message string + if err != nil { + message = err.Error() + } + send(EventEnvelopeExpired, EnvelopeSignal{Hash: hash, Message: message}) } // SendMailServerRequestCompleted triggered when mail server response has been received @@ -81,7 +86,7 @@ func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash comm // SendMailServerRequestExpired triggered when mail server request expires func SendMailServerRequestExpired(hash common.Hash) { - send(EventMailServerRequestExpired, EnvelopeSignal{hash}) + send(EventMailServerRequestExpired, EnvelopeSignal{Hash: hash}) } // EnodeDiscoveredSignal includes enode address and topic diff --git a/vendor/github.com/status-im/whisper/whisperv6/doc.go b/vendor/github.com/status-im/whisper/whisperv6/doc.go index eb1da0618..75b71572f 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/doc.go +++ b/vendor/github.com/status-im/whisper/whisperv6/doc.go @@ -36,6 +36,9 @@ import ( "errors" "fmt" "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" ) // Whisper protocol parameters @@ -50,6 +53,7 @@ const ( powRequirementCode = 2 // PoW requirement bloomFilterExCode = 3 // bloom filter exchange batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received + messageResponseCode = 12 // includes confirmation for delivery and information about errors p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol @@ -84,6 +88,9 @@ const ( DefaultSyncAllowance = 10 // seconds MaxLimitInSyncMailRequest = 1000 + + EnvelopeTimeNotSynced uint = iota + 1 + EnvelopeOtherError ) // MailServer represents a mail server, capable of @@ -134,3 +141,60 @@ type SyncResponse struct { Final bool // if true it means all envelopes were processed Error string } + +// MessagesResponse sent as a response after processing batch of envelopes. +type MessagesResponse struct { + // Hash is a hash of all envelopes sent in the single batch. + Hash common.Hash + // Per envelope error. + Errors []EnvelopeError +} + +// EnvelopeError code and optional description of the error. +type EnvelopeError struct { + Hash common.Hash + Code uint + Description string +} + +// MultiVersionResponse allows to decode response into chosen version. +type MultiVersionResponse struct { + Version uint + Response rlp.RawValue +} + +// DecodeResponse1 decodes response into first version of the messages response. +func (m MultiVersionResponse) DecodeResponse1() (resp MessagesResponse, err error) { + return resp, rlp.DecodeBytes(m.Response, &resp) +} + +// Version1MessageResponse first version of the message response. +type Version1MessageResponse struct { + Version uint + Response MessagesResponse +} + +// NewMessagesResponse returns instane of the version messages response. +func NewMessagesResponse(batch common.Hash, errors []EnvelopeError) Version1MessageResponse { + return Version1MessageResponse{ + Version: 1, + Response: MessagesResponse{ + Hash: batch, + Errors: errors, + }, + } +} + +// ErrorToEnvelopeError converts common golang error into EnvelopeError with a code. +func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError { + code := EnvelopeOtherError + switch err.(type) { + case TimeSyncError: + code = EnvelopeTimeNotSynced + } + return EnvelopeError{ + Hash: hash, + Code: code, + Description: err.Error(), + } +} diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index 434b95bb6..455f2465a 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -20,6 +20,7 @@ import ( "bytes" "crypto/ecdsa" "crypto/sha256" + "errors" "fmt" "io" "io/ioutil" @@ -37,11 +38,13 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" - "github.com/syndtr/goleveldb/leveldb/errors" "golang.org/x/crypto/pbkdf2" "golang.org/x/sync/syncmap" ) +// TimeSyncError error for clock skew errors. +type TimeSyncError error + // Statistics holds several message-related counter for analytics // purposes. type Statistics struct { @@ -835,8 +838,13 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { return whisper.runMessageLoop(whisperPeer, rw) } -func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte) { +func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte, + envelopeErrors []EnvelopeError) { batchHash := crypto.Keccak256Hash(data) + if err := p2p.Send(rw, messageResponseCode, NewMessagesResponse(batchHash, envelopeErrors)); err != nil { + log.Warn("failed to deliver messages response", "hash", batchHash, "envelopes errors", envelopeErrors, + "peer", peer, "error", err) + } if err := p2p.Send(rw, batchAcknowledgedCode, batchHash); err != nil { log.Warn("failed to deliver confirmation", "hash", batchHash, "peer", peer, "error", err) } @@ -867,9 +875,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err) return errors.New("invalid enveloopes") } - if !whisper.disableConfirmations { - go whisper.sendConfirmation(p.peer.ID(), rw, data) - } var envelopes []*Envelope if err := rlp.DecodeBytes(data, &envelopes); err != nil { @@ -877,12 +882,18 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return errors.New("invalid envelopes") } trouble := false + envelopeErrors := []EnvelopeError{} for _, env := range envelopes { cached, err := whisper.add(env, whisper.LightClientMode()) if err != nil { - trouble = true - log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) + _, isTimeSyncError := err.(TimeSyncError) + if !isTimeSyncError { + trouble = true + log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) + } + envelopeErrors = append(envelopeErrors, ErrorToEnvelopeError(env.Hash(), err)) } + whisper.envelopeFeed.Send(EnvelopeEvent{ Event: EventEnvelopeReceived, Hash: env.Hash(), @@ -892,14 +903,37 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { p.mark(env) } } + if !whisper.disableConfirmations { + go whisper.sendConfirmation(p.peer.ID(), rw, data, envelopeErrors) + } if trouble { return errors.New("invalid envelope") } + case messageResponseCode: + var multiResponse MultiVersionResponse + if err := packet.Decode(&multiResponse); err != nil { + log.Error("failed to decode messages response", "peer", p.peer.ID(), "error", err) + return errors.New("invalid response message") + } + if multiResponse.Version == 1 { + response, err := multiResponse.DecodeResponse1() + if err != nil { + log.Error("failed to decode messages response into first version of response", "peer", p.peer.ID(), "error", err) + } + whisper.envelopeFeed.Send(EnvelopeEvent{ + Batch: response.Hash, + Event: EventBatchAcknowledged, + Peer: p.peer.ID(), + Data: response.Errors, + }) + } else { + log.Warn("unknown version of the messages response was received. response is ignored", "peer", p.peer.ID(), "version", multiResponse.Version) + } case batchAcknowledgedCode: var batchHash common.Hash if err := packet.Decode(&batchHash); err != nil { - log.Warn("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err) + log.Error("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err) return errors.New("invalid confirmation message") } whisper.envelopeFeed.Send(EnvelopeEvent{ @@ -1076,11 +1110,11 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { sent := envelope.Expiry - envelope.TTL envelopeAddedCounter.Inc(1) - if sent > now { if sent-DefaultSyncAllowance > now { envelopeErrFromFutureCounter.Inc(1) - return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash()) + log.Warn("envelope created in the future", "hash", envelope.Hash()) + return false, TimeSyncError(errors.New("envelope from future")) } // recalculate PoW, adjusted for the time difference, plus one second for latency envelope.calculatePoW(sent - now + 1) @@ -1089,7 +1123,8 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { if envelope.Expiry < now { if envelope.Expiry+DefaultSyncAllowance*2 < now { envelopeErrVeryOldCounter.Inc(1) - return false, fmt.Errorf("very old message") + log.Warn("very old envelope", "hash", envelope.Hash()) + return false, TimeSyncError(errors.New("very old envelope")) } log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex()) envelopeErrExpiredCounter.Inc(1)