expose internalTracer as RawTracer

This commit is contained in:
vyzo 2021-03-10 13:39:56 +02:00
parent 05c505ef60
commit 5457a2845b
7 changed files with 58 additions and 36 deletions

View File

@ -111,7 +111,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
return res
}
var _ internalTracer = (*gossipTracer)(nil)
var _ RawTracer = (*gossipTracer)(nil)
func (gt *gossipTracer) fulfillPromise(msg *Message) {
mid := gt.msgID(msg.Message)

View File

@ -196,8 +196,8 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
tagTracer: newTagTracer(h.ConnManager()),
}
// use the withInternalTracer option to hook up the tag tracer
opts = append(opts, withInternalTracer(rt.tagTracer))
// hook the tag tracer
opts = append(opts, WithRawTracer(rt.tagTracer))
return NewPubSub(ctx, h, rt, opts...)
}
@ -232,12 +232,12 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt
// hook the tracer
if ps.tracer != nil {
ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
ps.tracer.raw = append(ps.tracer.raw, gs.score, gs.gossipTracer)
} else {
ps.tracer = &pubsubTracer{
internal: []internalTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
raw: []RawTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
}
}

View File

@ -177,12 +177,12 @@ func WithPeerGater(params *PeerGaterParams) Option {
// hook the tracer
if ps.tracer != nil {
ps.tracer.internal = append(ps.tracer.internal, gs.gate)
ps.tracer.raw = append(ps.tracer.raw, gs.gate)
} else {
ps.tracer = &pubsubTracer{
internal: []internalTracer{gs.gate},
pid: ps.host.ID(),
msgID: ps.msgID,
raw: []RawTracer{gs.gate},
pid: ps.host.ID(),
msgID: ps.msgID,
}
}

View File

@ -430,13 +430,14 @@ func WithEventTracer(tracer EventTracer) Option {
}
}
// withInternalTracer adds an internal event tracer to the pubsub system
func withInternalTracer(tracer internalTracer) Option {
// WithRawTracer adds a raw tracer to the pubsub system.
// Multiple tracers can be added using multiple invocations of the option.
func WithRawTracer(tracer RawTracer) Option {
return func(p *PubSub) error {
if p.tracer != nil {
p.tracer.internal = append(p.tracer.internal, tracer)
p.tracer.raw = append(p.tracer.raw, tracer)
} else {
p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
}
return nil
}

View File

@ -85,7 +85,7 @@ type peerScore struct {
inspectPeriod time.Duration
}
var _ internalTracer = (*peerScore)(nil)
var _ RawTracer = (*peerScore)(nil)
type messageDeliveries struct {
records map[string]*deliveryRecord

View File

@ -173,8 +173,8 @@ func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
return peers
}
// -- internalTracer interface methods
var _ internalTracer = (*tagTracer)(nil)
// -- RawTracer interface methods
var _ RawTracer = (*tagTracer)(nil)
func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
t.tagPeerIfDirect(p)

View File

@ -9,32 +9,53 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// Generic event tracer interface
// EventTracer is a generic event tracer interface.
// This is a high level tracing interface which delivers tracing events, as defined by the protobuf
// schema in pb/trace.proto.
type EventTracer interface {
Trace(evt *pb.TraceEvent)
}
// internal interface for score tracing
type internalTracer interface {
// RawTracer is a low level tracing interace that allows an application to trace the internal
// operation of the pubsub subsystem.
//
// Note that the tracers are invoked synchronously, which means that application tracers must
// take care to not block or modify arguments.
//
// Warning: this interface is not fixed, we may be adding new methods as necessitated by the system
// in the future.
type RawTracer interface {
// AddPeer is invoked when a new peer is added.
AddPeer(p peer.ID, proto protocol.ID)
// RemovePeer is invoked when a peer is removed.
RemovePeer(p peer.ID)
// Join is invoked when a new topic is joined
Join(topic string)
// Leave is invoked when a topic is abandoned
Leave(topic string)
// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
Graft(p peer.ID, topic string)
// Prune is invoked when a peer is pruned from the message (gossipsub)
Prune(p peer.ID, topic string)
// ValidateMessage is invoked when a message first enters the validation pipeline.
ValidateMessage(msg *Message)
// DeliverMessage is invoked when a message is delivered
DeliverMessage(msg *Message)
// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
RejectMessage(msg *Message, reason string)
// DuplicateMessage is invoked when a duplicate message is dropped.
DuplicateMessage(msg *Message)
// ThrottlePeer is invoked when a peer is throttled by the peer gater.
ThrottlePeer(p peer.ID)
}
// pubsub tracer details
type pubsubTracer struct {
tracer EventTracer
internal []internalTracer
pid peer.ID
msgID MsgIdFunction
tracer EventTracer
raw []RawTracer
pid peer.ID
msgID MsgIdFunction
}
func (t *pubsubTracer) PublishMessage(msg *Message) {
@ -66,7 +87,7 @@ func (t *pubsubTracer) ValidateMessage(msg *Message) {
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.ValidateMessage(msg)
}
}
@ -78,7 +99,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.RejectMessage(msg, reason)
}
}
@ -109,7 +130,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) {
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.DuplicateMessage(msg)
}
}
@ -139,7 +160,7 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) {
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.DeliverMessage(msg)
}
}
@ -168,7 +189,7 @@ func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.AddPeer(p, proto)
}
@ -196,7 +217,7 @@ func (t *pubsubTracer) RemovePeer(p peer.ID) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.RemovePeer(p)
}
@ -366,7 +387,7 @@ func (t *pubsubTracer) Join(topic string) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Join(topic)
}
@ -392,7 +413,7 @@ func (t *pubsubTracer) Leave(topic string) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Leave(topic)
}
@ -418,7 +439,7 @@ func (t *pubsubTracer) Graft(p peer.ID, topic string) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Graft(p, topic)
}
@ -445,7 +466,7 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Prune(p, topic)
}
@ -472,7 +493,7 @@ func (t *pubsubTracer) ThrottlePeer(p peer.ID) {
return
}
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.ThrottlePeer(p)
}
}