mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-08 07:43:11 +00:00
rename RawTracer's DroppedInSubscribe into UndeliverableMessage
This commit is contained in:
parent
9be1c593c6
commit
65f48fb2c8
@ -162,7 +162,7 @@ func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
|
|||||||
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
|
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
|
||||||
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
||||||
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
||||||
func (gt *gossipTracer) DroppedInSubscribe(msg *Message) {}
|
func (gt *gossipTracer) UndeliverableMessage(msg *Message) {}
|
||||||
|
|
||||||
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
|
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
|
||||||
gt.Lock()
|
gt.Lock()
|
||||||
|
|||||||
@ -450,4 +450,4 @@ func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {}
|
|||||||
|
|
||||||
func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {}
|
func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {}
|
||||||
|
|
||||||
func (pg *peerGater) DroppedInSubscribe(msg *Message) {}
|
func (pg *peerGater) UndeliverableMessage(msg *Message) {}
|
||||||
|
|||||||
@ -842,7 +842,7 @@ func (p *PubSub) notifySubs(msg *Message) {
|
|||||||
select {
|
select {
|
||||||
case f.ch <- msg:
|
case f.ch <- msg:
|
||||||
default:
|
default:
|
||||||
p.tracer.DroppedInSubscribe(msg)
|
p.tracer.UndeliverableMessage(msg)
|
||||||
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
|
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
score.go
2
score.go
@ -825,7 +825,7 @@ func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID) {}
|
|||||||
|
|
||||||
func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {}
|
func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {}
|
||||||
|
|
||||||
func (ps *peerScore) DroppedInSubscribe(msg *Message) {}
|
func (ps *peerScore) UndeliverableMessage(msg *Message) {}
|
||||||
|
|
||||||
// message delivery records
|
// message delivery records
|
||||||
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
|
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
|
||||||
|
|||||||
@ -256,4 +256,4 @@ func (t *tagTracer) ThrottlePeer(p peer.ID) {}
|
|||||||
func (t *tagTracer) RecvRPC(rpc *RPC) {}
|
func (t *tagTracer) RecvRPC(rpc *RPC) {}
|
||||||
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
|
||||||
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
|
||||||
func (t *tagTracer) DroppedInSubscribe(msg *Message) {}
|
func (t *tagTracer) UndeliverableMessage(msg *Message) {}
|
||||||
|
|||||||
8
trace.go
8
trace.go
@ -54,9 +54,9 @@ type RawTracer interface {
|
|||||||
SendRPC(rpc *RPC, p peer.ID)
|
SendRPC(rpc *RPC, p peer.ID)
|
||||||
// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
|
// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
|
||||||
DropRPC(rpc *RPC, p peer.ID)
|
DropRPC(rpc *RPC, p peer.ID)
|
||||||
// DroppedInSubscribe is invoked when the consumer of Subscribe is not reading messages fast enough and
|
// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
|
||||||
// the pressure release mechanism trigger, dropping messages.
|
// the pressure release mechanism trigger, dropping messages.
|
||||||
DroppedInSubscribe(msg *Message)
|
UndeliverableMessage(msg *Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pubsub tracer details
|
// pubsub tracer details
|
||||||
@ -328,13 +328,13 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
|
|||||||
t.tracer.Trace(evt)
|
t.tracer.Trace(evt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *pubsubTracer) DroppedInSubscribe(msg *Message) {
|
func (t *pubsubTracer) UndeliverableMessage(msg *Message) {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tr := range t.raw {
|
for _, tr := range t.raw {
|
||||||
tr.DroppedInSubscribe(msg)
|
tr.UndeliverableMessage(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user