diff --git a/gossip_tracer.go b/gossip_tracer.go index 6db3fd1..a55f38b 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -111,7 +111,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { return res } -var _ internalTracer = (*gossipTracer)(nil) +var _ RawTracer = (*gossipTracer)(nil) func (gt *gossipTracer) fulfillPromise(msg *Message) { mid := gt.msgID(msg.Message) diff --git a/gossipsub.go b/gossipsub.go index e3c9809..837ed15 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -196,8 +196,8 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er tagTracer: newTagTracer(h.ConnManager()), } - // use the withInternalTracer option to hook up the tag tracer - opts = append(opts, withInternalTracer(rt.tagTracer)) + // hook the tag tracer + opts = append(opts, WithRawTracer(rt.tagTracer)) return NewPubSub(ctx, h, rt, opts...) } @@ -232,12 +232,12 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt // hook the tracer if ps.tracer != nil { - ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer) + ps.tracer.raw = append(ps.tracer.raw, gs.score, gs.gossipTracer) } else { ps.tracer = &pubsubTracer{ - internal: []internalTracer{gs.score, gs.gossipTracer}, - pid: ps.host.ID(), - msgID: ps.msgID, + raw: []RawTracer{gs.score, gs.gossipTracer}, + pid: ps.host.ID(), + msgID: ps.msgID, } } diff --git a/peer_gater.go b/peer_gater.go index 0a00ded..e6d6bc1 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -177,12 +177,12 @@ func WithPeerGater(params *PeerGaterParams) Option { // hook the tracer if ps.tracer != nil { - ps.tracer.internal = append(ps.tracer.internal, gs.gate) + ps.tracer.raw = append(ps.tracer.raw, gs.gate) } else { ps.tracer = &pubsubTracer{ - internal: []internalTracer{gs.gate}, - pid: ps.host.ID(), - msgID: ps.msgID, + raw: []RawTracer{gs.gate}, + pid: ps.host.ID(), + msgID: ps.msgID, } } diff --git a/pubsub.go b/pubsub.go index 7ec045a..dd05864 100644 --- a/pubsub.go +++ b/pubsub.go @@ -430,13 +430,14 @@ func WithEventTracer(tracer EventTracer) Option { } } -// withInternalTracer adds an internal event tracer to the pubsub system -func withInternalTracer(tracer internalTracer) Option { +// WithRawTracer adds a raw tracer to the pubsub system. +// Multiple tracers can be added using multiple invocations of the option. +func WithRawTracer(tracer RawTracer) Option { return func(p *PubSub) error { if p.tracer != nil { - p.tracer.internal = append(p.tracer.internal, tracer) + p.tracer.raw = append(p.tracer.raw, tracer) } else { - p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID} + p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), msgID: p.msgID} } return nil } diff --git a/score.go b/score.go index e02339b..53bb2a5 100644 --- a/score.go +++ b/score.go @@ -85,7 +85,7 @@ type peerScore struct { inspectPeriod time.Duration } -var _ internalTracer = (*peerScore)(nil) +var _ RawTracer = (*peerScore)(nil) type messageDeliveries struct { records map[string]*deliveryRecord diff --git a/tag_tracer.go b/tag_tracer.go index bd6d5e2..2d0f19c 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -173,8 +173,8 @@ func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID { return peers } -// -- internalTracer interface methods -var _ internalTracer = (*tagTracer)(nil) +// -- RawTracer interface methods +var _ RawTracer = (*tagTracer)(nil) func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) { t.tagPeerIfDirect(p) diff --git a/trace.go b/trace.go index 313a356..3ee4bb6 100644 --- a/trace.go +++ b/trace.go @@ -9,32 +9,53 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" ) -// Generic event tracer interface +// EventTracer is a generic event tracer interface. +// This is a high level tracing interface which delivers tracing events, as defined by the protobuf +// schema in pb/trace.proto. type EventTracer interface { Trace(evt *pb.TraceEvent) } -// internal interface for score tracing -type internalTracer interface { +// RawTracer is a low level tracing interace that allows an application to trace the internal +// operation of the pubsub subsystem. +// +// Note that the tracers are invoked synchronously, which means that application tracers must +// take care to not block or modify arguments. +// +// Warning: this interface is not fixed, we may be adding new methods as necessitated by the system +// in the future. +type RawTracer interface { + // AddPeer is invoked when a new peer is added. AddPeer(p peer.ID, proto protocol.ID) + // RemovePeer is invoked when a peer is removed. RemovePeer(p peer.ID) + // Join is invoked when a new topic is joined Join(topic string) + // Leave is invoked when a topic is abandoned Leave(topic string) + // Graft is invoked when a new peer is grafted on the mesh (gossipsub) Graft(p peer.ID, topic string) + // Prune is invoked when a peer is pruned from the message (gossipsub) Prune(p peer.ID, topic string) + // ValidateMessage is invoked when a message first enters the validation pipeline. ValidateMessage(msg *Message) + // DeliverMessage is invoked when a message is delivered DeliverMessage(msg *Message) + // RejectMessage is invoked when a message is Rejected or Ignored. + // The reason argument can be one of the named strings Reject*. RejectMessage(msg *Message, reason string) + // DuplicateMessage is invoked when a duplicate message is dropped. DuplicateMessage(msg *Message) + // ThrottlePeer is invoked when a peer is throttled by the peer gater. ThrottlePeer(p peer.ID) } // pubsub tracer details type pubsubTracer struct { - tracer EventTracer - internal []internalTracer - pid peer.ID - msgID MsgIdFunction + tracer EventTracer + raw []RawTracer + pid peer.ID + msgID MsgIdFunction } func (t *pubsubTracer) PublishMessage(msg *Message) { @@ -66,7 +87,7 @@ func (t *pubsubTracer) ValidateMessage(msg *Message) { } if msg.ReceivedFrom != t.pid { - for _, tr := range t.internal { + for _, tr := range t.raw { tr.ValidateMessage(msg) } } @@ -78,7 +99,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { } if msg.ReceivedFrom != t.pid { - for _, tr := range t.internal { + for _, tr := range t.raw { tr.RejectMessage(msg, reason) } } @@ -109,7 +130,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) { } if msg.ReceivedFrom != t.pid { - for _, tr := range t.internal { + for _, tr := range t.raw { tr.DuplicateMessage(msg) } } @@ -139,7 +160,7 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) { } if msg.ReceivedFrom != t.pid { - for _, tr := range t.internal { + for _, tr := range t.raw { tr.DeliverMessage(msg) } } @@ -168,7 +189,7 @@ func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.AddPeer(p, proto) } @@ -196,7 +217,7 @@ func (t *pubsubTracer) RemovePeer(p peer.ID) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.RemovePeer(p) } @@ -366,7 +387,7 @@ func (t *pubsubTracer) Join(topic string) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.Join(topic) } @@ -392,7 +413,7 @@ func (t *pubsubTracer) Leave(topic string) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.Leave(topic) } @@ -418,7 +439,7 @@ func (t *pubsubTracer) Graft(p peer.ID, topic string) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.Graft(p, topic) } @@ -445,7 +466,7 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.Prune(p, topic) } @@ -472,7 +493,7 @@ func (t *pubsubTracer) ThrottlePeer(p peer.ID) { return } - for _, tr := range t.internal { + for _, tr := range t.raw { tr.ThrottlePeer(p) } }