diff --git a/pubsub.go b/pubsub.go index d8fc964..530c47a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -326,7 +326,7 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option { // WithEventTracer provides a tracer for the pubsub system func WithEventTracer(tracer EventTracer) Option { return func(p *PubSub) error { - p.tracer = &pubsubTracer{tracer: tracer} + p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID()} return nil } } @@ -582,8 +582,10 @@ func (p *PubSub) announce(topic string, sub bool) { for pid, peer := range p.peers { select { case peer <- out: + p.tracer.SendRPC(out, pid) default: log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + p.tracer.DropRPC(out, pid) go p.announceRetry(pid, topic, sub) } } @@ -619,8 +621,10 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { out := rpcWithSubs(subopt) select { case peer <- out: + p.tracer.SendRPC(out, pid) default: log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + p.tracer.DropRPC(out, pid) go p.announceRetry(pid, topic, sub) } } diff --git a/trace.go b/trace.go index fc24f76..99b4fa5 100644 --- a/trace.go +++ b/trace.go @@ -12,6 +12,7 @@ type EventTracer interface { type pubsubTracer struct { tracer EventTracer + pid peer.ID } func (t *pubsubTracer) PublishMessage(msg *Message) {