Emit messages logs for processing and sorting out messaging problems (#420)

* Add DeliveryService to Whisper to track incoming and outgoing messages.
* Have log tagged log lines for incoming and outgoing messages.
This commit is contained in:
Ewetumo Alexander 2017-11-06 20:10:52 +01:00 committed by Ivan Tomilov
parent fb75054a35
commit cb5ccb52c4
11 changed files with 305 additions and 11 deletions

View File

@ -477,7 +477,7 @@ func (s *ManagerTestSuite) TestNodeStartCrash() {
s.NoError(err)
// start node outside the manager (on the same port), so that manager node.Start() method fails
outsideNode, err := node.MakeNode(nodeConfig)
outsideNode, err := node.MakeNode(nodeConfig, node.LogDeliveryService{})
s.NoError(err)
err = outsideNode.Start()
s.NoError(err)

View File

@ -43,6 +43,34 @@ func (k *SelectedExtKey) Hex() string {
return k.Address.Hex()
}
// MessageState defines a struct to hold given facts about a message stat.
type MessageState struct {
// Type defines Direction type: IncomingMessage or OutgoingMessage.
Type string `json:"type"`
// Protocol defines means of transmission in whisper: RPC or P2P.
Protocol string `json:"protocol"`
// Status defines current status of message: Pending, Delivered, Rejected, etc.
Status string `json:"status"`
// Envelope struct holding encrypted message.
Envelope []byte `json:"envelope"`
// Time in of sent time of message.
TimeSent uint32 `json:"time,omitempty"`
// Received defines time when delivery notification was received
Received time.Time `json:"received"`
// Payload associated with envelope.
Payload []byte `json:"payload,omitempty"`
// Hash defines the Envelope's hash
Hash string `json:"envelope_hash"`
// FromDevice defines the device sending message if value is extractable.
FromDevice string `json:"from_device,omitempty"`
// ToDevice defines the receiving message if value is extractable.
ToDevice string `json:"to_device,omitempty"`
// RejectionError defines the error message when message ending with a Rejected status.
RejectionError string `json:"rejection_reason,omitempty"`
// Source of message when type is Outgoing which contains raw rpc data.
Source whisper.NewMessage `json:"source,omitempty"`
}
// NodeManager defines expected methods for managing Status node
type NodeManager interface {
// StartNode start Status node, fails if node is already started

78
geth/node/delivery.go Normal file
View File

@ -0,0 +1,78 @@
package node
import (
"encoding/base64"
"encoding/json"
gethcommon "github.com/ethereum/go-ethereum/common"
gethmessage "github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
)
// LogDeliveryService implements the whisper.DeliveryServer which logs out
// stats of whisper.MessageState to the log.
type LogDeliveryService struct{}
// SendState logs incoming whisper.MesssageState into the log.
func (ld LogDeliveryService) SendState(state whisper.MessageState) {
var stat common.MessageState
var protocol string
var payload []byte
var from, to string
if state.IsP2P {
protocol = "P2P"
} else {
protocol = "RPC"
}
switch state.Direction {
case gethmessage.IncomingMessage:
payload = state.Received.Payload
if state.Received.Src != nil {
from = gethcommon.ToHex(crypto.FromECDSAPub(state.Received.Src))
}
if state.Received.Dst != nil {
to = gethcommon.ToHex(crypto.FromECDSAPub(state.Received.Dst))
}
case gethmessage.OutgoingMessage:
from = state.Source.Sig
if len(state.Source.PublicKey) == 0 {
to = string(state.Source.PublicKey)
} else {
to = state.Source.TargetPeer
}
}
if state.Reason != nil {
stat.RejectionError = state.Reason.Error()
}
stat.Protocol = protocol
stat.Payload = payload
stat.FromDevice = from
stat.ToDevice = to
stat.Received = state.Timestamp
stat.Source = state.Source
stat.Envelope = state.Envelope.Data
stat.Status = state.Status.String()
stat.Type = state.Direction.String()
stat.Hash = state.Envelope.Hash().String()
stat.TimeSent = state.Envelope.Expiry - state.Envelope.TTL
statdata, err := json.Marshal(stat)
if err != nil {
log.Warn("failed to marshal common.MessageStat", "err", err)
return
}
encodedStat := base64.StdEncoding.EncodeToString(statdata)
log.Info("Message delivery notification", "state", encodedStat)
}

View File

@ -69,7 +69,7 @@ func (m *NodeManager) startNode(config *params.NodeConfig) (<-chan struct{}, err
m.initLog(config)
ethNode, err := MakeNode(config)
ethNode, err := MakeNode(config, LogDeliveryService{})
if err != nil {
return nil, err
}

View File

@ -40,7 +40,7 @@ var (
)
// MakeNode create a geth node entity
func MakeNode(config *params.NodeConfig) (*node.Node, error) {
func MakeNode(config *params.NodeConfig, deliveryServer whisper.DeliveryServer) (*node.Node, error) {
// make sure data directory exists
if err := os.MkdirAll(filepath.Join(config.DataDir), os.ModePerm); err != nil {
return nil, err
@ -78,7 +78,7 @@ func MakeNode(config *params.NodeConfig) (*node.Node, error) {
}
// start Whisper service
if err := activateShhService(stack, config); err != nil {
if err := activateShhService(stack, config, deliveryServer); err != nil {
return nil, fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err)
}
@ -180,7 +180,7 @@ func activateEthService(stack *node.Node, config *params.NodeConfig) error {
}
// activateShhService configures Whisper and adds it to the given node.
func activateShhService(stack *node.Node, config *params.NodeConfig) error {
func activateShhService(stack *node.Node, config *params.NodeConfig, deliveryServer whisper.DeliveryServer) error {
if !config.WhisperConfig.Enabled {
log.Info("SHH protocol is disabled")
return nil
@ -190,6 +190,10 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) error {
whisperConfig := config.WhisperConfig
whisperService := whisper.New(nil)
if deliveryServer != nil {
whisperService.RegisterDeliveryServer(deliveryServer)
}
// enable mail service
if whisperConfig.MailServerNode {
password, err := whisperConfig.ReadPasswordFile()

View File

@ -34,7 +34,7 @@ lint-gocyclo:
@gometalinter $(LINT_EXCLUDE) --disable-all --enable=gocyclo --cyclo-over=16 --deadline=45s $(LINT_FOLDERS)
lint-errcheck:
@echo "lint-errcheck"
@gometalinter $(LINT_EXCLUDE) --disable-all --enable=errcheck --deadline=45s $(LINT_FOLDERS)
@gometalinter $(LINT_EXCLUDE) --disable-all --enable=errcheck --deadline=1m $(LINT_FOLDERS)
lint-ineffassign:
@echo "lint-ineffassign"
@gometalinter $(LINT_EXCLUDE) --disable-all --enable=ineffassign --deadline=45s $(LINT_FOLDERS)

View File

@ -0,0 +1,65 @@
package message
// Direction defines a int type to indicate a message as either incoming or outgoing.
type Direction int
// consts of all message direction values.
const (
IncomingMessage Direction = iota + 1
OutgoingMessage
)
// String returns the representation of giving direction.
func (d Direction) String() string {
switch d {
case IncomingMessage:
return "IncomingMessage"
case OutgoingMessage:
return "OutgoingMessage"
}
return "MessageDirectionUnknown"
}
// Status defines a int type to indicate different status value of a
// message state.
type Status int
// consts of all message delivery status.
const (
PendingStatus Status = iota + 1
QueuedStatus
CachedStatus
SentStatus
ExpiredStatus
ProcessingStatus
ResentStatus
RejectedStatus
DeliveredStatus
)
// String returns the representation of giving state.
func (s Status) String() string {
switch s {
case PendingStatus:
return "Pending"
case QueuedStatus:
return "Queued"
case CachedStatus:
return "Cached"
case SentStatus:
return "Sent"
case ProcessingStatus:
return "Processing"
case ExpiredStatus:
return "ExpiredTTL"
case ResentStatus:
return "Resent"
case RejectedStatus:
return "Rejected"
case DeliveredStatus:
return "Delivered"
}
return "unknown"
}

View File

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
@ -240,11 +241,15 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
var (
symKeyGiven = len(req.SymKeyID) > 0
pubKeyGiven = len(req.PublicKey) > 0
isP2PMessage = len(req.TargetPeer) > 0
err error
)
api.w.traceOutgoingDelivery(isP2PMessage, message.PendingStatus, &req, nil, nil, nil)
// user must specify either a symmetric or an asymmetric key
if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrSymAsym)
return false, ErrSymAsym
}
@ -260,6 +265,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set key that is used to sign the message
if len(req.Sig) > 0 {
if params.Src, err = api.w.GetPrivateKey(req.Sig); err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
}
@ -267,12 +273,15 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// Set symmetric key that is used to encrypt the message
if symKeyGiven {
if params.Topic == (TopicType{}) { // topics are mandatory with symmetric encryption
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrNoTopics)
return false, ErrNoTopics
}
if params.KeySym, err = api.w.GetSymKey(req.SymKeyID); err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
if !validateSymmetricKey(params.KeySym) {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidSymmetricKey)
return false, ErrInvalidSymmetricKey
}
}
@ -281,6 +290,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if pubKeyGiven {
params.Dst = crypto.ToECDSAPub(req.PublicKey)
if !ValidatePublicKey(params.Dst) {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, ErrInvalidPublicKey)
return false, ErrInvalidPublicKey
}
}
@ -288,11 +298,13 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
// encrypt and sent message
whisperMsg, err := NewSentMessage(params)
if err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
env, err := whisperMsg.Wrap(params)
if err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, nil, nil, err)
return false, err
}
@ -300,16 +312,28 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er
if len(req.TargetPeer) > 0 {
n, err := discover.ParseNode(req.TargetPeer)
if err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
return false, fmt.Errorf("failed to parse target peer: %s", err)
}
return true, api.w.SendP2PMessage(n.ID[:], env)
api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
if err := api.w.SendP2PMessage(n.ID[:], env); err != nil {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, err)
return true, err
}
api.w.traceOutgoingDelivery(isP2PMessage, message.DeliveredStatus, &req, env, nil, err)
return true, nil
}
// ensure that the message PoW meets the node's minimum accepted PoW
if req.PowTarget < api.w.MinPow() {
api.w.traceOutgoingDelivery(isP2PMessage, message.RejectedStatus, &req, env, nil, ErrTooLowPoW)
return false, ErrTooLowPoW
}
api.w.traceOutgoingDelivery(isP2PMessage, message.SentStatus, &req, env, nil, nil)
return true, api.w.Send(env)
}

View File

@ -33,6 +33,7 @@ import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/p2p"
)
@ -99,3 +100,23 @@ type NotificationServer interface {
// Stop stops notification sending loop, releasing related resources
Stop() error
}
// MessageState holds the current delivery status of a whisper p2p message.
type MessageState struct {
IsP2P bool `json:"is_p2p"`
Reason error `json:"reason"`
Envelope Envelope `json:"envelope"`
Timestamp time.Time `json:"timestamp"`
Source NewMessage `json:"source"`
Status message.Status `json:"status"`
Direction message.Direction `json:"direction"`
Received ReceivedMessage `json:"received"`
}
// DeliveryServer represents a small message status
// notification system where a message delivery status
// update event is delivered to it's underline system
// for both rpc messages and p2p messages.
type DeliveryServer interface {
SendState(MessageState)
}

