add tracing to floodsub/randomsub

This commit is contained in:
vyzo 2019-11-04 20:43:48 +02:00
parent 89c7ed46e3
commit fd73973145
2 changed files with 34 additions and 8 deletions

View File

@ -31,6 +31,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err
type FloodSubRouter struct { type FloodSubRouter struct {
p *PubSub p *PubSub
protocols []protocol.ID protocols []protocol.ID
tracer *pubsubTracer
} }
func (fs *FloodSubRouter) Protocols() []protocol.ID { func (fs *FloodSubRouter) Protocols() []protocol.ID {
@ -39,11 +40,18 @@ func (fs *FloodSubRouter) Protocols() []protocol.ID {
func (fs *FloodSubRouter) Attach(p *PubSub) { func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p fs.p = p
if p.tracer != nil {
fs.tracer = p.tracer
}
} }
func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {} func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
}
func (fs *FloodSubRouter) RemovePeer(peer.ID) {} func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic // check all peers in the topic
@ -91,13 +99,19 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
select { select {
case mch <- out: case mch <- out:
fs.tracer.SendRPC(out, pid)
default: default:
log.Infof("dropping message to peer %s: queue full", pid) log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow. // Drop it. The peer is too slow.
} }
} }
} }
func (fs *FloodSubRouter) Join(topic string) {} func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}
func (fs *FloodSubRouter) Leave(topic string) {} func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Join(topic)
}

View File

@ -29,8 +29,9 @@ func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
// RandomSubRouter is a router that implements a random propagation strategy. // RandomSubRouter is a router that implements a random propagation strategy.
// For each message, it selects RandomSubD peers and forwards the message to them. // For each message, it selects RandomSubD peers and forwards the message to them.
type RandomSubRouter struct { type RandomSubRouter struct {
p *PubSub p *PubSub
peers map[peer.ID]protocol.ID peers map[peer.ID]protocol.ID
tracer *pubsubTracer
} }
func (rs *RandomSubRouter) Protocols() []protocol.ID { func (rs *RandomSubRouter) Protocols() []protocol.ID {
@ -39,13 +40,18 @@ func (rs *RandomSubRouter) Protocols() []protocol.ID {
func (rs *RandomSubRouter) Attach(p *PubSub) { func (rs *RandomSubRouter) Attach(p *PubSub) {
rs.p = p rs.p = p
if p.tracer != nil {
rs.tracer = p.tracer
}
} }
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) { func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
rs.tracer.AddPeer(p, proto)
rs.peers[p] = proto rs.peers[p] = proto
} }
func (rs *RandomSubRouter) RemovePeer(p peer.ID) { func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
rs.tracer.RemovePeer(p)
delete(rs.peers, p) delete(rs.peers, p)
} }
@ -132,12 +138,18 @@ func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) {
select { select {
case mch <- out: case mch <- out:
rs.tracer.SendRPC(out, p)
default: default:
log.Infof("dropping message to peer %s: queue full", p) log.Infof("dropping message to peer %s: queue full", p)
rs.tracer.DropRPC(out, p)
} }
} }
} }
func (rs *RandomSubRouter) Join(topic string) {} func (rs *RandomSubRouter) Join(topic string) {
rs.tracer.Join(topic)
}
func (rs *RandomSubRouter) Leave(topic string) {} func (rs *RandomSubRouter) Leave(topic string) {
rs.tracer.Join(topic)
}