From fd739731453637c8f53addf523ab044c66a9f209 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 4 Nov 2019 20:43:48 +0200 Subject: [PATCH] add tracing to floodsub/randomsub --- floodsub.go | 22 ++++++++++++++++++---- randomsub.go | 20 ++++++++++++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/floodsub.go b/floodsub.go index f70a0a7..63b4491 100644 --- a/floodsub.go +++ b/floodsub.go @@ -31,6 +31,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err type FloodSubRouter struct { p *PubSub protocols []protocol.ID + tracer *pubsubTracer } func (fs *FloodSubRouter) Protocols() []protocol.ID { @@ -39,11 +40,18 @@ func (fs *FloodSubRouter) Protocols() []protocol.ID { func (fs *FloodSubRouter) Attach(p *PubSub) { fs.p = p + if p.tracer != nil { + fs.tracer = p.tracer + } } -func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {} +func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) { + fs.tracer.AddPeer(p, proto) +} -func (fs *FloodSubRouter) RemovePeer(peer.ID) {} +func (fs *FloodSubRouter) RemovePeer(p peer.ID) { + fs.tracer.RemovePeer(p) +} func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { // check all peers in the topic @@ -91,13 +99,19 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { select { case mch <- out: + fs.tracer.SendRPC(out, pid) default: log.Infof("dropping message to peer %s: queue full", pid) + fs.tracer.DropRPC(out, pid) // Drop it. The peer is too slow. } } } -func (fs *FloodSubRouter) Join(topic string) {} +func (fs *FloodSubRouter) Join(topic string) { + fs.tracer.Join(topic) +} -func (fs *FloodSubRouter) Leave(topic string) {} +func (fs *FloodSubRouter) Leave(topic string) { + fs.tracer.Join(topic) +} diff --git a/randomsub.go b/randomsub.go index 9435c71..700e420 100644 --- a/randomsub.go +++ b/randomsub.go @@ -29,8 +29,9 @@ func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er // RandomSubRouter is a router that implements a random propagation strategy. // For each message, it selects RandomSubD peers and forwards the message to them. type RandomSubRouter struct { - p *PubSub - peers map[peer.ID]protocol.ID + p *PubSub + peers map[peer.ID]protocol.ID + tracer *pubsubTracer } func (rs *RandomSubRouter) Protocols() []protocol.ID { @@ -39,13 +40,18 @@ func (rs *RandomSubRouter) Protocols() []protocol.ID { func (rs *RandomSubRouter) Attach(p *PubSub) { rs.p = p + if p.tracer != nil { + rs.tracer = p.tracer + } } func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) { + rs.tracer.AddPeer(p, proto) rs.peers[p] = proto } func (rs *RandomSubRouter) RemovePeer(p peer.ID) { + rs.tracer.RemovePeer(p) delete(rs.peers, p) } @@ -132,12 +138,18 @@ func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) { select { case mch <- out: + rs.tracer.SendRPC(out, p) default: log.Infof("dropping message to peer %s: queue full", p) + rs.tracer.DropRPC(out, p) } } } -func (rs *RandomSubRouter) Join(topic string) {} +func (rs *RandomSubRouter) Join(topic string) { + rs.tracer.Join(topic) +} -func (rs *RandomSubRouter) Leave(topic string) {} +func (rs *RandomSubRouter) Leave(topic string) { + rs.tracer.Join(topic) +}