mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 21:03:07 +00:00
add a new RawTracer event to track messages dropped in Subscribe
This commit is contained in:
parent
3c7689d482
commit
9be1c593c6
@ -162,6 +162,7 @@ func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
|
||||
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
|
||||
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
||||
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
||||
func (gt *gossipTracer) DroppedInSubscribe(msg *Message) {}
|
||||
|
||||
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
|
||||
gt.Lock()
|
||||
|
||||
@ -449,3 +449,5 @@ func (pg *peerGater) RecvRPC(rpc *RPC) {}
|
||||
func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {}
|
||||
|
||||
func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {}
|
||||
|
||||
func (pg *peerGater) DroppedInSubscribe(msg *Message) {}
|
||||
|
||||
@ -842,6 +842,7 @@ func (p *PubSub) notifySubs(msg *Message) {
|
||||
select {
|
||||
case f.ch <- msg:
|
||||
default:
|
||||
p.tracer.DroppedInSubscribe(msg)
|
||||
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
|
||||
}
|
||||
}
|
||||
|
||||
2
score.go
2
score.go
@ -825,6 +825,8 @@ func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID) {}
|
||||
|
||||
func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {}
|
||||
|
||||
func (ps *peerScore) DroppedInSubscribe(msg *Message) {}
|
||||
|
||||
// message delivery records
|
||||
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
|
||||
rec, ok := d.records[id]
|
||||
|
||||
@ -251,8 +251,9 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tagTracer) RemovePeer(peer.ID) {}
|
||||
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
|
||||
func (t *tagTracer) RecvRPC(rpc *RPC) {}
|
||||
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
||||
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
||||
func (t *tagTracer) RemovePeer(peer.ID) {}
|
||||
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
|
||||
func (t *tagTracer) RecvRPC(rpc *RPC) {}
|
||||
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
||||
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
||||
func (t *tagTracer) DroppedInSubscribe(msg *Message) {}
|
||||
|
||||
15
trace.go
15
trace.go
@ -16,7 +16,7 @@ type EventTracer interface {
|
||||
Trace(evt *pb.TraceEvent)
|
||||
}
|
||||
|
||||
// RawTracer is a low level tracing interace that allows an application to trace the internal
|
||||
// RawTracer is a low level tracing interface that allows an application to trace the internal
|
||||
// operation of the pubsub subsystem.
|
||||
//
|
||||
// Note that the tracers are invoked synchronously, which means that application tracers must
|
||||
@ -54,6 +54,9 @@ type RawTracer interface {
|
||||
SendRPC(rpc *RPC, p peer.ID)
|
||||
// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
|
||||
DropRPC(rpc *RPC, p peer.ID)
|
||||
// DroppedInSubscribe is invoked when the consumer of Subscribe is not reading messages fast enough and
|
||||
// the pressure release mechanism trigger, dropping messages.
|
||||
DroppedInSubscribe(msg *Message)
|
||||
}
|
||||
|
||||
// pubsub tracer details
|
||||
@ -325,6 +328,16 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
|
||||
t.tracer.Trace(evt)
|
||||
}
|
||||
|
||||
func (t *pubsubTracer) DroppedInSubscribe(msg *Message) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, tr := range t.raw {
|
||||
tr.DroppedInSubscribe(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
|
||||
rpcMeta := new(pb.TraceEvent_RPCMeta)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user