From 93f4f91dcfae1fc39f09f1b451f149ad094e6e1c Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 2 Sep 2020 21:32:10 +0300 Subject: [PATCH] trace throttle peers to avoid breaking promises unfairly --- gossip_tracer.go | 31 ++++++++++++++++++++++++------- peer_gater.go | 1 + pubsub.go | 1 + score.go | 2 ++ tag_tracer.go | 3 ++- trace.go | 11 +++++++++++ 6 files changed, 41 insertions(+), 8 deletions(-) diff --git a/gossip_tracer.go b/gossip_tracer.go index 42f7f5f..fd0d160 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -70,6 +70,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { var res map[peer.ID]int now := time.Now() + // find broken promises from peers for mid, peers := range gt.promises { for p, expire := range peers { if expire.Before(now) { @@ -91,7 +92,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { var _ internalTracer = (*gossipTracer)(nil) -func (gt *gossipTracer) fullfillPromise(msg *Message) { +func (gt *gossipTracer) fulfillPromise(msg *Message) { mid := gt.msgID(msg.Message) gt.Lock() @@ -101,12 +102,12 @@ func (gt *gossipTracer) fullfillPromise(msg *Message) { } func (gt *gossipTracer) DeliverMessage(msg *Message) { - // someone delivered a message, fullfill promises for it - gt.fullfillPromise(msg) + // someone delivered a message, fulfill promises for it + gt.fulfillPromise(msg) } func (gt *gossipTracer) RejectMessage(msg *Message, reason string) { - // A message got rejected, so we can fullfill promises and let the score penalty apply + // A message got rejected, so we can fulfill promises and let the score penalty apply // from invalid message delivery. // We do take exception and apply promise penalty regardless in the following cases, where // the peer delivered an obviously invalid message. @@ -117,14 +118,14 @@ func (gt *gossipTracer) RejectMessage(msg *Message, reason string) { return } - gt.fullfillPromise(msg) + gt.fulfillPromise(msg) } func (gt *gossipTracer) ValidateMessage(msg *Message) { - // we consider the promise fullfilled as soon as the message begins validation + // we consider the promise fulfilled as soon as the message begins validation // if it was a case of signature issue it would have been rejected immediately // without triggering the Validate trace - gt.fullfillPromise(msg) + gt.fulfillPromise(msg) } func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {} @@ -134,3 +135,19 @@ func (gt *gossipTracer) Leave(topic string) {} func (gt *gossipTracer) Graft(p peer.ID, topic string) {} func (gt *gossipTracer) Prune(p peer.ID, topic string) {} func (gt *gossipTracer) DuplicateMessage(msg *Message) {} + +func (gt *gossipTracer) ThrottlePeer(p peer.ID) { + gt.Lock() + defer gt.Unlock() + + // remove promises for peers that have been throttled so that they are not unfairly penalized + for mid, peers := range gt.promises { + _, hasPromise := peers[p] + if hasPromise { + delete(peers, p) + if len(peers) == 0 { + delete(gt.promises, mid) + } + } + } +} diff --git a/peer_gater.go b/peer_gater.go index 92531e9..231b950 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -233,3 +233,4 @@ func (pg *peerGater) RejectMessage(msg *Message, reason string) { } func (pg *peerGater) DuplicateMessage(msg *Message) {} +func (pg *peerGater) ThrottlePeer(p peer.ID) {} diff --git a/pubsub.go b/pubsub.go index 5d4f775..6cab8d4 100644 --- a/pubsub.go +++ b/pubsub.go @@ -943,6 +943,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if len(rpc.GetPublish()) > 0 { log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish())) } + p.tracer.ThrottlePeer(rpc.from) case AcceptAll: for _, pmsg := range rpc.GetPublish() { diff --git a/score.go b/score.go index 92e3610..c5f97d6 100644 --- a/score.go +++ b/score.go @@ -749,6 +749,8 @@ func (ps *peerScore) DuplicateMessage(msg *Message) { } } +func (ps *peerScore) ThrottlePeer(p peer.ID) {} + // message delivery records func (d *messageDeliveries) getRecord(id string) *deliveryRecord { rec, ok := d.records[id] diff --git a/tag_tracer.go b/tag_tracer.go index 9591939..e2922e9 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -252,4 +252,5 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) { } } -func (t *tagTracer) RemovePeer(peer.ID) {} +func (t *tagTracer) RemovePeer(peer.ID) {} +func (gt *tagTracer) ThrottlePeer(p peer.ID) {} diff --git a/trace.go b/trace.go index f7b4d5a..df6740e 100644 --- a/trace.go +++ b/trace.go @@ -26,6 +26,7 @@ type internalTracer interface { DeliverMessage(msg *Message) RejectMessage(msg *Message, reason string) DuplicateMessage(msg *Message) + ThrottlePeer(p peer.ID) } // pubsub tracer details @@ -464,3 +465,13 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) { t.tracer.Trace(evt) } + +func (t *pubsubTracer) ThrottlePeer(p peer.ID) { + if t == nil { + return + } + + for _, tr := range t.internal { + tr.ThrottlePeer(p) + } +}