diff --git a/floodsub.go b/floodsub.go index f70a0a7..63b4491 100644 --- a/floodsub.go +++ b/floodsub.go @@ -31,6 +31,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err type FloodSubRouter struct { p *PubSub protocols []protocol.ID + tracer *pubsubTracer } func (fs *FloodSubRouter) Protocols() []protocol.ID { @@ -39,11 +40,18 @@ func (fs *FloodSubRouter) Protocols() []protocol.ID { func (fs *FloodSubRouter) Attach(p *PubSub) { fs.p = p + if p.tracer != nil { + fs.tracer = p.tracer + } } -func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {} +func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) { + fs.tracer.AddPeer(p, proto) +} -func (fs *FloodSubRouter) RemovePeer(peer.ID) {} +func (fs *FloodSubRouter) RemovePeer(p peer.ID) { + fs.tracer.RemovePeer(p) +} func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { // check all peers in the topic @@ -91,13 +99,19 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { select { case mch <- out: + fs.tracer.SendRPC(out, pid) default: log.Infof("dropping message to peer %s: queue full", pid) + fs.tracer.DropRPC(out, pid) // Drop it. The peer is too slow. } } } -func (fs *FloodSubRouter) Join(topic string) {} +func (fs *FloodSubRouter) Join(topic string) { + fs.tracer.Join(topic) +} -func (fs *FloodSubRouter) Leave(topic string) {} +func (fs *FloodSubRouter) Leave(topic string) { + fs.tracer.Join(topic) +} diff --git a/randomsub.go b/randomsub.go index 9435c71..700e420 100644 --- a/randomsub.go +++ b/randomsub.go @@ -29,8 +29,9 @@ func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er // RandomSubRouter is a router that implements a random propagation strategy. // For each message, it selects RandomSubD peers and forwards the message to them. type RandomSubRouter struct { - p *PubSub - peers map[peer.ID]protocol.ID + p *PubSub + peers map[peer.ID]protocol.ID + tracer *pubsubTracer } func (rs *RandomSubRouter) Protocols() []protocol.ID { @@ -39,13 +40,18 @@ func (rs *RandomSubRouter) Protocols() []protocol.ID { func (rs *RandomSubRouter) Attach(p *PubSub) { rs.p = p + if p.tracer != nil { + rs.tracer = p.tracer + } } func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) { + rs.tracer.AddPeer(p, proto) rs.peers[p] = proto } func (rs *RandomSubRouter) RemovePeer(p peer.ID) { + rs.tracer.RemovePeer(p) delete(rs.peers, p) } @@ -132,12 +138,18 @@ func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) { select { case mch <- out: + rs.tracer.SendRPC(out, p) default: log.Infof("dropping message to peer %s: queue full", p) + rs.tracer.DropRPC(out, p) } } } -func (rs *RandomSubRouter) Join(topic string) {} +func (rs *RandomSubRouter) Join(topic string) { + rs.tracer.Join(topic) +} -func (rs *RandomSubRouter) Leave(topic string) {} +func (rs *RandomSubRouter) Leave(topic string) { + rs.tracer.Join(topic) +}