diff --git a/whisper/whisperv5/doc.go b/whisper/whisperv5/doc.go index a6c9e610d..eb2b75210 100644 --- a/whisper/whisperv5/doc.go +++ b/whisper/whisperv5/doc.go @@ -85,3 +85,40 @@ type MailServer interface { Archive(env *Envelope) DeliverMail(whisperPeer *Peer, request *Envelope) } + +type envelopeSource int + +const ( + _ = iota + // peerSource indicates that the envelope was received from a regular peer. + peerSource envelopeSource = iota + // p2pSource indicates that the envelope was received from a trusted peer. + p2pSource +) + +// EnvelopeMeta keeps metadata of received envelopes. +type EnvelopeMeta struct { + Hash string + Topic TopicType + Size uint32 + Source envelopeSource + IsNew bool + Peer string +} + +// SourceString returns a short label for Source: peer, p2p, or unknown for any other value. +func (m *EnvelopeMeta) SourceString() string { + switch m.Source { + case peerSource: + return "peer" + case p2pSource: + return "p2p" + default: + return "unknown" + } +} + +// EnvelopeTracer is notified, via Trace, with metadata of received envelopes. +type EnvelopeTracer interface { + Trace(*EnvelopeMeta) +} diff --git a/whisper/whisperv5/whisper.go b/whisper/whisperv5/whisper.go index c39e8b3e0..631676328 100644 --- a/whisper/whisperv5/whisper.go +++ b/whisper/whisperv5/whisper.go @@ -77,7 +77,8 @@ type Whisper struct { statsMu sync.Mutex // guard stats stats Statistics // Statistics of whisper node - mailServer MailServer // MailServer interface + mailServer MailServer // MailServer interface + envelopeTracer EnvelopeTracer // Service collecting envelope metadata } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -156,6 +157,12 @@ func (w *Whisper) RegisterServer(server MailServer) { w.mailServer = server } +// RegisterEnvelopeTracer registers an EnvelopeTracer to collect information +// about received envelopes. +func (w *Whisper) RegisterEnvelopeTracer(tracer EnvelopeTracer) { + w.envelopeTracer = tracer +} + // Protocols returns the whisper sub-protocols ran by this particular client. 
func (w *Whisper) Protocols() []p2p.Protocol { return []p2p.Protocol{w.protocol} @@ -584,6 +591,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { log.Warn("failed to decode envelope, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid envelope") } + wh.traceEnvelope(&envelope, !wh.isEnvelopeCached(envelope.Hash()), peerSource, p) cached, err := wh.add(&envelope) if err != nil { log.Warn("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) @@ -604,6 +612,7 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { return errors.New("invalid direct message") } wh.postEvent(&envelope, true) + wh.traceEnvelope(&envelope, false, p2pSource, p) } case p2pRequestCode: // Must be processed if mail server is implemented. Otherwise ignore. @@ -699,6 +708,22 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) { return true, nil } +// traceEnvelope sends basic metadata about an envelope and its sender peer to the registered tracer; it does nothing when no tracer is registered. +func (w *Whisper) traceEnvelope(envelope *Envelope, isNew bool, source envelopeSource, peer *Peer) { + if w.envelopeTracer == nil { + return + } + + w.envelopeTracer.Trace(&EnvelopeMeta{ + Hash: envelope.Hash().String(), + Topic: BytesToTopic(envelope.Topic[:]), + Size: uint32(envelope.size()), + Source: source, + IsNew: isNew, + Peer: peer.peer.Info().ID, + }) +} + // postEvent queues the message for further processing. func (w *Whisper) postEvent(envelope *Envelope, isP2P bool) { // if the version of incoming message is higher than