diff --git a/common/message/message.go b/common/message/message.go new file mode 100644 index 00000000..c9f8965c --- /dev/null +++ b/common/message/message.go @@ -0,0 +1,65 @@ +package message + +// Direction is an int type that marks a message as either incoming or outgoing. +type Direction int + +// Message direction values; numbering starts at 1, so the zero value matches neither constant. +const ( + IncomingMessage Direction = iota + 1 + OutgoingMessage +) + +// String returns the name of the direction constant, or "MessageDirectionUnknown" for any other value. +func (d Direction) String() string { + switch d { + case IncomingMessage: + return "IncomingMessage" + case OutgoingMessage: + return "OutgoingMessage" + } + + return "MessageDirectionUnknown" +} + +// Status is an int type that indicates the status value of a +// message's state. +type Status int + +// Message delivery status values; numbering starts at 1, so the zero value matches no constant. +const ( + PendingStatus Status = iota + 1 + QueuedStatus + CachedStatus + SentStatus + ExpiredStatus + ProcessingStatus + ResentStatus + RejectedStatus + DeliveredStatus +) + +// String returns a short label for the status, or "unknown" for any other value. 
+func (s Status) String() string { + switch s { + case PendingStatus: + return "Pending" + case QueuedStatus: + return "Queued" + case CachedStatus: + return "Cached" + case SentStatus: + return "Sent" + case ProcessingStatus: + return "Processing" + case ExpiredStatus: + return "ExpiredTTL" /* NOTE(review): the only label that is not simply the constant name without its Status suffix; confirm this is intended */ + case ResentStatus: + return "Resent" + case RejectedStatus: + return "Rejected" + case DeliveredStatus: + return "Delivered" + } + + return "unknown" +} diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go index e3c2f4a9..75ef8b66 100644 --- a/whisper/whisperv5/api.go +++ b/whisper/whisperv5/api.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/message" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" @@ -238,13 +239,17 @@ type newMessageOverride struct { // Post a message on the Whisper network. func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, error) { var ( - symKeyGiven = len(req.SymKeyID) > 0 - pubKeyGiven = len(req.PublicKey) > 0 - err error + symKeyGiven = len(req.SymKeyID) > 0 + pubKeyGiven = len(req.PublicKey) > 0 + isP2PMessage = len(req.TargetPeer) > 0 /* true when the request names a target peer */ + err error ) + api.w.traceOutgoingDelivery(isP2PMessage, message.PendingStatus, &req, nil, nil, nil) /* traced before any of the validation below runs */ + // user must specify either a symmetric or an asymmetric key if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrSymAsym) return false, ErrSymAsym } @@ -260,6 +265,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er // Set key that is used to sign the message if len(req.Sig) > 0 { if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err) return false, err } } @@ 
-267,12 +273,15 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er // Set symmetric key that is used to encrypt the message if symKeyGiven { if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrNoTopics) return false, ErrNoTopics } if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err) return false, err } if !validateSymmetricKey(params.KeySym) { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidSymmetricKey) return false, ErrInvalidSymmetricKey } } @@ -281,6 +290,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er if pubKeyGiven { params.Dst = crypto.ToECDSAPub(req.PublicKey) if !ValidatePublicKey(params.Dst) { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidPublicKey) return false, ErrInvalidPublicKey } } @@ -288,11 +298,13 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er // encrypt and sent message whisperMsg, err := NewSentMessage(params) if err != nil { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err) return false, err } env, err := whisperMsg.Wrap(params) if err != nil { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err) return false, err } @@ -300,16 +312,28 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er if len(req.TargetPeer) > 0 { n, err := discover.ParseNode(req.TargetPeer) if err != nil { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err) /* the raw parse error is traced; the caller gets the wrapped one returned on the next line */ return false, fmt.Errorf("failed to parse target peer: %s", err) } - return true, api.w.SendP2PMessage(n.ID[:], env) + + api.w.traceOutgoingDelivery(isP2PMessage, 
message.SentStatus, &req, env, nil, nil) + + if err := api.w.SendP2PMessage(n.ID[:], env); err != nil { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err) + return true, err + } + + api.w.traceOutgoingDelivery(isP2PMessage, message.DeliveredStatus, &req, env, nil, err) + return true, nil } // ensure that the message PoW meets the node's minimum accepted PoW if req.PowTarget < api.w.MinPow() { + api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, ErrTooLowPoW) return false, ErrTooLowPoW } + api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil) /* NOTE(review): Sent is traced before Send runs, and an error returned by Send on the next line is not traced */ return true, api.w.Send(env) } diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go index a6c9e610..b3bc9963 100644 --- a/whisper/whisperv5/doc.go +++ b/whisper/whisperv5/doc.go @@ -33,6 +33,7 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/common/message" "github.com/ethereum/go-ethereum/p2p" ) @@ -99,3 +100,23 @@ type NotificationServer interface { // Stop stops notification sending loop, releasing related resources Stop() error } + +// MessageState describes one delivery status event for a whisper message, p2p or not (see IsP2P); Source, Envelope and Received hold copies and stay at their zero value when the reporting call supplied none. +type MessageState struct { + IsP2P bool `json:"is_p2p"` + Reason error `json:"reason"` /* NOTE(review): errors made with errors.New or fmt.Errorf expose no fields to encoding/json and would marshal as {}; confirm whether the JSON form of this field is relied upon */ + Envelope Envelope `json:"envelope"` + Timestamp time.Time `json:"timestamp"` + Source NewMessage `json:"source"` + Status message.Status `json:"status"` + Direction message.Direction `json:"direction"` + Received ReceivedMessage `json:"received"` +} + +// DeliveryServer represents a small message status +// notification system where a message delivery status +// update event is delivered to its underlying system +// for both rpc messages and p2p messages. 
+type DeliveryServer interface { + SendState(MessageState) /* receives one MessageState per traced delivery event */ +} diff --git a/whisper/whisperv5/filter.go b/whisper/whisperv5/filter.go index b5e893e0..71dc3b2d 100644 --- a/whisper/whisperv5/filter.go +++ b/whisper/whisperv5/filter.go @@ -18,10 +18,12 @@ package whisperv5 import ( "crypto/ecdsa" + "errors" "fmt" "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/message" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" ) @@ -115,15 +117,20 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { if match { msg = env.Open(watcher) if msg == nil { + err := errors.New("Envelope failed to be opened") + fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err) log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i) } } else { + err := errors.New("processing message: does not match") + fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err) /* NOTE(review): this appears to run once per non-matching filter rather than once per envelope; confirm Rejected is the intended status for a mere filter mismatch */ log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i) } } if match && msg != nil { log.Trace("processing message: decrypted", "hash", env.Hash().Hex()) + fs.whisper.traceIncomingDelivery(p2pMessage, message.DeliveredStatus, nil, env, msg, nil) /* NOTE(review): traced before the Src check on the next line, so Delivered is reported even when watcher.Trigger is skipped */ if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) { watcher.Trigger(msg) } diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index d1ef2445..58d0fd6e 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -27,6 +27,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/message" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -77,7 +78,8 @@ type Whisper struct { statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node - mailServer MailServer // MailServer interface + mailServer 
MailServer // MailServer interface + deliveryServer DeliveryServer // DeliveryServer interface; delivery tracing is skipped while this is nil notificationServer NotificationServer } @@ -157,6 +159,11 @@ func (w *Whisper) RegisterServer(server MailServer) { w.mailServer = server } +// RegisterDeliveryServer registers the delivery status server with Whisper +func (w *Whisper) RegisterDeliveryServer(server DeliveryServer) { + w.deliveryServer = server /* NOTE(review): plain assignment with no lock, while the tracing helpers read this field from message-handling code; presumably this must be called before the node starts - confirm */ +} + // RegisterNotificationServer registers notification server with Whisper func (w *Whisper) RegisterNotificationServer(server NotificationServer) { w.notificationServer = server @@ -620,8 +627,11 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { var envelope Envelope if err := packet.Decode(&envelope); err != nil { log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &envelope, nil, err) /* NOTE(review): decoding failed, so the traced envelope copy may be empty or partial */ return errors.New("invalid direct message") } + + wh.traceIncomingDelivery(true, message.SentStatus, nil, &envelope, nil, nil) /* NOTE(review): a received direct message is traced with SentStatus; confirm this is the intended status for the incoming direction */ wh.postEvent(&envelope, true) } case p2pRequestCode: @@ -630,6 +640,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { var request Envelope if err := packet.Decode(&request); err != nil { log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err) + wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &request, nil, err) return errors.New("invalid p2p request") } wh.mailServer.DeliverMail(p, &request) @@ -700,16 +711,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { if !wh.expirations[envelope.Expiry].Has(hash) { wh.expirations[envelope.Expiry].Add(hash) } + + wh.traceIncomingDelivery(false, message.CachedStatus, nil, envelope, nil, nil) /* issued before the poolMu.Unlock call that follows */ } wh.poolMu.Unlock() if alreadyCached { log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex()) + wh.traceIncomingDelivery(false, message.ResentStatus, nil, envelope, nil, 
nil) } else { log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex()) wh.statsMu.Lock() wh.stats.memoryUsed += envelope.size() wh.statsMu.Unlock() + + wh.traceIncomingDelivery(false, message.QueuedStatus, nil, envelope, nil, nil) + wh.postEvent(envelope, false) // notify the local node about the new message if wh.mailServer != nil { wh.mailServer.Archive(envelope) @@ -718,6 +735,47 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { return true, nil } +func (w *Whisper) traceIncomingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) { /* forwards to traceDelivery with message.IncomingMessage as the direction */ + w.traceDelivery(isP2P, message.IncomingMessage, status, src, env, rec, err) +} + +func (w *Whisper) traceOutgoingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) { /* forwards to traceDelivery with message.OutgoingMessage as the direction */ + w.traceDelivery(isP2P, message.OutgoingMessage, status, src, env, rec, err) +} + +func (w *Whisper) traceDelivery(isP2P bool, dir message.Direction, status message.Status, newmsg *NewMessage, envelope *Envelope, received *ReceivedMessage, err error) { /* builds a MessageState from the arguments and passes it to the registered DeliveryServer */ + if w.deliveryServer == nil { /* no server registered: nothing is reported */ + return + } + + var env Envelope /* these three stay at their zero value when the matching pointer argument is nil */ + var rec ReceivedMessage + var src NewMessage + + if newmsg != nil { + src = *newmsg /* copied by value before the goroutine below is started */ + } + + if envelope != nil { + env = *envelope + } + + if received != nil { + rec = *received + } + + go w.deliveryServer.SendState(MessageState{ /* NOTE(review): one goroutine per event, so SendState calls may run concurrently and in any order; confirm the server tolerates that */ + Reason: err, + Source: src, + Received: rec, + IsP2P: isP2P, + Status: status, + Envelope: env, + Direction: dir, + Timestamp: time.Now(), + }) +} + // postEvent queues the message for further processing. 
func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { // if the version of incoming message is higher than @@ -730,6 +788,13 @@ func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { w.checkOverflow() w.messageQueue <- envelope } + + return + } + + if w.deliveryServer != nil { /* traceDelivery repeats this nil check; here it also skips building the error when no server is registered */ + err := fmt.Errorf("Mismatch Envelope version(%d) to wanted Version(%d)", envelope.Ver(), EnvelopeVersion) + w.traceIncomingDelivery(isP2P, message.RejectedStatus, nil, envelope, nil, err) } } @@ -759,9 +824,11 @@ func (w *Whisper) processQueue() { return case e = <-w.messageQueue: + w.traceIncomingDelivery(false, message.ProcessingStatus, nil, e, nil, nil) /* traced just before the envelope is handed to the filters */ w.filters.NotifyWatchers(e, false) case e = <-w.p2pMsgQueue: + w.traceIncomingDelivery(true, message.ProcessingStatus, nil, e, nil, nil) w.filters.NotifyWatchers(e, true) } }