From fb11aa9857a3dfcc8a623166d173e51e639d8249 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 11 Nov 2019 18:32:12 +0200 Subject: [PATCH] initialize tracer with peer ID, trace RPC from join/leave announcements --- pubsub.go | 6 +++++- trace.go | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pubsub.go b/pubsub.go index d8fc964..530c47a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -326,7 +326,7 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option { // WithEventTracer provides a tracer for the pubsub system func WithEventTracer(tracer EventTracer) Option { return func(p *PubSub) error { - p.tracer = &pubsubTracer{tracer: tracer} + p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID()} return nil } } @@ -582,8 +582,10 @@ func (p *PubSub) announce(topic string, sub bool) { for pid, peer := range p.peers { select { case peer <- out: + p.tracer.SendRPC(out, pid) default: log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + p.tracer.DropRPC(out, pid) go p.announceRetry(pid, topic, sub) } } @@ -619,8 +621,10 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { out := rpcWithSubs(subopt) select { case peer <- out: + p.tracer.SendRPC(out, pid) default: log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid) + p.tracer.DropRPC(out, pid) go p.announceRetry(pid, topic, sub) } } diff --git a/trace.go b/trace.go index fc24f76..99b4fa5 100644 --- a/trace.go +++ b/trace.go @@ -12,6 +12,7 @@ type EventTracer interface { type pubsubTracer struct { tracer EventTracer + pid peer.ID } func (t *pubsubTracer) PublishMessage(msg *Message) {