Notify users that envelope was discarded and retry sending it (#1424)

* Notify users that envelope was discarded and retry sending it

* Update Gopkg files with released whisper version

* Forgot to remove signal after refactoring
This commit is contained in:
Dmitry Shulyak 2019-04-02 13:40:45 +03:00 committed by GitHub
parent 96aba9f3af
commit a904d9325e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 191 additions and 37 deletions

6
Gopkg.lock generated
View File

@ -830,12 +830,12 @@
version = "v1.1.0"
[[projects]]
digest = "1:684e59281a3fd4a35437992b008f43f98a0cf5b25cde717397325b10f94ea69c"
digest = "1:ff23c911716ddbe23acccecf0a88bb99e89132b221a3be8dbad6a8377fd6f3a0"
name = "github.com/status-im/whisper"
packages = ["whisperv6"]
pruneopts = "NUT"
revision = "f60fda29e21f802bc20bc20b4924dc22fe8a0514"
version = "v1.4.9"
revision = "3a4601b568649ac152afa76551ea9c332464b867"
version = "v1.4.10"
[[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -46,7 +46,7 @@
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.4.9"
version = "=v1.4.10"
[[constraint]]
name = "golang.org/x/text"

View File

@ -2,6 +2,7 @@ package shhext
import (
"context"
"errors"
"hash/fnv"
"sync"
@ -193,7 +194,30 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent)
log.Debug("batch is not found", "batch", event.Batch)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
envelopeErrors, ok := event.Data.([]whisper.EnvelopeError)
if event.Data != nil && !ok {
log.Warn("received unexpected data for the confirmation event", "batch", event.Batch)
}
failedEnvelopes := map[common.Hash]struct{}{}
for i := range envelopeErrors {
envelopeError := envelopeErrors[i]
_, exist := m.envelopes[envelopeError.Hash]
if exist {
log.Warn("envelope that was posted by us is discarded", "hash", envelopeError.Hash, "peer", event.Peer, "error", envelopeError.Description)
var err error
switch envelopeError.Code {
case whisper.EnvelopeTimeNotSynced:
err = errors.New("envelope wasn't delivered due to time sync issues")
}
m.handleEnvelopeFailure(envelopeError.Hash, err)
}
failedEnvelopes[envelopeError.Hash] = struct{}{}
}
for hash := range envelopes {
if _, exist := failedEnvelopes[hash]; exist {
continue
}
state, ok := m.envelopes[hash]
if !ok || state == EnvelopeSent {
continue
@ -209,14 +233,20 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent)
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
if state, ok := m.envelopes[event.Hash]; ok {
message, exist := m.messages[event.Hash]
m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues"))
}
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
// must be used on a higher level.
func (m *EnvelopesMonitor) handleEnvelopeFailure(hash common.Hash, err error) {
if state, ok := m.envelopes[hash]; ok {
message, exist := m.messages[hash]
if !exist {
log.Error("message was deleted erroneously", "envelope hash", event.Hash)
log.Error("message was deleted erroneously", "envelope hash", hash)
}
mID := messageID(message)
attempt := m.attempts[event.Hash]
m.clearMessageState(event.Hash)
attempt := m.attempts[hash]
m.clearMessageState(hash)
if state == EnvelopeSent {
return
}
@ -232,9 +262,9 @@ func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEven
m.messages[envelopeID] = message
m.attempts[envelopeID] = attempt + 1
} else {
log.Debug("envelope expired", "hash", event.Hash, "state", state)
log.Debug("envelope expired", "hash", hash, "state", state)
if m.handler != nil {
m.handler.EnvelopeExpired(mID)
m.handler.EnvelopeExpired(mID, err)
}
}
}

View File

@ -34,7 +34,7 @@ var errProtocolNotInitialized = errors.New("procotol is not initialized")
// EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface {
EnvelopeSent(common.Hash)
EnvelopeExpired(common.Hash)
EnvelopeExpired(common.Hash, error)
MailServerRequestCompleted(common.Hash, common.Hash, []byte, error)
MailServerRequestExpired(common.Hash)
}

View File

@ -35,10 +35,15 @@ const (
p2pRequestCompleteCode = 125
)
type failureMessage struct {
Hash common.Hash
Error error
}
func newHandlerMock(buf int) handlerMock {
return handlerMock{
confirmations: make(chan common.Hash, buf),
expirations: make(chan common.Hash, buf),
expirations: make(chan failureMessage, buf),
requestsCompleted: make(chan common.Hash, buf),
requestsExpired: make(chan common.Hash, buf),
requestsFailed: make(chan common.Hash, buf),
@ -47,7 +52,7 @@ func newHandlerMock(buf int) handlerMock {
type handlerMock struct {
confirmations chan common.Hash
expirations chan common.Hash
expirations chan failureMessage
requestsCompleted chan common.Hash
requestsExpired chan common.Hash
requestsFailed chan common.Hash
@ -57,8 +62,8 @@ func (t handlerMock) EnvelopeSent(hash common.Hash) {
t.confirmations <- hash
}
func (t handlerMock) EnvelopeExpired(hash common.Hash) {
t.expirations <- hash
func (t handlerMock) EnvelopeExpired(hash common.Hash, err error) {
t.expirations <- failureMessage{Hash: hash, Error: err}
}
func (t handlerMock) MailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash common.Hash, cursor []byte, err error) {
@ -168,7 +173,7 @@ func (s *ShhExtSuite) TestPostMessageWithConfirmation() {
}
}
func (s *ShhExtSuite) TestWaitMessageExpired() {
func (s *ShhExtSuite) testWaitMessageExpired(expectedError string, ttl uint32) {
mock := newHandlerMock(1)
s.services[0].envelopesMonitor.handler = mock
symID, err := s.whisper[0].GenerateSymKey()
@ -180,7 +185,7 @@ func (s *ShhExtSuite) TestWaitMessageExpired() {
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
TTL: 1,
TTL: ttl,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}
@ -189,7 +194,8 @@ func (s *ShhExtSuite) TestWaitMessageExpired() {
s.NoError(err)
select {
case expired := <-mock.expirations:
s.Equal(mid, expired)
s.Equal(mid, expired.Hash)
s.EqualError(expired.Error, expectedError)
case confirmed := <-mock.confirmations:
s.Fail("unexpected confirmation for hash", confirmed)
case <-time.After(10 * time.Second):
@ -197,6 +203,20 @@ func (s *ShhExtSuite) TestWaitMessageExpired() {
}
}
func (s *ShhExtSuite) TestWaitMessageExpired() {
s.testWaitMessageExpired("envelope expired due to connectivity issues", 1)
}
func (s *ShhExtSuite) TestErrorOnEnvelopeDelivery() {
// in the test we are sending message from peer 0 to peer 1
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
s.Require().NoError(s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()}))
s.whisper[1].SetTimeSource(func() time.Time {
return time.Now().Add(time.Hour)
})
s.testWaitMessageExpired("envelope wasn't delivered due to time sync issues", 100)
}
func (s *ShhExtSuite) TestRequestMessagesErrors() {
var err error

View File

@ -14,8 +14,8 @@ func (h EnvelopeSignalHandler) EnvelopeSent(hash common.Hash) {
}
// EnvelopeExpired triggered when envelope is expired but wasn't delivered to any peer.
func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash) {
signal.SendEnvelopeExpired(hash)
func (h EnvelopeSignalHandler) EnvelopeExpired(hash common.Hash, err error) {
signal.SendEnvelopeExpired(hash, err)
}
// MailServerRequestCompleted triggered when the mailserver sends a message to notify that the request has been completed

View File

@ -32,7 +32,8 @@ const (
// EnvelopeSignal includes hash of the envelope.
type EnvelopeSignal struct {
Hash common.Hash `json:"hash"`
Hash common.Hash `json:"hash"`
Message string `json:"message"`
}
// MailServerResponseSignal holds the data received in the response from the mailserver.
@ -56,12 +57,16 @@ type BundleAddedSignal struct {
// SendEnvelopeSent triggered when envelope delivered at least to 1 peer.
func SendEnvelopeSent(hash common.Hash) {
send(EventEnvelopeSent, EnvelopeSignal{hash})
send(EventEnvelopeSent, EnvelopeSignal{Hash: hash})
}
// SendEnvelopeExpired triggered when envelope delivered at least to 1 peer.
func SendEnvelopeExpired(hash common.Hash) {
send(EventEnvelopeExpired, EnvelopeSignal{hash})
func SendEnvelopeExpired(hash common.Hash, err error) {
var message string
if err != nil {
message = err.Error()
}
send(EventEnvelopeExpired, EnvelopeSignal{Hash: hash, Message: message})
}
// SendMailServerRequestCompleted triggered when mail server response has been received
@ -81,7 +86,7 @@ func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash comm
// SendMailServerRequestExpired triggered when mail server request expires
func SendMailServerRequestExpired(hash common.Hash) {
send(EventMailServerRequestExpired, EnvelopeSignal{hash})
send(EventMailServerRequestExpired, EnvelopeSignal{Hash: hash})
}
// EnodeDiscoveredSignal includes enode address and topic

View File

@ -36,6 +36,9 @@ import (
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
)
// Whisper protocol parameters
@ -50,6 +53,7 @@ const (
powRequirementCode = 2 // PoW requirement
bloomFilterExCode = 3 // bloom filter exchange
batchAcknowledgedCode = 11 // confirmation that batch of envelopes was received
messageResponseCode = 12 // includes confirmation for delivery and information about errors
p2pSyncRequestCode = 123 // used to sync envelopes between two mail servers
p2pSyncResponseCode = 124 // used to sync envelopes between two mail servers
p2pRequestCompleteCode = 125 // peer-to-peer message, used by Dapp protocol
@ -84,6 +88,9 @@ const (
DefaultSyncAllowance = 10 // seconds
MaxLimitInSyncMailRequest = 1000
EnvelopeTimeNotSynced uint = iota + 1
EnvelopeOtherError
)
// MailServer represents a mail server, capable of
@ -134,3 +141,60 @@ type SyncResponse struct {
Final bool // if true it means all envelopes were processed
Error string
}
// MessagesResponse sent as a response after processing batch of envelopes.
type MessagesResponse struct {
// Hash is a hash of all envelopes sent in the single batch.
Hash common.Hash
// Per envelope error.
Errors []EnvelopeError
}
// EnvelopeError code and optional description of the error.
type EnvelopeError struct {
Hash common.Hash
Code uint
Description string
}
// MultiVersionResponse allows to decode response into chosen version.
type MultiVersionResponse struct {
Version uint
Response rlp.RawValue
}
// DecodeResponse1 decodes response into first version of the messages response.
func (m MultiVersionResponse) DecodeResponse1() (resp MessagesResponse, err error) {
return resp, rlp.DecodeBytes(m.Response, &resp)
}
// Version1MessageResponse first version of the message response.
type Version1MessageResponse struct {
Version uint
Response MessagesResponse
}
// NewMessagesResponse returns instane of the version messages response.
func NewMessagesResponse(batch common.Hash, errors []EnvelopeError) Version1MessageResponse {
return Version1MessageResponse{
Version: 1,
Response: MessagesResponse{
Hash: batch,
Errors: errors,
},
}
}
// ErrorToEnvelopeError converts common golang error into EnvelopeError with a code.
func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError {
code := EnvelopeOtherError
switch err.(type) {
case TimeSyncError:
code = EnvelopeTimeNotSynced
}
return EnvelopeError{
Hash: hash,
Code: code,
Description: err.Error(),
}
}

View File

@ -20,6 +20,7 @@ import (
"bytes"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
@ -37,11 +38,13 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb/errors"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/sync/syncmap"
)
// TimeSyncError error for clock skew errors.
type TimeSyncError error
// Statistics holds several message-related counter for analytics
// purposes.
type Statistics struct {
@ -835,8 +838,13 @@ func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return whisper.runMessageLoop(whisperPeer, rw)
}
func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte) {
func (whisper *Whisper) sendConfirmation(peer enode.ID, rw p2p.MsgReadWriter, data []byte,
envelopeErrors []EnvelopeError) {
batchHash := crypto.Keccak256Hash(data)
if err := p2p.Send(rw, messageResponseCode, NewMessagesResponse(batchHash, envelopeErrors)); err != nil {
log.Warn("failed to deliver messages response", "hash", batchHash, "envelopes errors", envelopeErrors,
"peer", peer, "error", err)
}
if err := p2p.Send(rw, batchAcknowledgedCode, batchHash); err != nil {
log.Warn("failed to deliver confirmation", "hash", batchHash, "peer", peer, "error", err)
}
@ -867,9 +875,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err)
return errors.New("invalid enveloopes")
}
if !whisper.disableConfirmations {
go whisper.sendConfirmation(p.peer.ID(), rw, data)
}
var envelopes []*Envelope
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
@ -877,12 +882,18 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
return errors.New("invalid envelopes")
}
trouble := false
envelopeErrors := []EnvelopeError{}
for _, env := range envelopes {
cached, err := whisper.add(env, whisper.LightClientMode())
if err != nil {
trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
_, isTimeSyncError := err.(TimeSyncError)
if !isTimeSyncError {
trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
}
envelopeErrors = append(envelopeErrors, ErrorToEnvelopeError(env.Hash(), err))
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeReceived,
Hash: env.Hash(),
@ -892,14 +903,37 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
p.mark(env)
}
}
if !whisper.disableConfirmations {
go whisper.sendConfirmation(p.peer.ID(), rw, data, envelopeErrors)
}
if trouble {
return errors.New("invalid envelope")
}
case messageResponseCode:
var multiResponse MultiVersionResponse
if err := packet.Decode(&multiResponse); err != nil {
log.Error("failed to decode messages response", "peer", p.peer.ID(), "error", err)
return errors.New("invalid response message")
}
if multiResponse.Version == 1 {
response, err := multiResponse.DecodeResponse1()
if err != nil {
log.Error("failed to decode messages response into first version of response", "peer", p.peer.ID(), "error", err)
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Batch: response.Hash,
Event: EventBatchAcknowledged,
Peer: p.peer.ID(),
Data: response.Errors,
})
} else {
log.Warn("unknown version of the messages response was received. response is ignored", "peer", p.peer.ID(), "version", multiResponse.Version)
}
case batchAcknowledgedCode:
var batchHash common.Hash
if err := packet.Decode(&batchHash); err != nil {
log.Warn("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err)
log.Error("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err)
return errors.New("invalid confirmation message")
}
whisper.envelopeFeed.Send(EnvelopeEvent{
@ -1076,11 +1110,11 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
sent := envelope.Expiry - envelope.TTL
envelopeAddedCounter.Inc(1)
if sent > now {
if sent-DefaultSyncAllowance > now {
envelopeErrFromFutureCounter.Inc(1)
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
log.Warn("envelope created in the future", "hash", envelope.Hash())
return false, TimeSyncError(errors.New("envelope from future"))
}
// recalculate PoW, adjusted for the time difference, plus one second for latency
envelope.calculatePoW(sent - now + 1)
@ -1089,7 +1123,8 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
if envelope.Expiry < now {
if envelope.Expiry+DefaultSyncAllowance*2 < now {
envelopeErrVeryOldCounter.Inc(1)
return false, fmt.Errorf("very old message")
log.Warn("very old envelope", "hash", envelope.Hash())
return false, TimeSyncError(errors.New("very old envelope"))
}
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
envelopeErrExpiredCounter.Inc(1)