gossip tracer preliminaries

This commit is contained in:
vyzo 2020-05-07 21:26:33 +03:00
parent d3ae6ab3a7
commit 4394e52a6f
2 changed files with 86 additions and 5 deletions

59
gossip_tracer.go Normal file
View File

@ -0,0 +1,59 @@
package pubsub
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize
// peers who don't follow up on IWANT requests after an IHAVE advertisement.
// The tracking of promises is probabilistic to avoid using too much memory.
type gossipTracer struct {
msgID MsgIdFunction
}
func newGossipTracer() *gossipTracer {
return &gossipTracer{
msgID: DefaultMsgIdFn,
}
}
func (gt *gossipTracer) Start(gs *GossipSubRouter) {
if gt == nil {
return
}
gt.msgID = gs.p.msgID
}
func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {
if gt == nil {
return
}
// TODO
}
func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
// TODO
return nil
}
var _ internalTracer = (*gossipTracer)(nil)
func (gt *gossipTracer) DeliverMessage(msg *Message) {
// TODO
}
func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
// TODO
}
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *gossipTracer) RemovePeer(p peer.ID) {}
func (gt *gossipTracer) Join(topic string) {}
func (gt *gossipTracer) Leave(topic string) {}
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
func (gt *gossipTracer) ValidateMessage(msg *Message) {}
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}

View File

@ -147,11 +147,17 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt
gs.acceptPXThreshold = thresholds.AcceptPXThreshold
gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold
gs.gossipTracer = newGossipTracer()
// hook the tracer
if ps.tracer != nil {
ps.tracer.internal = append(ps.tracer.internal, gs.score)
ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
} else {
ps.tracer = &pubsubTracer{internal: []internalTracer{gs.score}, pid: ps.host.ID(), msgID: ps.msgID}
ps.tracer = &pubsubTracer{
internal: []internalTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
}
}
return nil
@ -234,9 +240,11 @@ type GossipSubRouter struct {
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests
mcache *MessageCache
tracer *pubsubTracer
score *peerScore
mcache *MessageCache
tracer *pubsubTracer
score *peerScore
gossipTracer *gossipTracer
// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes.
@ -298,6 +306,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
// start the scoring
gs.score.Start(gs)
// and the gossip tracing
gs.gossipTracer.Start(gs)
// start using the same msg ID function as PubSub for caching messages.
gs.mcache.SetMsgIdFn(p.msgID)
@ -460,6 +471,8 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
iwantlst = iwantlst[:iask]
gs.iasked[p] += iask
gs.gossipTracer.AddPromise(p, iwantlst)
return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
}
@ -1091,6 +1104,9 @@ func (gs *GossipSubRouter) heartbeat() {
// clean up iasked counters
gs.clearIHaveCounters()
// apply IWANT request penalties
gs.applyIwantPenalties()
// ensure direct peers are connected
gs.directConnect()
@ -1273,6 +1289,12 @@ func (gs *GossipSubRouter) clearIHaveCounters() {
}
}
func (gs *GossipSubRouter) applyIwantPenalties() {
for p, count := range gs.gossipTracer.GetBrokenPromises() {
gs.score.AddPenalty(p, count)
}
}
func (gs *GossipSubRouter) clearBackoff() {
// we only clear once every 15 ticks to avoid iterating over the map(s) too much
if gs.heartbeatTicks%15 != 0 {