mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-08 15:53:07 +00:00
trace throttle peers to avoid breaking promises unfairly
This commit is contained in:
parent
854ca9a6ad
commit
93f4f91dcf
@ -70,6 +70,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
|
|||||||
var res map[peer.ID]int
|
var res map[peer.ID]int
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
// find broken promises from peers
|
||||||
for mid, peers := range gt.promises {
|
for mid, peers := range gt.promises {
|
||||||
for p, expire := range peers {
|
for p, expire := range peers {
|
||||||
if expire.Before(now) {
|
if expire.Before(now) {
|
||||||
@ -91,7 +92,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
|
|||||||
|
|
||||||
var _ internalTracer = (*gossipTracer)(nil)
|
var _ internalTracer = (*gossipTracer)(nil)
|
||||||
|
|
||||||
func (gt *gossipTracer) fullfillPromise(msg *Message) {
|
func (gt *gossipTracer) fulfillPromise(msg *Message) {
|
||||||
mid := gt.msgID(msg.Message)
|
mid := gt.msgID(msg.Message)
|
||||||
|
|
||||||
gt.Lock()
|
gt.Lock()
|
||||||
@ -101,12 +102,12 @@ func (gt *gossipTracer) fullfillPromise(msg *Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gt *gossipTracer) DeliverMessage(msg *Message) {
|
func (gt *gossipTracer) DeliverMessage(msg *Message) {
|
||||||
// someone delivered a message, fullfill promises for it
|
// someone delivered a message, fulfill promises for it
|
||||||
gt.fullfillPromise(msg)
|
gt.fulfillPromise(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
|
func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
|
||||||
// A message got rejected, so we can fullfill promises and let the score penalty apply
|
// A message got rejected, so we can fulfill promises and let the score penalty apply
|
||||||
// from invalid message delivery.
|
// from invalid message delivery.
|
||||||
// We do take exception and apply promise penalty regardless in the following cases, where
|
// We do take exception and apply promise penalty regardless in the following cases, where
|
||||||
// the peer delivered an obviously invalid message.
|
// the peer delivered an obviously invalid message.
|
||||||
@ -117,14 +118,14 @@ func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
gt.fullfillPromise(msg)
|
gt.fulfillPromise(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gt *gossipTracer) ValidateMessage(msg *Message) {
|
func (gt *gossipTracer) ValidateMessage(msg *Message) {
|
||||||
// we consider the promise fullfilled as soon as the message begins validation
|
// we consider the promise fulfilled as soon as the message begins validation
|
||||||
// if it was a case of signature issue it would have been rejected immediately
|
// if it was a case of signature issue it would have been rejected immediately
|
||||||
// without triggering the Validate trace
|
// without triggering the Validate trace
|
||||||
gt.fullfillPromise(msg)
|
gt.fulfillPromise(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
|
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
|
||||||
@ -134,3 +135,19 @@ func (gt *gossipTracer) Leave(topic string) {}
|
|||||||
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
|
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
|
||||||
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
|
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
|
||||||
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
|
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
|
||||||
|
|
||||||
|
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
|
||||||
|
gt.Lock()
|
||||||
|
defer gt.Unlock()
|
||||||
|
|
||||||
|
// remove promises for peers that have been throttled so that they are not unfairly penalized
|
||||||
|
for mid, peers := range gt.promises {
|
||||||
|
_, hasPromise := peers[p]
|
||||||
|
if hasPromise {
|
||||||
|
delete(peers, p)
|
||||||
|
if len(peers) == 0 {
|
||||||
|
delete(gt.promises, mid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -233,3 +233,4 @@ func (pg *peerGater) RejectMessage(msg *Message, reason string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pg *peerGater) DuplicateMessage(msg *Message) {}
|
func (pg *peerGater) DuplicateMessage(msg *Message) {}
|
||||||
|
func (pg *peerGater) ThrottlePeer(p peer.ID) {}
|
||||||
|
|||||||
@ -943,6 +943,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
|||||||
if len(rpc.GetPublish()) > 0 {
|
if len(rpc.GetPublish()) > 0 {
|
||||||
log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish()))
|
log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish()))
|
||||||
}
|
}
|
||||||
|
p.tracer.ThrottlePeer(rpc.from)
|
||||||
|
|
||||||
case AcceptAll:
|
case AcceptAll:
|
||||||
for _, pmsg := range rpc.GetPublish() {
|
for _, pmsg := range rpc.GetPublish() {
|
||||||
|
|||||||
2
score.go
2
score.go
@ -749,6 +749,8 @@ func (ps *peerScore) DuplicateMessage(msg *Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ps *peerScore) ThrottlePeer(p peer.ID) {}
|
||||||
|
|
||||||
// message delivery records
|
// message delivery records
|
||||||
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
|
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
|
||||||
rec, ok := d.records[id]
|
rec, ok := d.records[id]
|
||||||
|
|||||||
@ -252,4 +252,5 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tagTracer) RemovePeer(peer.ID) {}
|
func (t *tagTracer) RemovePeer(peer.ID) {}
|
||||||
|
func (gt *tagTracer) ThrottlePeer(p peer.ID) {}
|
||||||
|
|||||||
11
trace.go
11
trace.go
@ -26,6 +26,7 @@ type internalTracer interface {
|
|||||||
DeliverMessage(msg *Message)
|
DeliverMessage(msg *Message)
|
||||||
RejectMessage(msg *Message, reason string)
|
RejectMessage(msg *Message, reason string)
|
||||||
DuplicateMessage(msg *Message)
|
DuplicateMessage(msg *Message)
|
||||||
|
ThrottlePeer(p peer.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pubsub tracer details
|
// pubsub tracer details
|
||||||
@ -464,3 +465,13 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) {
|
|||||||
|
|
||||||
t.tracer.Trace(evt)
|
t.tracer.Trace(evt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *pubsubTracer) ThrottlePeer(p peer.ID) {
|
||||||
|
if t == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tr := range t.internal {
|
||||||
|
tr.ThrottlePeer(p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user