status-go/geth-patches/0005-whisper-delivery.patch

406 lines
14 KiB
Diff
Raw Normal View History

diff --git a/common/message/message.go b/common/message/message.go
new file mode 100644
index 00000000..c9f8965c
--- /dev/null
+++ b/common/message/message.go
@@ -0,0 +1,65 @@
+package message
+
+// Direction defines an int type to indicate a message as either incoming or outgoing.
+type Direction int
+
+// Message direction values; they start at 1, so the zero Direction is not one of them.
+const (
+ IncomingMessage Direction = iota + 1
+ OutgoingMessage
+)
+
+// String returns the name of the direction, or "MessageDirectionUnknown" for any other value.
+func (d Direction) String() string {
+	name := "MessageDirectionUnknown"
+	if d == IncomingMessage {
+		name = "IncomingMessage"
+	} else if d == OutgoingMessage {
+		name = "OutgoingMessage"
+	}
+
+	return name
+}
+
+// Status defines an int type to indicate the different delivery states
+// of a message.
+type Status int
+
+// Message delivery status values; they start at 1, so the zero Status is not one of them.
+const (
+ PendingStatus Status = iota + 1
+ QueuedStatus
+ CachedStatus
+ SentStatus
+ ExpiredStatus
+ ProcessingStatus
+ ResentStatus
+ RejectedStatus
+ DeliveredStatus
+)
+
+// String returns the human-readable name of the status, or "unknown" for a
+// value outside the declared constants. ExpiredStatus is rendered as
+// "ExpiredTTL"; every other name is the constant name without "Status".
+func (s Status) String() string {
+	// Indexed by Status value; slot 0 stays empty because the constants
+	// start at iota + 1.
+	names := [...]string{
+		PendingStatus:    "Pending",
+		QueuedStatus:     "Queued",
+		CachedStatus:     "Cached",
+		SentStatus:       "Sent",
+		ExpiredStatus:    "ExpiredTTL",
+		ProcessingStatus: "Processing",
+		ResentStatus:     "Resent",
+		RejectedStatus:   "Rejected",
+		DeliveredStatus:  "Delivered",
+	}
+
+	// Reject anything that is not a valid index into the table.
+	if s < PendingStatus || int(s) >= len(names) {
+		return "unknown"
+	}
+
+	return names[s]
+}
diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go
index e3c2f4a9..75ef8b66 100644
--- a/whisper/whisperv5/api.go
+++ b/whisper/whisperv5/api.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -238,13 +239,17 @@ type newMessageOverride struct {
// Post a message on the Whisper network.
func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) {
var (
- symKeyGiven = len(req.SymKeyID) > 0
- pubKeyGiven = len(req.PublicKey) > 0
- err error
+ symKeyGiven = len(req.SymKeyID) > 0
+ pubKeyGiven = len(req.PublicKey) > 0
+ isP2PMessage = len(req.TargetPeer) > 0
+ err error
)
+ api.w.traceOutgoingDelivery(isP2PMessage, message.PendingStatus, &req, nil, nil, nil)
+
// user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrSymAsym)
return false, ErrSymAsym
}
@@ -260,6 +265,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
}
@@ -267,12 +273,15 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrNoTopics)
return false, ErrNoTopics
}
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
if !validateSymmetricKey(params.KeySym) {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidSymmetricKey)
return false, ErrInvalidSymmetricKey
}
}
@@ -281,6 +290,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidPublicKey)
return false, ErrInvalidPublicKey
}
}
@@ -288,11 +298,13 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
if err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
env, err := whisperMsg.Wrap(params)
if err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
@@ -300,16 +312,28 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer)
if err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
return false, fmt.Errorf("failed to parse target peer: %s", err)
}
- return true, api.w.SendP2PMessage(n.ID[:], env)
+
+ api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
+
+ if err := api.w.SendP2PMessage(n.ID[:], env); err != nil {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
+ return true, err
+ }
+
+ api.w.traceOutgoingDelivery(isP2PMessage, message.DeliveredStatus, &req, env, nil, err)
+ return true, nil
}
// ensure that the message PoW meets the node's minimum accepted PoW
if req.PowTarget < api.w.MinPow() {
+ api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, ErrTooLowPoW)
return false, ErrTooLowPoW
}
+ api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
return true, api.w.Send(env)
}
diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go
index a6c9e610..b3bc9963 100644
--- a/whisper/whisperv5/doc.go
+++ b/whisper/whisperv5/doc.go
@@ -33,6 +33,7 @@ import (
"fmt"
"time"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/p2p"
)
@@ -99,3 +100,23 @@ type NotificationServer interface {
// Stop stops notification sending loop, releasing related resources
Stop() error
}
+
+// MessageState is a snapshot of the delivery status of a whisper message, p2p or not (see IsP2P), as filled in by Whisper.traceDelivery.
+type MessageState struct {
+ IsP2P bool `json:"is_p2p"` // isP2P flag supplied by the tracing call site
+ Reason error `json:"reason"` // error reported with the status, nil if none; NOTE(review): error values usually encode as {} in JSON — confirm consumers
+ Envelope Envelope `json:"envelope"` // copy of the envelope, zero value if the call site had none
+ Timestamp time.Time `json:"timestamp"` // time at which the state was recorded
+ Source NewMessage `json:"source"` // copy of the NewMessage request, zero value if the call site had none
+ Status message.Status `json:"status"` // delivery status being reported
+ Direction message.Direction `json:"direction"` // incoming or outgoing
+ Received ReceivedMessage `json:"received"` // copy of the ReceivedMessage, zero value if the call site had none
+}
+
+// DeliveryServer represents a small message status
+// notification system: each message delivery status
+// update is handed to the underlying implementation,
+// for both rpc messages and p2p messages.
+type DeliveryServer interface {
+ SendState(MessageState)
+}
diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go
index b5e893e0..71dc3b2d 100644
--- a/whisper/whisperv5/filter.go
+++ b/whisper/whisperv5/filter.go
@@ -18,10 +18,12 @@ package whisperv5
import (
"crypto/ecdsa"
+ "errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
@@ -115,15 +117,20 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match {
msg = env.Open(watcher)
if msg == nil {
+ err := errors.New("Envelope failed to be opened")
+ fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
}
} else {
+ err := errors.New("processing message: does not match")
+ fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
}
}
if match && msg != nil {
log.Trace("processing message: decrypted", "hash", env.Hash().Hex())
+ fs.whisper.traceIncomingDelivery(p2pMessage, message.DeliveredStatus, nil, env, msg, nil)
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg)
}
diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go
index d1ef2445..58d0fd6e 100644
--- a/whisper/whisperv5/whisper.go
+++ b/whisper/whisperv5/whisper.go
@@ -27,6 +27,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
@@ -77,7 +78,8 @@ type Whisper struct {
statsMu sync.Mutex // guard stats
stats Statistics // Statistics of whisper node
- mailServer MailServer // MailServer interface
+ mailServer MailServer // MailServer interface
+ deliveryServer DeliveryServer // DeliveryServer interface
notificationServer NotificationServer
}
@@ -157,6 +159,11 @@ func (w *Whisper) RegisterServer(server MailServer) {
w.mailServer = server
}
+// RegisterDeliveryServer registers the delivery server with Whisper
+func (w *Whisper) RegisterDeliveryServer(server DeliveryServer) {
+ w.deliveryServer = server
+}
+
// RegisterNotificationServer registers notification server with Whisper
func (w *Whisper) RegisterNotificationServer(server NotificationServer) {
w.notificationServer = server
@@ -620,8 +627,11 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var envelope Envelope
if err := packet.Decode(&envelope); err != nil {
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &envelope, nil, err)
return errors.New("invalid direct message")
}
+
+ wh.traceIncomingDelivery(true, message.SentStatus, nil, &envelope, nil, nil)
wh.postEvent(&envelope, true)
}
case p2pRequestCode:
@@ -630,6 +640,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var request Envelope
if err := packet.Decode(&request); err != nil {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
+ wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &request, nil, err)
return errors.New("invalid p2p request")
}
wh.mailServer.DeliverMail(p, &request)
@@ -700,16 +711,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
if !wh.expirations[envelope.Expiry].Has(hash) {
wh.expirations[envelope.Expiry].Add(hash)
}
+
+ wh.traceIncomingDelivery(false, message.CachedStatus, nil, envelope, nil, nil)
}
wh.poolMu.Unlock()
if alreadyCached {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
+ wh.traceIncomingDelivery(false, message.ResentStatus, nil, envelope, nil, nil)
} else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size()
wh.statsMu.Unlock()
+
+ wh.traceIncomingDelivery(false, message.QueuedStatus, nil, envelope, nil, nil)
+
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
@@ -718,6 +735,47 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return true, nil
}
+// traceIncomingDelivery reports a status update for an incoming message.
+func (w *Whisper) traceIncomingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
+	w.traceDelivery(isP2P, message.IncomingMessage, status, src, env, rec, err)
+}
+
+// traceOutgoingDelivery reports a status update for an outgoing message.
+func (w *Whisper) traceOutgoingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
+	w.traceDelivery(isP2P, message.OutgoingMessage, status, src, env, rec, err)
+}
+
+// traceDelivery packs the arguments into a MessageState and passes it to the
+// registered DeliveryServer on a new goroutine. It does nothing when no
+// delivery server is registered.
+func (w *Whisper) traceDelivery(isP2P bool, dir message.Direction, status message.Status, newmsg *NewMessage, envelope *Envelope, received *ReceivedMessage, err error) {
+	if w.deliveryServer == nil {
+		return
+	}
+
+	state := MessageState{
+		IsP2P:     isP2P,
+		Reason:    err,
+		Status:    status,
+		Direction: dir,
+		Timestamp: time.Now(),
+	}
+
+	// Nil parts are left as zero values; non-nil ones are copied by value.
+	if newmsg != nil {
+		state.Source = *newmsg
+	}
+	if envelope != nil {
+		state.Envelope = *envelope
+	}
+	if received != nil {
+		state.Received = *received
+	}
+
+	// SendState runs on its own goroutine, so the caller does not wait for it.
+	go w.deliveryServer.SendState(state)
+}
+
// postEvent queues the message for further processing.
func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than
@@ -730,6 +788,13 @@ func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
w.checkOverflow()
w.messageQueue <- envelope
}
+
+ return
+ }
+
+ if w.deliveryServer != nil {
+ err := fmt.Errorf("Mismatch Envelope version(%d) to wanted Version(%d)", envelope.Ver(), EnvelopeVersion)
+ w.traceIncomingDelivery(isP2P, message.RejectedStatus, nil, envelope, nil, err)
}
}
@@ -759,9 +824,11 @@ func (w *Whisper) processQueue() {
return
case e = <-w.messageQueue:
+ w.traceIncomingDelivery(false, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, false)
case e = <-w.p2pMsgQueue:
+ w.traceIncomingDelivery(true, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, true)
}
}