diff --git a/gossipsub.go b/gossipsub.go index 1b2026c..e65f782 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -65,6 +65,7 @@ type GossipSubRouter struct { gossip map[peer.ID][]*pb.ControlIHave // pending gossip control map[peer.ID]*pb.ControlMessage // pending control messages mcache *MessageCache + tracer *pubsubTracer } func (gs *GossipSubRouter) Protocols() []protocol.ID { @@ -73,16 +74,21 @@ func (gs *GossipSubRouter) Protocols() []protocol.ID { func (gs *GossipSubRouter) Attach(p *PubSub) { gs.p = p + if p.tracer != nil { + gs.tracer = p.tracer + } go gs.heartbeatTimer() } func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { log.Debugf("PEERUP: Add new peer %s using %s", p, proto) + gs.tracer.AddPeer(p, proto) gs.peers[p] = proto } func (gs *GossipSubRouter) RemovePeer(p peer.ID) { log.Debugf("PEERDOWN: Remove disconnected peer %s", p) + gs.tracer.RemovePeer(p) delete(gs.peers, p) for _, peers := range gs.mesh { delete(peers, p) @@ -208,6 +214,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. prune = append(prune, topic) } else { log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic) + gs.tracer.Graft(p, topic) peers[p] = struct{}{} gs.tagPeer(p, topic) } @@ -231,6 +238,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { peers, ok := gs.mesh[topic] if ok { log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic) + gs.tracer.Prune(p, topic) delete(peers, p) gs.untagPeer(p, topic) } @@ -294,6 +302,7 @@ func (gs *GossipSubRouter) Join(topic string) { } log.Debugf("JOIN %s", topic) + gs.tracer.Join(topic) gmap, ok = gs.fanout[topic] if ok { @@ -319,6 +328,7 @@ func (gs *GossipSubRouter) Join(topic string) { for p := range gmap { log.Debugf("JOIN: Add mesh link to %s in %s", p, topic) + gs.tracer.Graft(p, topic) gs.sendGraft(p, topic) gs.tagPeer(p, topic) } @@ -331,11 +341,13 @@ func (gs *GossipSubRouter) Leave(topic string) { } log.Debugf("LEAVE %s", topic) + gs.tracer.Leave(topic) delete(gs.mesh, topic) for p := range gmap { log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic) + gs.tracer.Prune(p, topic) gs.sendPrune(p, topic) gs.untagPeer(p, topic) } @@ -384,8 +396,10 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { select { case mch <- out: + gs.tracer.SendRPC(out, p) default: log.Infof("dropping message to peer %s: queue full", p) + gs.tracer.DropRPC(out, p) // push control messages that need to be retried ctl := out.GetControl() if ctl != nil { @@ -443,6 +457,7 @@ func (gs *GossipSubRouter) heartbeat() { for _, p := range plst { log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic) + gs.tracer.Graft(p, topic) peers[p] = struct{}{} gs.tagPeer(p, topic) topics := tograft[p] @@ -458,6 +473,7 @@ func (gs *GossipSubRouter) heartbeat() { for _, p := range plst[:idontneed] { log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic) + gs.tracer.Prune(p, topic) delete(peers, p) gs.untagPeer(p, topic) topics := toprune[p] diff --git a/pubsub.go b/pubsub.go index e46f4df..7237259 100644 --- a/pubsub.go +++ b/pubsub.go @@ -46,6 +46,8 @@ type PubSub struct { disc *discover + tracer *pubsubTracer + // size of the outbound message channel that we maintain for each peer peerOutboundQueueSize int @@ -321,6 +323,14 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option { } } +// WithEventTracer provides a tracer for the pubsub system +func WithEventTracer(tracer EventTracer) Option { + return func(p *PubSub) error { + p.tracer = &pubsubTracer{tracer: tracer} + return nil + } +} + // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { defer func() { @@ -671,6 +681,8 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) { } func (p *PubSub) handleIncomingRPC(rpc *RPC) { + p.tracer.RecvRPC(rpc) + for _, subopt := range rpc.GetSubscriptions() { t := subopt.GetTopicid() if subopt.GetSubscribe() { @@ -724,24 +736,28 @@ func (p *PubSub) pushMsg(msg *Message) { // reject messages from blacklisted peers if p.blacklist.Contains(src) { log.Warningf("dropping message from blacklisted peer %s", src) + p.tracer.RejectMessage(msg, "blacklisted peer") return } // even if they are forwarded by good peers if p.blacklist.Contains(msg.GetFrom()) { log.Warningf("dropping message from blacklisted source %s", src) + p.tracer.RejectMessage(msg, "blacklisted source") return } // reject unsigned messages when strict before we even process the id if p.signStrict && msg.Signature == nil { log.Debugf("dropping unsigned message from %s", src) + p.tracer.RejectMessage(msg, "missing signature") return } // have we already seen and validated this message? id := msgID(msg.Message) if p.seenMessage(id) { + p.tracer.DuplicateMessage(msg) return } @@ -755,6 +771,7 @@ func (p *PubSub) pushMsg(msg *Message) { } func (p *PubSub) publishMessage(msg *Message) { + p.tracer.DeliverMessage(msg) p.notifySubs(msg) p.rt.Publish(msg.ReceivedFrom, msg.Message) } diff --git a/trace.go b/trace.go new file mode 100644 index 0000000..08ec219 --- /dev/null +++ b/trace.go @@ -0,0 +1,87 @@ +package pubsub + +import ( + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +// Generic event tracer interface +type EventTracer interface { + Trace(evt interface{}) +} + +type pubsubTracer struct { + tracer EventTracer +} + +func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) DuplicateMessage(msg *Message) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) DeliverMessage(msg *Message) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) RemovePeer(p peer.ID) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) RecvRPC(rpc *RPC) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) Join(topic string) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) Leave(topic string) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) Graft(p peer.ID, topic string) { + if t != nil { + // TODO + } +} + +func (t *pubsubTracer) Prune(p peer.ID, topic string) { + if t != nil { + // TODO + } +} diff --git a/validation.go b/validation.go index 9253f2a..164bfa9 100644 --- a/validation.go +++ b/validation.go @@ -31,6 +31,8 @@ type ValidatorOpt func(addVal *addValReq) error type validation struct { p *PubSub + tracer *pubsubTracer + // topicVals tracks per topic validators topicVals map[string]*topicVal @@ -90,6 +92,9 @@ func newValidation() *validation { // workers func (v *validation) Start(p *PubSub) { v.p = p + if p.tracer != nil { + v.tracer = p.tracer + } for i := 0; i < v.validateWorkers; i++ { go v.validateWorker() } @@ -148,6 +153,7 @@ func (v *validation) Push(src peer.ID, msg *Message) bool { case v.validateQ <- &validateReq{vals, src, msg}: default: log.Warningf("message validation throttled; dropping message from %s", src) + v.tracer.RejectMessage(msg, "validation throttled") } return false } @@ -190,6 +196,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { if msg.Signature != nil { if !v.validateSignature(msg) { log.Warningf("message signature validation failed; dropping message from %s", src) + v.tracer.RejectMessage(msg, "invalid signature") return } } @@ -198,6 +205,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { // and avoid invoking user validators more than once id := msgID(msg.Message) if !v.p.markSeen(id) { + v.tracer.DuplicateMessage(msg) return } @@ -214,6 +222,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { for _, val := range inline { if !val.validateMsg(v.p.ctx, src, msg) { log.Debugf("message validation failed; dropping message from %s", src) + v.tracer.RejectMessage(msg, "validation failed") return } } @@ -228,6 +237,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { }() default: log.Warningf("message validation throttled; dropping message from %s", src) + v.tracer.RejectMessage(msg, "validation throttled") } return } @@ -249,6 +259,7 @@ func (v *validation) validateSignature(msg *Message) bool { func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) { if !v.validateTopic(vals, src, msg) { log.Warningf("message validation failed; dropping message from %s", src) + v.tracer.RejectMessage(msg, "validation failed") return } @@ -286,6 +297,7 @@ loop: } if throttle { + v.tracer.RejectMessage(msg, "validation throttled") return false } @@ -310,6 +322,7 @@ func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Messag default: log.Debugf("validation throttled for topic %s", val.topic) + v.tracer.RejectMessage(msg, "validation throttled") return false } }