View File

@ -18,10 +18,12 @@ package whisperv5
import (
"crypto/ecdsa"
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
@ -115,15 +117,20 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if match {
msg = env.Open(watcher)
if msg == nil {
err := errors.New("Envelope failed to be opened")
fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
}
} else {
err := errors.New("processing message: does not match")
fs.whisper.traceIncomingDelivery(p2pMessage, message.RejectedStatus, nil, env, nil, err)
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
}
}
if match && msg != nil {
log.Trace("processing message: decrypted", "hash", env.Hash().Hex())
fs.whisper.traceIncomingDelivery(p2pMessage, message.DeliveredStatus, nil, env, msg, nil)
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg)
}

View File

@ -27,6 +27,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/message"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
@ -78,6 +79,7 @@ type Whisper struct {
stats Statistics // Statistics of whisper node
mailServer MailServer // MailServer interface
deliveryServer DeliveryServer // DeliveryServer interface
notificationServer NotificationServer
}
@ -157,6 +159,11 @@ func (w *Whisper) RegisterServer(server MailServer) {
w.mailServer = server
}
// RegisterDeliveryServer registers notification server with Whisper
func (w *Whisper) RegisterDeliveryServer(server DeliveryServer) {
w.deliveryServer = server
}
// RegisterNotificationServer registers notification server with Whisper
func (w *Whisper) RegisterNotificationServer(server NotificationServer) {
w.notificationServer = server
@ -620,8 +627,11 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var envelope Envelope
if err := packet.Decode(&envelope); err != nil {
log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &envelope, nil, err)
return errors.New("invalid direct message")
}
wh.traceIncomingDelivery(true, message.SentStatus, nil, &envelope, nil, nil)
wh.postEvent(&envelope, true)
}
case p2pRequestCode:
@ -630,6 +640,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
var request Envelope
if err := packet.Decode(&request); err != nil {
log.Warn("failed to decode p2p request message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
wh.traceIncomingDelivery(true, message.RejectedStatus, nil, &request, nil, err)
return errors.New("invalid p2p request")
}
wh.mailServer.DeliverMail(p, &request)
@ -700,16 +711,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
if !wh.expirations[envelope.Expiry].Has(hash) {
wh.expirations[envelope.Expiry].Add(hash)
}
wh.traceIncomingDelivery(false, message.CachedStatus, nil, envelope, nil, nil)
}
wh.poolMu.Unlock()
if alreadyCached {
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
wh.traceIncomingDelivery(false, message.ResentStatus, nil, envelope, nil, nil)
} else {
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
wh.statsMu.Lock()
wh.stats.memoryUsed += envelope.size()
wh.statsMu.Unlock()
wh.traceIncomingDelivery(false, message.QueuedStatus, nil, envelope, nil, nil)
wh.postEvent(envelope, false) // notify the local node about the new message
if wh.mailServer != nil {
wh.mailServer.Archive(envelope)
@ -718,6 +735,47 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
return true, nil
}
func (w *Whisper) traceIncomingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
w.traceDelivery(isP2P, message.IncomingMessage, status, src, env, rec, err)
}
func (w *Whisper) traceOutgoingDelivery(isP2P bool, status message.Status, src *NewMessage, env *Envelope, rec *ReceivedMessage, err error) {
w.traceDelivery(isP2P, message.OutgoingMessage, status, src, env, rec, err)
}
func (w *Whisper) traceDelivery(isP2P bool, dir message.Direction, status message.Status, newmsg *NewMessage, envelope *Envelope, received *ReceivedMessage, err error) {
if w.deliveryServer == nil {
return
}
var env Envelope
var rec ReceivedMessage
var src NewMessage
if newmsg != nil {
src = *newmsg
}
if envelope != nil {
env = *envelope
}
if received != nil {
rec = *received
}
go w.deliveryServer.SendState(MessageState{
Reason: err,
Source: src,
Received: rec,
IsP2P: isP2P,
Status: status,
Envelope: env,
Direction: dir,
Timestamp: time.Now(),
})
}
// postEvent queues the message for further processing.
func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
// if the version of incoming message is higher than
@ -730,6 +788,13 @@ func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) {
w.checkOverflow()
w.messageQueue <- envelope
}
return
}
if w.deliveryServer != nil {
err := fmt.Errorf("Mismatch Envelope version(%d) to wanted Version(%d)", envelope.Ver(), EnvelopeVersion)
w.traceIncomingDelivery(isP2P, message.RejectedStatus, nil, envelope, nil, err)
}
}
@ -759,9 +824,11 @@ func (w *Whisper) processQueue() {
return
case e = <-w.messageQueue:
w.traceIncomingDelivery(false, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, false)
case e = <-w.p2pMsgQueue:
w.traceIncomingDelivery(true, message.ProcessingStatus, nil, e, nil, nil)
w.filters.NotifyWatchers(e, true)
}
}