diff --git a/whisper/whisperv6/events.go b/whisper/whisperv6/events.go new file mode 100644 index 00000000..e03ec9de --- /dev/null +++ b/whisper/whisperv6/events.go @@ -0,0 +1,23 @@ +package whisperv6 + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// EventType identifies the kind of envelope event being reported. +type EventType string + +const ( + // EventEnvelopeSent fires when an envelope was sent to a peer. + EventEnvelopeSent EventType = "envelope.sent" + // EventEnvelopeExpired fires when an envelope expired. + EventEnvelopeExpired EventType = "envelope.expired" +) + +// EnvelopeEvent describes a single envelope event: its type, the envelope hash and the peer involved (Peer is set for EventEnvelopeSent; expiry events leave it empty). +type EnvelopeEvent struct { + Event EventType + Hash common.Hash + Peer discover.NodeID +} diff --git a/whisper/whisperv6/peer.go b/whisper/whisperv6/peer.go index 79cc2127..018d8f82 100644 --- a/whisper/whisperv6/peer.go +++ b/whisper/whisperv6/peer.go @@ -204,6 +204,11 @@ func (peer *Peer) broadcast() error { // mark envelopes only if they were successfully sent for _, e := range bundle { peer.mark(e) + peer.host.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeSent, + Hash: e.Hash(), + Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed + }) } log.Trace("broadcast", "num. 
messages", len(bundle)) diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 414aa788..3c3c66ad 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -29,6 +29,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -89,6 +90,8 @@ type Whisper struct { mailServer MailServer // MailServer interface envelopeTracer EnvelopeTracer // Service collecting envelopes metadata + + envelopeFeed event.Feed } // New creates a Whisper client ready to communicate through the Ethereum P2P network. @@ -133,6 +136,12 @@ func New(cfg *Config) *Whisper { return whisper } +// SubscribeEnvelopeEvents subscribes the given channel to the envelope event feed and returns the resulting subscription. +// In order to prevent blocking whisper producers, the events channel must be amply buffered. +func (whisper *Whisper) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { + return whisper.envelopeFeed.Subscribe(events) +} + // MinPow returns the PoW value required by this node. func (whisper *Whisper) MinPow() float64 { val, exist := whisper.settings.Load(minPowIdx) @@ -986,6 +995,10 @@ func (whisper *Whisper) expire() { hashSet.Each(func(v interface{}) bool { sz := whisper.envelopes[v.(common.Hash)].size() delete(whisper.envelopes, v.(common.Hash)) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: v.(common.Hash), + Event: EventEnvelopeExpired, + }) whisper.stats.messagesCleared++ whisper.stats.memoryCleared += sz whisper.stats.memoryUsed -= sz