initialize tracer with peer ID, trace RPC from join/leave announcements
This commit is contained in:
parent
958e09a5b3
commit
fb11aa9857
|
@ -326,7 +326,7 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
|
||||||
// WithEventTracer provides a tracer for the pubsub system
|
// WithEventTracer provides a tracer for the pubsub system
|
||||||
func WithEventTracer(tracer EventTracer) Option {
|
func WithEventTracer(tracer EventTracer) Option {
|
||||||
return func(p *PubSub) error {
|
return func(p *PubSub) error {
|
||||||
p.tracer = &pubsubTracer{tracer: tracer}
|
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID()}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -582,8 +582,10 @@ func (p *PubSub) announce(topic string, sub bool) {
|
||||||
for pid, peer := range p.peers {
|
for pid, peer := range p.peers {
|
||||||
select {
|
select {
|
||||||
case peer <- out:
|
case peer <- out:
|
||||||
|
p.tracer.SendRPC(out, pid)
|
||||||
default:
|
default:
|
||||||
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
|
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
|
||||||
|
p.tracer.DropRPC(out, pid)
|
||||||
go p.announceRetry(pid, topic, sub)
|
go p.announceRetry(pid, topic, sub)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -619,8 +621,10 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
|
||||||
out := rpcWithSubs(subopt)
|
out := rpcWithSubs(subopt)
|
||||||
select {
|
select {
|
||||||
case peer <- out:
|
case peer <- out:
|
||||||
|
p.tracer.SendRPC(out, pid)
|
||||||
default:
|
default:
|
||||||
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
|
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
|
||||||
|
p.tracer.DropRPC(out, pid)
|
||||||
go p.announceRetry(pid, topic, sub)
|
go p.announceRetry(pid, topic, sub)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue